Source code for traja.frame

import logging
from typing import Optional, Union, Tuple
import warnings

import numpy as np
import pandas as pd
from pandas import DataFrame
from pandas.api.types import is_numeric_dtype

import traja

logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)


class TrajaDataFrame(pd.DataFrame):
    """A TrajaDataFrame object is a subclass of pandas :class:`<~pandas.dataframe.DataFrame>`.

    Args:
      args: Typical arguments for pandas.DataFrame.

    Returns:
      traja.TrajaDataFrame -- TrajaDataFrame constructor.

        >>> traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]}) # doctest: +SKIP
           x  y
        0  0  1
        1  1  2
        2  2  3

    """

    _metadata = [
        "xlim",
        "ylim",
        "spatial_units",
        "xlabel",
        "ylabel",
        "title",
        "fps",
        "time_units",
        "time_col",
        "id",
    ]

    def __init__(self, *args, **kwargs):
        # Allow setting metadata from constructor
        traja_kwargs = dict()
        for key in list(kwargs.keys()):
            for name in self._metadata:
                if key == name:
                    traja_kwargs[key] = kwargs.pop(key)
        super(TrajaDataFrame, self).__init__(*args, **kwargs)
        if len(args) == 1 and isinstance(args[0], TrajaDataFrame):
            args[0]._copy_attrs(self)
        for name, value in traja_kwargs.items():
            self.__dict__[name] = value
        
        # Initialize 
        self._convex_hull = None

        # Initialize metadata like 'fps','spatial_units', etc.
        self._init_metadata()
    
    @property
    def _constructor(self):
        return TrajaDataFrame

    def _copy_attrs(self, df):
        for attr in self._metadata:
            df.__dict__[attr] = getattr(self, attr, None)

    def __finalize__(self, other, method=None, **kwargs):
        """propagate metadata from other to self"""
        # merge operation: using metadata of the left object
        if method == "merge":
            for name in self._metadata:
                object.__setattr__(self, name, getattr(other.left, name, None))
        # concat operation: using metadata of the first object
        elif method == "concat":
            for name in self._metadata:
                object.__setattr__(self, name, getattr(other.objs[0], name, None))
        else:
            for name in self._metadata:
                object.__setattr__(self, name, getattr(other, name, None))
        return self

    # def __getitem__(self, key):
    #     """
    #       If result is a DataFrame with a x or X column, return a
    #       TrajaDataFrame.
    #       """
    #     result = super(TrajaDataFrame, self).__getitem__(key)
    #     if isinstance(result, DataFrame) and "x" == result or "X" == result:
    #         result.__class__ = TrajaDataFrame
    #     elif isinstance(result, DataFrame):
    #         result.__class__ = DataFrame
    #     return result

    def _init_metadata(self):
        defaults = dict(fps=None, spatial_units="m", time_units="s")
        for name, value in defaults.items():
            if name not in self.__dict__:
                self.__dict__[name] = value

    def _get_time_col(self):
        time_cols = [col for col in self if "time" in col.lower()]
        if time_cols:
            time_col = time_cols[0]
            if is_numeric_dtype(self[time_col]):
                return time_col
        else:
            return None

    @classmethod
    def from_xy(cls, xy: np.ndarray):
        """Convenience function for initializing :class:`~traja.frame.TrajaDataFrame` with x,y coordinates.

        Args:
            xy (:class:`numpy.ndarray`): x,y coordinates

        Returns:
            traj_df (:class:`~traja.frame.TrajaDataFrame`): Trajectory as dataframe

        .. doctest::

            >>> import numpy as np
            >>> xy = np.array([[0,1],[1,2],[2,3]])
            >>> traja.from_xy(xy)
               x  y
            0  0  1
            1  1  2
            2  2  3

        """
        df = cls.from_records(xy, columns=["x", "y"])
        return df

    def set(self, key, value):
        """Set metadata."""
        self.__dict__[key] = value

    def __setattr__(self, name: str, value) -> None:
        """Override method for pandas.core.generic method __setattr__

        Allows for setting attributes to dataframe without warning.
        """
        try:
            object.__getattribute__(self, name)
            return object.__setattr__(self, name, value)
        except AttributeError:
            pass
        if name in self._internal_names_set:
            object.__setattr__(self, name, value)
        elif name in self._metadata:
            object.__setattr__(self, name, value)
        else:
            try:
                existing = getattr(self, name)
                if isinstance(existing, type(self.index)):
                    object.__setattr__(self, name, value)
                elif name in self._info_axis:
                    self[name] = value
                else:
                    object.__setattr__(self, name, value)
            except (AttributeError, TypeError):
                object.__setattr__(self, name, value)

    @property
    def center(self):
        """Return the center point of this trajectory."""
        x = self.x
        y = self.y
        return float(x.mean()), float(y.mean())
    
    @property
    def convex_hull(self):
        """Property of TrajaDataFrame class representing
        bounds for convex area enclosing trajectory points.

        """
        # Calculate if it doesn't exist
        if self._convex_hull is None:            
            xy_arr = self.traja.xy
            point_arr = traja.trajectory.calc_convex_hull(xy_arr)
            self._convex_hull = point_arr
        return self._convex_hull

    @convex_hull.setter
    def convex_hull(self, values):
        """Set convex_hull property of TrajaDataFrame.

        Returns:
            np.array, calculated coordinates of convex hull boundary
        """
        if values is not None and not values.shape[1] == 2:
            raise Exception(
                "XY coordinates must be in separate columns "
                "for convex hull calculation."
            )
        elif values is None:
            self._convex_hull = np.array([])
        else:
            point_arr = traja.trajectory.calc_convex_hull(values)
            self._convex_hull = point_arr

    @convex_hull.deleter
    def convex_hull(self):
        self._convex_hull = None


