import functools
import logging
import warnings
from typing import Optional, Union, Tuple

import numpy as np
import pandas as pd
from pandas import DataFrame
from pandas.api.types import is_numeric_dtype

import traja
# Configure the root logger when this module is imported: records are rendered
# as "LEVEL:message" and only ERROR and above are emitted.
# NOTE(review): logging.basicConfig does nothing if the root logger already has
# handlers, so an application that configured logging earlier is not overridden.
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
class TrajaDataFrame(pd.DataFrame):
    """A TrajaDataFrame object is a subclass of pandas :class:`~pandas.DataFrame`.

    Besides the tabular data, a TrajaDataFrame carries trajectory metadata
    (the names listed in ``_metadata``). Metadata can be passed to the
    constructor as keyword arguments and is propagated to derived frames by
    :meth:`__finalize__`.

    Args:
        args: Typical arguments for pandas.DataFrame.

    Returns:
        traja.TrajaDataFrame -- TrajaDataFrame constructor.

    >>> traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]}) # doctest: +SKIP
       x  y
    0  0  1
    1  1  2
    2  2  3

    """

    # Attribute names pandas treats as metadata rather than column shortcuts.
    _metadata = [
        "xlim",
        "ylim",
        "spatial_units",
        "xlabel",
        "ylabel",
        "title",
        "fps",
        "time_units",
        "time_col",
        "id",
    ]

    def __init__(self, *args, **kwargs):
        """Build the frame, accepting metadata names as extra keyword arguments.

        Keyword arguments whose name appears in ``_metadata`` are removed from
        ``kwargs`` before they reach ``pandas.DataFrame.__init__`` and are
        stored on the instance instead.
        """
        # Allow setting metadata from constructor
        traja_kwargs = {}
        for key in list(kwargs):
            if key in self._metadata:
                traja_kwargs[key] = kwargs.pop(key)

        super().__init__(*args, **kwargs)

        # When wrapping an existing TrajaDataFrame, inherit its metadata first
        # so that explicitly passed keyword arguments can still override it.
        if len(args) == 1 and isinstance(args[0], TrajaDataFrame):
            args[0]._copy_attrs(self)
        for name, value in traja_kwargs.items():
            self.__dict__[name] = value

        # Cache for the lazily computed convex hull (see ``convex_hull``).
        self._convex_hull = None

        # Initialize metadata like 'fps','spatial_units', etc.
        self._init_metadata()

    @property
    def _constructor(self):
        """Class pandas uses when an operation produces a new frame."""
        return TrajaDataFrame

    def _copy_attrs(self, df):
        """Copy every metadata attribute of ``self`` onto ``df`` (``None`` if unset)."""
        for attr in self._metadata:
            df.__dict__[attr] = getattr(self, attr, None)

    def __finalize__(self, other, method=None, **kwargs):
        """propagate metadata from other to self"""
        if method == "merge":
            # merge operation: using metadata of the left object
            source = other.left
        elif method == "concat":
            # concat operation: using metadata of the first object
            source = other.objs[0]
        else:
            source = other
        for name in self._metadata:
            object.__setattr__(self, name, getattr(source, name, None))
        return self

    # def __getitem__(self, key):
    #     """
    #     If result is a DataFrame with a x or X column, return a
    #     TrajaDataFrame.
    #     """
    #     result = super(TrajaDataFrame, self).__getitem__(key)
    #     if isinstance(result, DataFrame) and "x" == result or "X" == result:
    #         result.__class__ = TrajaDataFrame
    #     elif isinstance(result, DataFrame):
    #         result.__class__ = DataFrame
    #     return result

    def _init_metadata(self):
        """Fill in default metadata values that have not been set on the instance."""
        defaults = dict(fps=None, spatial_units="m", time_units="s")
        for name, value in defaults.items():
            if name not in self.__dict__:
                self.__dict__[name] = value

    def _get_time_col(self):
        """Return the label of the time column, or ``None``.

        The first column whose label is a string containing ``"time"``
        (case-insensitive) is the candidate. It is returned only if its dtype
        is numeric; otherwise, or when no label matches, ``None`` is returned.
        """
        for col in self:
            # Column labels are not necessarily strings (e.g. integer labels),
            # so guard the ``.lower()`` call instead of raising AttributeError.
            if isinstance(col, str) and "time" in col.lower():
                return col if is_numeric_dtype(self[col]) else None
        return None

    @classmethod
    def from_xy(cls, xy: np.ndarray):
        """Convenience function for initializing :class:`~traja.frame.TrajaDataFrame` with x,y coordinates.

        Args:
            xy (:class:`numpy.ndarray`): x,y coordinates

        Returns:
            traj_df (:class:`~traja.frame.TrajaDataFrame`): Trajectory as dataframe

        .. doctest::

            >>> import numpy as np
            >>> xy = np.array([[0,1],[1,2],[2,3]])
            >>> traja.from_xy(xy)
               x  y
            0  0  1
            1  1  2
            2  2  3

        """
        # Go through ``cls(...)`` explicitly so that ``__init__`` always runs
        # and metadata defaults / the hull cache exist, independent of how
        # ``from_records`` constructs subclass instances internally.
        records = pd.DataFrame.from_records(xy, columns=["x", "y"])
        return cls(records)

    def set(self, key, value):
        """Set metadata."""
        self.__dict__[key] = value

    def __setattr__(self, name: str, value) -> None:
        """Override method for pandas.core.generic method __setattr__

        Allows for setting attributes to dataframe without warning.
        """
        # If the attribute already exists (including properties), assign to it
        # directly.
        try:
            object.__getattribute__(self, name)
            return object.__setattr__(self, name, value)
        except AttributeError:
            pass

        if name in self._internal_names_set:
            object.__setattr__(self, name, value)
        elif name in self._metadata:
            object.__setattr__(self, name, value)
        else:
            try:
                existing = getattr(self, name)
                if isinstance(existing, type(self.index)):
                    object.__setattr__(self, name, value)
                elif name in self._info_axis:
                    # ``name`` is a column label: assign to the column.
                    self[name] = value
                else:
                    object.__setattr__(self, name, value)
            except (AttributeError, TypeError):
                object.__setattr__(self, name, value)

    @property
    def center(self):
        """Return the center point of this trajectory."""
        x = self.x
        y = self.y
        return float(x.mean()), float(y.mean())

    @property
    def convex_hull(self):
        """Property of TrajaDataFrame class representing
        bounds for convex area enclosing trajectory points.

        The hull is computed on first access and cached on the instance.
        """
        # Calculate if it doesn't exist
        if self._convex_hull is None:
            xy_arr = self.traja.xy
            point_arr = traja.trajectory.calc_convex_hull(xy_arr)
            self._convex_hull = point_arr
        return self._convex_hull

    @convex_hull.setter
    def convex_hull(self, values):
        """Set convex_hull property of TrajaDataFrame.

        Args:
            values: ``None``, or XY coordinates with exactly two columns.
                ``None`` stores an empty array (use ``del`` to reset the cache
                so the hull is recomputed on next access); otherwise the hull
                of ``values`` is calculated and stored.

        Raises:
            ValueError: if ``values`` is not two-dimensional with two columns.
        """
        if values is None:
            self._convex_hull = np.array([])
            return

        # ``np.shape`` also handles 1-D input and plain sequences, which
        # previously failed with IndexError / AttributeError.
        shape = np.shape(values)
        if len(shape) != 2 or shape[1] != 2:
            raise ValueError(
                "XY coordinates must be in separate columns "
                "for convex hull calculation."
            )
        self._convex_hull = traja.trajectory.calc_convex_hull(values)

    @convex_hull.deleter
    def convex_hull(self):
        """Reset the cached hull so it is recomputed on next access."""
        self._convex_hull = None
