from typing import Union
import pandas as pd
from pandas.api.types import is_datetime64_any_dtype
import traja
@pd.api.extensions.register_dataframe_accessor("traja")
class TrajaAccessor:
    """Accessor for pandas DataFrame with trajectory-specific numerical and analytical functions.

    Access with `df.traja`. Most methods are thin wrappers that pass the wrapped
    DataFrame to functions in ``traja.trajectory`` / ``traja.plotting``.
    """

    # Names of the two coordinate columns that ``_validate`` requires.
    # Shared by every accessor instance (class-level state).
    __axes = ["x", "y"]

    def __init__(self, pandas_obj):
        """Validate ``pandas_obj`` and keep a reference to it as ``self._obj``."""
        self._validate(pandas_obj)
        self._obj = pandas_obj

    @staticmethod
    def _set_axes(axes):
        """Replace the pair of column names required by ``_validate``.

        Args:
          axes: sequence of exactly two column names.

        Raises:
          ValueError: if ``axes`` does not contain exactly two entries.
        """
        if len(axes) != 2:
            raise ValueError(
                "TrajaAccessor requires precisely two axes, got {}".format(len(axes))
            )
        TrajaAccessor.__axes = axes

    def _strip(self, text):
        """Return ``text.strip()``; objects without ``strip`` are coerced with ``pd.to_numeric``."""
        try:
            return text.strip()
        except AttributeError:
            return pd.to_numeric(text, errors="coerce")

    @staticmethod
    def _validate(obj):
        """Raise ``AttributeError`` unless both configured axis columns are in ``obj.columns``."""
        if (
            TrajaAccessor.__axes[0] not in obj.columns
            or TrajaAccessor.__axes[1] not in obj.columns
        ):
            raise AttributeError(
                "Must have '{}' and '{}'.".format(*TrajaAccessor.__axes)
            )

    @property
    def center(self):
        """Return the center point of this trajectory as ``(mean x, mean y)`` floats."""
        x = self._obj.x
        y = self._obj.y
        return float(x.mean()), float(y.mean())

    @property
    def bounds(self):
        """Return limits of x and y dimensions (``(xmin, xmax), (ymin, ymax)``)."""
        xlim = self._obj.x.min(), self._obj.x.max()
        ylim = self._obj.y.min(), self._obj.y.max()
        return (xlim, ylim)

    def night(self, begin: str = "19:00", end: str = "7:00"):
        """Get nighttime dataset between `begin` and `end`.

        Delegates to :meth:`between`.

        Args:
          begin (str): (Default value = '19:00')
          end (str): (Default value = '7:00')

        Returns:
          trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory during night.

        """
        return self.between(begin, end)

    def day(self, begin: str = "7:00", end: str = "19:00"):
        """Get daytime dataset between `begin` and `end`.

        Delegates to :meth:`between`.

        Args:
          begin (str): (Default value = '7:00')
          end (str): (Default value = '19:00')

        Returns:
          trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory during day.

        """
        return self.between(begin, end)

    def _get_time_col(self):
        """Returns time column in trajectory.

        Returns:
          time_col (str or None): name of time column, 'index' or None

        """
        return traja.trajectory._get_time_col(self._obj)

    def between(self, begin: str, end: str):
        """Returns trajectory between `begin` and `end` if `time` column is `datetime64`.

        When the time information lives in a column, the result has that column
        first. A named index is restored with its original values; an unnamed
        index is replaced by a fresh ``0..n-1`` range.

        Args:
          begin (str): Beginning of time slice.
          end (str): End of time slice.

        Returns:
          trj (:class:`~traja.frame.TrajaDataFrame`): Dataframe between values.

        Raises:
          TypeError: if neither the index nor the detected time column is datetime64.

        .. doctest ::

            >>> s = pd.to_datetime(pd.Series(['Jun 30 2000 12:00:01', 'Jun 30 2000 12:00:02', 'Jun 30 2000 12:00:03']))
            >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3],'time':s})
            >>> df.traja.between('12:00:00','12:00:01')
                             time  x  y
            0 2000-06-30 12:00:01  0  1

        """
        time_col = self._get_time_col()
        if time_col == "index":
            return self._obj.between_time(begin, end)
        if not (time_col and is_datetime64_any_dtype(self._obj[time_col])):
            raise TypeError("Either time column or index must be datetime64")

        trj = self._obj
        index_name = trj.index.name
        # ``set_index`` discards the current index, so a named index has to be
        # parked as a regular column first or it cannot be restored afterwards.
        keep_index = index_name is not None and index_name not in trj.columns
        if keep_index:
            trj = trj.reset_index()

        # ``between_time`` only works on a DatetimeIndex.
        trj = trj.set_index(time_col).between_time(begin, end)

        # Turn the time index back into a column, then restore the named index.
        trj = trj.reset_index()
        if keep_index:
            trj = trj.set_index(index_name)
        return trj

    def resample_time(self, step_time: float):
        """Returns trajectory resampled with ``step_time``.

        Args:
          step_time (float): Step time

        Returns:
          trj (:class:`~traja.frame.TrajaDataFrame`): Dataframe resampled.

        """
        return traja.trajectory.resample_time(self._obj, step_time=step_time)

    def rediscretize_points(self, R, **kwargs):
        """Rediscretize points; forwards to ``traja.trajectory.rediscretize_points``.

        Args:
          R: rediscretized step length
          **kwargs: forwarded unchanged

        """
        return traja.trajectory.rediscretize_points(self._obj, R=R, **kwargs)

    def trip_grid(
        self,
        bins: Union[int, tuple] = 10,
        log: bool = False,
        spatial_units=None,
        normalize: bool = False,
        hist_only: bool = False,
        plot: bool = True,
        **kwargs,
    ):
        """Returns a 2D histogram of trip.

        Args:
          bins (int or tuple, optional): Number of bins (Default value = 10)
          log (bool): log scale histogram (Default value = False)
          spatial_units (str, optional): units for plotting. When omitted, the
            value is looked up on the DataFrame and falls back to ``"m"``.
          normalize (bool): normalize histogram into density plot
          hist_only (bool): return histogram without plotting
          plot (bool): forwarded to ``traja.plotting.trip_grid`` (Default value = True)
          **kwargs: forwarded to ``traja.plotting.trip_grid``

        Returns:
          hist (:class:`numpy.ndarray`): 2D histogram as array
          image (:class:`matplotlib.collections.PathCollection`): image of histogram

        """
        if spatial_units is None:
            # NOTE(review): ``DataFrame.get`` looks up a *column* called
            # "spatial_units", whereas ``scale`` stores the units in
            # ``__dict__`` - confirm which lookup is intended.
            spatial_units = self._obj.get("spatial_units", "m")
        hist, image = traja.plotting.trip_grid(
            self._obj,
            bins=bins,
            log=log,
            spatial_units=spatial_units,
            normalize=normalize,
            hist_only=hist_only,
            plot=plot,
            **kwargs,
        )
        return hist, image

    def plot(self, n_coords: int = None, show_time=False, **kwargs):
        """Plot trajectory over period.

        Args:
          n_coords (int): Number of coordinates to plot
          show_time (bool): forwarded to ``traja.plotting.plot`` (Default value = False)
          **kwargs: additional keyword arguments to :meth:`matplotlib.axes.Axes.scatter`

        Returns:
          ax (:class:`~matplotlib.axes.Axes`): Axes of plot

        """
        ax = traja.plotting.plot(
            trj=self._obj,
            accessor=self,
            n_coords=n_coords,
            show_time=show_time,
            **kwargs,
        )
        return ax

    def plot_3d(self, **kwargs):
        """Plot 3D trajectory for single identity over period.

        Args:
          **kwargs: additional keyword arguments forwarded to
            ``traja.plotting.plot_3d``

        Returns:
          collection (:class:`~matplotlib.collections.PathCollection`): collection that was plotted

        .. note::
            Takes a while to plot large trajectories. Consider using first::

                rt = trj.traja.rediscretize(R=1.) # Replace R with appropriate step length
                rt.traja.plot_3d()

        """
        ax = traja.plotting.plot_3d(trj=self._obj, **kwargs)
        return ax

    def plot_flow(self, kind="quiver", **kwargs):
        """Plot grid cell flow.

        Args:
          kind (str): Kind of plot (eg, 'quiver','surface','contour','contourf','stream')
          **kwargs: additional keyword arguments to :meth:`matplotlib.axes.Axes.scatter`

        Returns:
          ax (:class:`~matplotlib.axes.Axes`): Axes of plot

        """
        ax = traja.plotting.plot_flow(trj=self._obj, kind=kind, **kwargs)
        return ax

    def plot_collection(self, colors=None, **kwargs):
        """Plot a collection of trajectories via ``traja.plotting.plot_collection``.

        The identity column is ``self._id_col`` when that attribute has been set,
        otherwise ``"id"`` (the same fallback as :meth:`apply_all`).

        Args:
          colors: forwarded unchanged (Default value = None)
          **kwargs: forwarded unchanged

        """
        id_col = getattr(self, "_id_col", "id")
        return traja.plotting.plot_collection(
            self._obj, id_col=id_col, colors=colors, **kwargs
        )

    def apply_all(self, method, id_col=None, **kwargs):
        """Applies method to all trajectories and returns grouped dataframes or series.

        Args:
          method (callable): function applied to each group
          id_col (str, optional): column to group by; defaults to ``self._id_col``
            when set, otherwise ``"id"``
          **kwargs: forwarded to ``GroupBy.apply``

        """
        id_col = id_col or getattr(self, "_id_col", "id")
        return self._obj.groupby(by=id_col).apply(method, **kwargs)

    def _has_cols(self, cols: list):
        """Forward to ``traja.trajectory._has_cols`` for the wrapped DataFrame."""
        return traja.trajectory._has_cols(self._obj, cols)

    @property
    def xy(self):
        """Returns a :class:`numpy.ndarray` of x,y coordinates.

        Returns:
          xy (:class:`numpy.ndarray`) -- x,y coordinates, one row per point

        Raises:
          Exception: if ``_has_cols(["x", "y"])`` is falsy.

        .. doctest::

            >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]})
            >>> df.traja.xy
            array([[0, 1],
                   [1, 2],
                   [2, 3]])

        """
        if self._has_cols(["x", "y"]):
            xy = self._obj[["x", "y"]].values
            return xy
        else:
            raise Exception("'x' and 'y' are not in the dataframe.")

    def _check_has_time(self):
        """Raise ``Exception`` if no time column can be found in the trajectory."""
        time_col = self._get_time_col()
        if time_col is None:
            raise Exception("Missing time information in trajectory.")

    def __getattr__(self, name):
        """Forward undefined public names to ``traja.plotting`` / ``traja.trajectory``.

        Only called when normal attribute lookup fails. If ``name`` is listed in
        ``traja.plotting.__all__`` or ``traja.trajectory.__all__`` (checked in
        that order), a callable is returned that invokes the module function
        with the wrapped DataFrame as first argument.

        Raises:
          AttributeError: immediately, if ``name`` is exported by neither module,
            so that ``hasattr`` and ``getattr(obj, name, default)`` behave
            correctly.
        """
        # Dunder probes (copy/pickle protocols etc.) are never forwarded.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(f"{name} attribute not defined")

        for module in (traja.plotting, traja.trajectory):
            if name in getattr(module, "__all__", ()):
                func = getattr(module, name)
                break
        else:
            raise AttributeError(f"{name} attribute not defined")

        def method(*args, **kwargs):
            return func(self._obj, *args, **kwargs)

        return method

    def transitions(self, *args, **kwargs):
        """Calculate transition matrix via ``traja.transitions``."""
        return traja.transitions(self._obj, *args, **kwargs)

    def calc_derivatives(self, assign: bool = False):
        """Returns derivatives `displacement` and `displacement_time`.

        Args:
          assign (bool): Assign output to ``TrajaDataFrame`` (Default value = False)

        Returns:
          derivs (:class:`~collections.OrderedDict`): Derivatives.

        .. doctest::

            >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3],'time':[0., 0.2, 0.4]})
            >>> df.traja.calc_derivatives()
               displacement  displacement_time
            0           NaN                0.0
            1      1.414214                0.2
            2      1.414214                0.4

        """
        derivs = traja.trajectory.calc_derivatives(self._obj)
        if assign:
            # NOTE(review): this rebinds the accessor's frame to a merged copy;
            # the DataFrame the accessor was created from is not modified.
            # Confirm that is intended.
            trj = self._obj.merge(derivs, left_index=True, right_index=True)
            self._obj = trj
        return derivs

    def get_derivatives(self) -> pd.DataFrame:
        """Returns derivatives as DataFrame."""
        derivs = traja.trajectory.get_derivatives(self._obj)
        return derivs

    def speed_intervals(
        self,
        faster_than: Union[float, int] = None,
        slower_than: Union[float, int] = None,
    ):
        """Returns ``TrajaDataFrame`` with speed time intervals.

        Returns a dataframe of time intervals where speed is slower and/or faster than specified values.

        Args:
          faster_than (float, optional): Minimum speed threshold. (Default value = None)
          slower_than (float or int, optional): Maximum speed threshold. (Default value = None)

        Returns:
          result (:class:`~pandas.DataFrame`) -- time intervals as dataframe

        .. note::

            Implementation ported to Python, heavily inspired by Jim McLean's trajr package.

        """
        result = traja.trajectory.speed_intervals(self._obj, faster_than, slower_than)
        return result

    def to_shapely(self):
        """Returns shapely object for area, bounds, etc. functions.

        Rows with a missing ``x`` or ``y`` are dropped before conversion.

        Returns:
          shape (shapely.geometry.linestring.LineString): Shapely shape.

        .. doctest::

            >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]})
            >>> shape = df.traja.to_shapely()
            >>> shape.is_closed
            False

        """
        trj = self._obj[["x", "y"]].dropna()
        tracks_shape = traja.trajectory.to_shapely(trj)
        return tracks_shape

    def calc_displacement(self, assign: bool = True) -> pd.Series:
        """Returns ``Series`` of `float` with displacement between consecutive indices.

        Args:
          assign (bool, optional): Assign displacement to TrajaAccessor (Default value = True)

        Returns:
          displacement (:class:`pandas.Series`): Displacement series.

        .. doctest::

            >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]})
            >>> df.traja.calc_displacement()
            0         NaN
            1    1.414214
            2    1.414214
            Name: displacement, dtype: float64

        """
        displacement = traja.trajectory.calc_displacement(self._obj)
        if assign:
            # NOTE(review): ``DataFrame.assign`` returns a new frame, so only the
            # accessor's reference gains the column (unlike ``calc_angle`` and
            # friends, which write in place). Confirm that is intended.
            self._obj = self._obj.assign(displacement=displacement)
        return displacement

    def calc_angle(self, assign: bool = True) -> pd.Series:
        """Returns ``Series`` with angle between steps as a function of displacement with regard to x axis.

        Args:
          assign (bool, optional): Store the result in column ``angle`` of the
            wrapped DataFrame (Default value = True)

        Returns:
          angle (:class:`pandas.Series`): Angle series.

        .. doctest::

            >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]})
            >>> df.traja.calc_angle()
            0     NaN
            1    45.0
            2    45.0
            dtype: float64

        """
        angle = traja.trajectory.calc_angle(self._obj)
        if assign:
            self._obj["angle"] = angle
        return angle

    def scale(self, scale: float, spatial_units: str = "m"):
        """Scale trajectory when converting, eg, from pixels to meters.

        Multiplies the ``x`` and ``y`` columns in place and records
        ``spatial_units`` in the DataFrame's ``__dict__``. Returns ``None``.

        Args:
          scale(float): Scale to convert coordinates
          spatial_units(str., optional): Spatial units (eg, 'm') (Default value = "m")

        .. doctest::

            >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]})
            >>> df.traja.scale(0.1)
            >>> df
                 x    y
            0  0.0  0.1
            1  0.1  0.2
            2  0.2  0.3

        """
        self._obj[["x", "y"]] *= scale
        # Written through ``__dict__`` rather than ``setattr`` so pandas does
        # not interpret it as a column assignment.
        self._obj.__dict__["spatial_units"] = spatial_units

    def _transfer_metavars(self, df):
        """Copy every attribute named in ``self._obj._metadata`` onto ``df`` and return ``df``.

        Attributes missing on the wrapped DataFrame are copied as ``None``.
        """
        for attr in self._obj._metadata:
            df.__dict__[attr] = getattr(self._obj, attr, None)
        return df

    def rediscretize(self, R: float):
        """Resample a trajectory to a constant step length. R is rediscretized step length.

        Args:
          R (float): Rediscretized step length (eg, 0.02)

        Returns:
          rt (:class:`traja.TrajaDataFrame`): rediscretized trajectory

        Raises:
          ValueError: if ``R`` is not an ``int`` or ``float``.

        .. note::

            Based on the appendix in Bovet and Benhamou, (1988) and Jim McLean's
            `trajr <https://github.com/JimMcL/trajr>`_ implementation.

        .. doctest::

            >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]})
            >>> df.traja.rediscretize(1.)
                      x         y
            0  0.000000  1.000000
            1  0.707107  1.707107
            2  1.414214  2.414214

        """
        if not isinstance(R, (int, float)):
            raise ValueError("R must be provided as float or int")
        rt = traja.trajectory.rediscretize_points(self._obj, R)
        # Carry the metadata attributes over to the new frame.
        self._transfer_metavars(rt)
        return rt

    def grid_coordinates(self, **kwargs):
        """Forward to ``traja.grid_coordinates`` with the wrapped DataFrame."""
        return traja.grid_coordinates(self._obj, **kwargs)

    def calc_heading(self, assign: bool = True):
        """Calculate trajectory heading.

        Args:
          assign (bool): Store the result in column ``heading`` of the wrapped
            DataFrame (Default value = True)

        Returns:
          heading (:class:`pandas.Series`): heading as a ``Series``

        .. doctest::

            >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]})
            >>> df.traja.calc_heading()
            0     NaN
            1    45.0
            2    45.0
            Name: heading, dtype: float64

        """
        heading = traja.trajectory.calc_heading(self._obj)
        if assign:
            self._obj["heading"] = heading
        return heading

    def calc_turn_angle(self, assign: bool = True):
        """Calculate turn angle.

        Args:
          assign (bool): Store the result in column ``turn_angle`` of the
            wrapped DataFrame (Default value = True)

        Returns:
          turn_angle (:class:`~pandas.Series`): Turn angle

        .. doctest::

            >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]})
            >>> df.traja.calc_turn_angle()
            0    NaN
            1    NaN
            2    0.0
            Name: turn_angle, dtype: float64

        """
        turn_angle = traja.trajectory.calc_turn_angle(self._obj)
        if assign:
            self._obj["turn_angle"] = turn_angle
        return turn_angle