def tocontainer(func):
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        return TrajaCollection(result)

    return wrapper


[docs]class TrajaCollection(TrajaDataFrame): """Collection of trajectories.""" _metadata = [ "xlim", "ylim", "spatial_units", "xlabel", "ylabel", "title", "fps", "time_units", "time_col", "_id_col", ] def __init__( self, trjs: Union[TrajaDataFrame, pd.DataFrame, dict], id_col: Optional[str] = None, **kwargs, ): """Initialize with trajectories with x, y, and time columns. Args:self. trjs id_col (str) - Default is "id" """ # Add id column if isinstance(trjs, dict): _trjs = [] for name, df in trjs.items(): df["id"] = name _trjs.append(df) super(TrajaCollection, self).__init__(pd.concat(_trjs), **kwargs) elif isinstance(trjs, (TrajaDataFrame, DataFrame)): super(TrajaCollection, self).__init__(trjs, **kwargs) else: super(TrajaCollection, self).__init__(trjs, **kwargs) if id_col: self._id_col = id_col elif hasattr(self, "_id_col"): self._id_col = self._id_col else: self._id_col = "id" # default @property def _constructor(self): return TrajaCollection def _copy_attrs(self, df): for attr in self._metadata: df.__dict__[attr] = getattr(self, attr, None) # def __copy__(self): # return TrajaCollection(self.trjs).__dict__.update(self.__dict__) def __repr__(self): return "TrajaCollection:\n" + super(TrajaCollection, self).__repr__() # def __add__(self, other): # trjs = self.trjs.append(other, ignore_index=True) # return TrajaCollection(trjs, id_col=self._id_col)
[docs] def plot(self, colors=None, **kwargs): """Plot collection of trajectories with colors assigned to each id. >>> trjs = {ind: traja.generate(seed=ind) for ind in range(3)} # doctest: +SKIP >>> coll = traja.TrajaCollection(trjs) # doctest: +SKIP >>> coll.plot() # doctest: +SKIP """ return traja.plotting.plot_collection( self, self._id_col, colors=colors, **kwargs )
[docs] def apply_all(self, method, **kwargs): """Applies method to all trajectories Args: method Returns: dataframe or series >>> trjs = {ind: traja.generate(seed=ind) for ind in range(3)} # doctest: +SKIP >>> coll = traja.TrajaCollection(trjs) # doctest: +SKIP >>> angles = coll.apply_all(traja.calc_angle) # doctest: +SKIP """ return self.groupby(by=self._id_col).apply(method)
class StaticObject(object): def __init__( self, x: Optional[float] = None, y: Optional[float] = None, bounding_box: Tuple[float] = None, ): ... pass