def tocontainer(func):
    """Decorator that wraps the return value of ``func`` in a TrajaCollection.

    Args:
        func: callable whose result is accepted by the ``TrajaCollection``
            constructor.

    Returns:
        A wrapper that calls ``func`` with the given arguments and returns
        ``TrajaCollection(result)``. The wrapper keeps the name and docstring
        of ``func``.
    """

    @functools.wraps(func)  # preserve __name__/__doc__ of the wrapped callable
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        return TrajaCollection(result)

    return wrapper
class TrajaCollection(TrajaDataFrame):
    """Collection of trajectories."""

    # Same metadata names as the base class, except that the collection tracks
    # the name of its id column ("_id_col") instead of "id".
    _metadata = [
        "xlim",
        "ylim",
        "spatial_units",
        "xlabel",
        "ylabel",
        "title",
        "fps",
        "time_units",
        "time_col",
        "_id_col",
    ]

    def __init__(
        self,
        trjs: Union[TrajaDataFrame, pd.DataFrame, dict],
        id_col: Optional[str] = None,
        **kwargs,
    ):
        """Initialize with trajectories with x, y, and time columns.

        Args:
            trjs: A dataframe holding all trajectories, or a dict mapping an
                identifier to a dataframe. For a dict, each dataframe gets an
                ``"id"`` column holding its key and the frames are
                concatenated.
            id_col (str): Name of the column identifying each trajectory.
                Default is "id".
            **kwargs: Forwarded to the ``TrajaDataFrame`` constructor.
        """
        if isinstance(trjs, dict):
            # Add id column. ``assign`` returns a new frame, so the caller's
            # dataframes are not modified.
            labelled = [df.assign(id=name) for name, df in trjs.items()]
            trjs = pd.concat(labelled)
        super().__init__(trjs, **kwargs)

        if id_col:
            self._id_col = id_col
        elif getattr(self, "_id_col", None) is None:
            # Nothing usable was inherited from ``trjs`` (attribute missing or
            # copied over as None): fall back to the default.
            self._id_col = "id"

    @property
    def _constructor(self):
        """Class pandas uses when an operation produces a new frame."""
        return TrajaCollection

    def _copy_attrs(self, df):
        """Copy every metadata attribute of ``self`` onto ``df`` (``None`` if unset)."""
        for attr in self._metadata:
            df.__dict__[attr] = getattr(self, attr, None)

    # def __copy__(self):
    #     return TrajaCollection(self.trjs).__dict__.update(self.__dict__)

    def __repr__(self):
        """Return the dataframe representation prefixed with ``TrajaCollection:``."""
        return "TrajaCollection:\n" + super().__repr__()

    # def __add__(self, other):
    #     trjs = self.trjs.append(other, ignore_index=True)
    #     return TrajaCollection(trjs, id_col=self._id_col)

    def plot(self, colors=None, **kwargs):
        """Plot collection of trajectories with colors assigned to each id.

        Args:
            colors: Passed through to ``traja.plotting.plot_collection``.
            **kwargs: Passed through to ``traja.plotting.plot_collection``.

        Returns:
            Whatever ``traja.plotting.plot_collection`` returns.

        >>> trjs = {ind: traja.generate(seed=ind) for ind in range(3)} # doctest: +SKIP
        >>> coll = traja.TrajaCollection(trjs) # doctest: +SKIP
        >>> coll.plot() # doctest: +SKIP

        """
        return traja.plotting.plot_collection(
            self, self._id_col, colors=colors, **kwargs
        )

    def apply_all(self, method, **kwargs):
        """Applies method to all trajectories

        Args:
            method: Callable applied to the sub-frame of each trajectory
                (rows grouped by the id column).
            **kwargs: Extra keyword arguments forwarded to ``method``.

        Returns:
            dataframe or series

        >>> trjs = {ind: traja.generate(seed=ind) for ind in range(3)} # doctest: +SKIP
        >>> coll = traja.TrajaCollection(trjs) # doctest: +SKIP
        >>> angles = coll.apply_all(traja.calc_angle) # doctest: +SKIP

        """
        # GroupBy.apply passes additional keyword arguments on to ``method``;
        # previously they were accepted here but silently dropped.
        return self.groupby(by=self._id_col).apply(method, **kwargs)
class StaticObject(object):
    """Stub for an object described by an optional position and bounding box.

    NOTE(review): the constructor currently accepts its arguments but does not
    store or use them; the class has no behavior yet.
    """

    def __init__(
        self,
        x: Optional[float] = None,
        y: Optional[float] = None,
        bounding_box: Optional[Tuple[float, ...]] = None,
    ):
        """Accept the object's description; nothing is stored yet.

        Args:
            x: Optional x coordinate.
            y: Optional y coordinate.
            bounding_box: Optional tuple of floats. NOTE(review): the expected
                length/ordering is not defined anywhere in this class.
        """