Source code for traja.trajectory

import logging
import math
from collections import OrderedDict
from typing import Callable, Optional, Tuple, Union

import numpy as np
import pandas as pd
from pandas.core.dtypes.common import (is_datetime64_any_dtype,
                                       is_timedelta64_dtype)
from scipy import signal
from scipy.spatial.distance import directed_hausdorff, euclidean

import traja
from traja import TrajaDataFrame
from traja.core import is_datetime_or_timedelta_dtype

__all__ = [
    "_bins_to_tuple",
    "_get_time_col",
    "_get_xylim",
    "_grid_coords1D",
    "_has_cols",
    "_rediscretize_points",
    "_resample_time",
    "calc_angle",
    "calc_convex_hull",
    "calc_derivatives",
    "calc_displacement",
    "calc_heading",
    "calc_laterality",
    "calc_turn_angle",
    "calc_flow_angles",
    "cartesian_to_polar",
    "coords_to_flow",
    "determine_colinearity",
    "distance_between",
    "distance",
    "euclidean",
    "expected_sq_displacement",
    "fill_in_traj",
    "from_xy",
    "from_latlon",
    "inside",
    "generate",
    "get_derivatives",
    "grid_coordinates",
    "latlon_to_xy",
    "length",
    "polar_to_z",
    "rediscretize_points",
    "batch_process",
    "train_test_split",
    "resample_time",
    "return_angle_to_point",
    "rotate",
    "smooth_sg",
    "speed_intervals",
    "step_lengths",
    "to_shapely",
    "traj_from_coords",
    "transition_matrix",
    "transitions",
]

logger = logging.getLogger("traja")


[docs] def smooth_sg(trj: TrajaDataFrame, w: Optional[int] = None, p: int = 3) -> TrajaDataFrame: """Returns ``DataFrame`` of trajectory after Savitzky-Golay filtering. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory w (int): window size (Default value = None) p (int): polynomial order (Default value = 3) Returns: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory .. doctest:: >> df = traja.generate() >> traja.smooth_sg(df, w=101).head() x y time 0 -11.194803 12.312742 0.00 1 -10.236337 10.613720 0.02 2 -9.309282 8.954952 0.04 3 -8.412910 7.335925 0.06 4 -7.546492 5.756128 0.08 """ if w is None: w = p + 3 - p % 2 if w % 2 != 1: raise ValueError(f"Invalid smoothing parameter w ({w}): n must be odd") _trj = trj.copy() _trj.x = signal.savgol_filter(_trj.x, window_length=w, polyorder=p, axis=0) _trj.y = signal.savgol_filter(_trj.y, window_length=w, polyorder=p, axis=0) _trj = fill_in_traj(_trj) return _trj
def apply_all(trj: TrajaDataFrame, method: Callable, id_col: str, **kwargs): """Applies method to all trajectories""" return trj.groupby(by=id_col).apply(method, **kwargs) def step_lengths(trj: TrajaDataFrame) -> pd.Series: """Length of the steps of ``trj``. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: pd.Series: Step lengths """ displacement = traja.trajectory.calc_displacement(trj) return displacement def polar_to_z(r: float, theta: float) -> complex: """Converts polar coordinates ``r`` and ``theta`` to complex number ``z``. Args: r (float): step size theta (float): angle Returns: z (complex): complex number z """ return r * np.exp(1j * theta) def cartesian_to_polar(xy: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Convert :class:`numpy.ndarray` ``xy`` to polar coordinates ``r`` and ``theta``. Args: xy (:class:`numpy.ndarray`): x,y coordinates Returns: r, theta (tuple of float): step-length and angle """ assert xy.ndim == 2, f"Dimensions are {xy.ndim}, expecting 2" x, y = np.split(xy, [-1], axis=1) x, y = np.squeeze(x), np.squeeze(y) r = np.sqrt(x * x + y * y) theta = np.arctan2(y, x) return r, theta
[docs] def distance(trj: TrajaDataFrame) -> float: """Calculates the distance from start to end of trajectory, also called net distance, displacement, or bee-line from start to finish. Supports both 2D and 3D trajectories. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: distance (float) .. doctest:: >> df = traja.generate() >> traja.distance(df) 117.01507823153617 """ # Support 3D trajectories if z column exists if "z" in trj.columns: start = trj.iloc[0][["x", "y", "z"]].values end = trj.iloc[-1][["x", "y", "z"]].values else: start = trj.iloc[0][["x", "y"]].values end = trj.iloc[-1][["x", "y"]].values return np.linalg.norm(end - start)
[docs] def length(trj: TrajaDataFrame) -> float: """Calculates the cumulative length of a trajectory. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: length (float) .. doctest:: >> df = traja.generate() >> traja.length(df) 2001.142339606066 """ displacement = trj.traja.calc_displacement() return displacement.sum()
def latlon_to_xy(lat: np.ndarray, lon: np.ndarray, origin: Optional[Tuple[float, float]] = None) -> Tuple[np.ndarray, np.ndarray]: """Convert latitude/longitude coordinates to local x, y coordinates in meters. Uses Haversine formula for small distances. For more accurate projection over larger distances, consider using a proper map projection library like pyproj. Args: lat: Array of latitudes in decimal degrees lon: Array of longitudes in decimal degrees origin: (lat, lon) tuple for origin point. If None, uses first point. Returns: (x, y): Tuple of numpy arrays in meters .. note:: Assumes Earth radius = 6371000 meters. Accuracy decreases for large distances (>100km). .. doctest:: >>> import numpy as np >>> lat = np.array([40.7128, 40.7138, 40.7148]) >>> lon = np.array([-74.0060, -74.0050, -74.0040]) >>> x, y = traja.latlon_to_xy(lat, lon) >>> len(x) == len(lat) True """ EARTH_RADIUS = 6371000 # meters lat = np.asarray(lat) lon = np.asarray(lon) if origin is None: origin = (lat[0], lon[0]) lat0, lon0 = origin # Convert to radians lat_rad = np.radians(lat) lon_rad = np.radians(lon) lat0_rad = np.radians(lat0) lon0_rad = np.radians(lon0) # Calculate x, y in meters x = EARTH_RADIUS * (lon_rad - lon0_rad) * np.cos(lat0_rad) y = EARTH_RADIUS * (lat_rad - lat0_rad) return x, y def from_latlon(lat: Union[list, np.ndarray], lon: Union[list, np.ndarray], origin: Optional[Tuple[float, float]] = None, **kwargs) -> TrajaDataFrame: """Create TrajaDataFrame from GPS latitude/longitude coordinates. Args: lat: Latitude coordinates in decimal degrees lon: Longitude coordinates in decimal degrees origin: (lat, lon) tuple for origin point. If None, uses first point. **kwargs: Additional arguments passed to TrajaDataFrame constructor Returns: TrajaDataFrame with x, y coordinates in meters .. doctest:: >>> import numpy as np >>> lat = [40.7128, 40.7138, 40.7148] >>> lon = [-74.0060, -74.0050, -74.0040] >>> df = traja.from_latlon(lat, lon) >>> 'x' in df.columns and 'y' in df.columns True >>> len(df) == 3 True """ x, y = latlon_to_xy(np.array(lat), np.array(lon), origin) df = TrajaDataFrame({'x': x, 'y': y, 'lat': lat, 'lon': lon}, **kwargs) return df def expected_sq_displacement( trj: TrajaDataFrame, n: int = 0, eqn1: bool = True ) -> float: """Expected displacement. .. note:: This method is experimental and needs testing. """ sl = traja.step_lengths(trj) ta = traja.calc_angle(trj) l1 = np.mean(sl) l2 = np.mean(sl**2) c = np.mean(np.cos(ta)) s = np.mean(np.sin(ta)) s2 = s**2 if eqn1: # Eqn 1 alpha = np.arctan2(s, c) gamma = ((1 - c) ** 2 - s2) * np.cos((n + 1) * alpha) - 2 * s * ( 1 - c ) * np.sin((n + 1) * alpha) esd = ( n * l2 + 2 * l1**2 * ((c - c**2 - s2) * n - c) / ((1 - c) ** 2 + s2) + 2 * l1**2 * ((2 * s2 + (c + s2) ** ((n + 1) / 2)) / ((1 - c) ** 2 + s2) ** 2) * gamma ) return abs(esd) else: logger.info("This method is experimental and requires testing.") # Eqn 2 esd = n * l2 + 2 * l1**2 * c / (1 - c) * (n - (1 - c**n) / (1 - c)) return esd def traj_from_coords( track: Union[np.ndarray, pd.DataFrame], x_col=1, y_col=2, time_col: Optional[str] = None, fps: Union[float, int] = 4, spatial_units: str = "m", time_units: str = "s", ) -> TrajaDataFrame: """Create TrajaDataFrame from coordinates. Args: track: N x 2 numpy array or pandas DataFrame with x and y columns x_col: column index or x column name y_col: column index or y column name time_col: name of time column fps: Frames per seconds spatial_units: default m, optional time_units: default s, optional Returns: trj: TrajaDataFrame .. doctest:: >> xy = np.random.random((1000, 2)) >> trj = traja.traj_from_coord(xy) >> assert trj.shape == (1000,4) # columns x, y, time, dt """ if not isinstance(track, traja.TrajaDataFrame): if isinstance(track, np.ndarray) and track.shape[1] == 2: trj = traja.from_xy(track) elif isinstance(track, pd.DataFrame): trj = traja.TrajaDataFrame(track) else: trj = track trj.traja.spatial_units = spatial_units trj.traja.time_units = time_units def rename(col, name, trj): if isinstance(col, int): trj.rename(columns={col: name}, inplace=True) else: if col not in trj: raise KeyError(f"Missing column {col}") trj.rename(columns={col: name}, inplace=True) return trj # Ensure column names are as expected trj = rename(x_col, "x", trj) trj = rename(y_col, "y", trj) if time_col is not None: trj = rename(time_col, "time", trj) # Allocate times if they aren't already known if "time" not in trj: if fps is None: raise ValueError( ( "Cannot create a trajectory without times: either fps or a time column must be specified" ) ) # Assign times to each frame, starting at 0 trj["time"] = pd.Series(np.arange(0, len(trj)) / fps) # Get displacement time for each coordinate, with the first point at time 0 trj["dt"] = trj.time - trj.time.iloc[0] return trj def distance_between(A: traja.TrajaDataFrame, B: traja.TrajaDataFrame, method="dtw"): """Returns distance between two trajectories. Args: A (:class:`~traja.frame.TrajaDataFrame`) : Trajectory 1 B (:class:`~traja.frame.TrajaDataFrame`) : Trajectory 2 method (str): ``dtw`` for dynamic time warping, ``hausdorff`` for Hausdorff Returns: distance (float): Distance """ if method == "hausdorff": dist0 = directed_hausdorff(A, B)[0] dist1 = directed_hausdorff(B, A)[0] symmetric_dist = max(dist0, dist1) return symmetric_dist elif method == "dtw": try: from fastdtw import fastdtw except ImportError: raise ImportError( """ Missing optional dependency 'fastdtw'. Install fastdtw for dynamic time warping distance with pip install fastdtw. """ ) distance, path = fastdtw(A, B, dist=euclidean) return distance def to_shapely(trj): """Returns shapely object for area, bounds, etc. functions. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: shapely.geometry.linestring.LineString -- Shapely shape. .. doctest:: >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]}) >>> shape = traja.to_shapely(df) >>> shape.is_closed False """ from shapely.geometry import shape coords = trj[["x", "y"]].values tracks_obj = {"type": "LineString", "coordinates": coords} tracks_shape = shape(tracks_obj) return tracks_shape def transition_matrix(grid_indices1D: np.ndarray): """Returns ``np.ndarray`` of Markov transition probability matrix for grid cell transitions. Args: grid_indices1D (:class:`np.ndarray`) Returns: M (:class:`numpy.ndarray`) """ if not isinstance(grid_indices1D, np.ndarray): raise TypeError(f"Expected np.ndarray, got {type(grid_indices1D)}") n = 1 + max(grid_indices1D.flatten()) # number of states M = [[0] * n for _ in range(n)] for i, j in zip(grid_indices1D, grid_indices1D[1:]): M[i][j] += 1 # Convert to probabilities for row in M: s = sum(row) if s > 0: row[:] = [f / s for f in row] return np.array(M) def _bins_to_tuple(trj, bins: Union[int, Tuple[int, int]] = 10): """Returns tuple of x, y bins Args: trj: Trajectory bins: The bin specification: If int, the number of bins for the smallest of the two dimensions such that (min(nx,ny)=bins). If [int, int], the number of bins in each dimension (nx, ny = bins). Returns: bins (Sequence[int,int]): Bins (nx, ny) """ if bins is None: bins = 10 if isinstance(bins, int): # make aspect equal xlim, ylim = _get_xylim(trj) aspect = (ylim[1] - ylim[0]) / (xlim[1] - xlim[0]) if aspect >= 1: bins = (bins, int(bins * aspect)) else: bins = (int(bins / aspect), bins) assert len(bins) == 2, f"bins should be length 2 but is {len(bins)}" return bins def calc_laterality( trj: TrajaDataFrame, dist_thresh: float, angle_thresh: float = 30, ): """Calculate laterality of a trajectory. Laterality is the preference for left or right turning. It is calculated with the number of left and right turns. Args: trj: Trajectory dist_thresh: distance for a step to count as a turn angle_thresh: angle threshold (from angle to 90 degrees) Returns: right_turns (int) left_turns (int) """ # get turn angle with regard to x axis if "turn_angle" not in trj.columns: turn_angle = calc_turn_angle(trj) else: turn_angle = trj.turn_agle distance = step_lengths(trj) distance_mask = distance > dist_thresh angle_mask = ((turn_angle > angle_thresh) & (turn_angle < 90)) | ( (turn_angle < -angle_thresh) & (turn_angle > -90) ) turns = turn_angle[distance_mask & angle_mask].dropna() left_turns = turns[turn_angle > 0].shape[0] right_turns = turns[turn_angle < 0].shape[0] return right_turns, left_turns def calc_flow_angles(grid_indices: np.ndarray): """Calculate average flow between grid indices.""" bins = (grid_indices[:, 0].max(), grid_indices[:, 1].max()) # Initialize with empty lists instead of np.ndarray for better append performance M = [[[] for _ in range(bins[0])] for _ in range(bins[1])] for i, j in zip(grid_indices, grid_indices[1:]): # Account for fact that grid indices uses 1-base indexing ix = i[0] - 1 iy = i[1] - 1 jx = j[0] - 1 jy = j[1] - 1 if np.array_equal(i, j): angle = None elif ix == jx and iy > jy: # move towards y origin (down by default) angle = 3 * np.pi / 2 elif ix == jx and iy < jy: # move towards y origin (up by default) angle = np.pi / 2 elif ix < jx and iy == jy: # move right angle = 0 elif ix > jx and iy == jy: # move left angle = np.pi elif ix > jx and iy > jy: # move towards y origin (top left) angle = 3 * np.pi / 4 elif ix > jx and iy < jy: # move away from y origin (bottom left) angle = 5 * np.pi / 4 elif ix < jx and iy < jy: # move away from y origin (bottom right) angle = 7 * np.pi / 4 elif ix < jx and iy > jy: # move towards y origin (top right) angle = np.pi / 4 if angle is not None: M[iy][ix].append(angle) # Convert lists to numpy arrays M = np.array([[np.array(cell) if cell else None for cell in row] for row in M], dtype=object) U = np.ones_like(M) # x component of arrow V = np.empty_like(M) # y component of arrow for i, row in enumerate(M): for j, angles in enumerate(row): x = y = 0 # average_angle = None if angles is not None and len(angles) > 1: for angle in angles: if angle is None: continue x += np.cos(angle) y += np.sin(angle) # average_angle = np.arctan2(y, x) U[i, j] = x V[i, j] = y else: U[i, j] = 0 V[i, j] = 0 return U.astype(float), V.astype(float) def _grid_coords1D(grid_indices: np.ndarray): """Convert 2D grid indices to 1D indices.""" if isinstance(grid_indices, pd.DataFrame): grid_indices = grid_indices.values grid_indices1D = [] nr_cols = int(grid_indices[:, 0].max()) + 1 for coord in grid_indices: grid_indices1D.append( coord[1] * nr_cols + coord[0] ) # nr_rows * col_length + nr_cols return np.array(grid_indices1D, dtype=int) def transitions(trj: TrajaDataFrame, **kwargs): """Get first-order Markov model for transitions between grid cells. Args: trj (trajectory) kwargs: kwargs to :func:`traja.grid_coordinates` """ if "xbin" not in trj.columns or "ybin" not in trj.columns: grid_indices = grid_coordinates(trj, **kwargs) else: grid_indices = trj[["xbin", "ybin"]] # Drop nan for converting to int grid_indices.dropna(subset=["xbin", "ybin"], inplace=True) grid_indices1D = _grid_coords1D(grid_indices) transitions_matrix = transition_matrix(grid_indices1D) return transitions_matrix def grid_coordinates( trj: TrajaDataFrame, bins: Union[int, tuple] = None, xlim: tuple = None, ylim: tuple = None, assign: bool = False, ): """Returns ``DataFrame`` of trajectory discretized into 2D lattice grid coordinates. Args: trj (~`traja.frame.TrajaDataFrame`): Trajectory bins (tuple or int) xlim (tuple) ylim (tuple) assign (bool): Return updated original dataframe Returns: trj (~`traja.frame.TrajaDataFrame`): Trajectory is assign=True otherwise pd.DataFrame """ # Drop nan for converting to int trj.dropna(subset=["x", "y"], inplace=True) xmin = trj.x.min() if xlim is None else xlim[0] xmax = trj.x.max() if xlim is None else xlim[1] ymin = trj.y.min() if ylim is None else ylim[0] ymax = trj.y.max() if ylim is None else ylim[1] bins = _bins_to_tuple(trj, bins) if not xlim: xbin = pd.cut(trj.x, bins[0], labels=False) else: xmin, xmax = xlim xbinarray = np.linspace(xmin, xmax, bins[0]) xbin = np.digitize(trj.x, xbinarray) if not ylim: ybin = pd.cut(trj.y, bins[1], labels=False) else: ymin, ymax = ylim ybinarray = np.linspace(ymin, ymax, bins[1]) ybin = np.digitize(trj.y, ybinarray) if assign: trj["xbin"] = xbin trj["ybin"] = ybin return trj return pd.DataFrame({"xbin": xbin, "ybin": ybin})
[docs] def generate( n: int = 1000, random: bool = True, step_length: int = 2, angular_error_sd: float = 0.5, angular_error_dist: Callable = None, linear_error_sd: float = 0.2, linear_error_dist: Callable = None, fps: float = 50, spatial_units: str = "m", seed: int = None, convex_hull: bool = False, **kwargs, ): """Generates a trajectory. If ``random`` is ``True``, the trajectory will be a correlated random walk/idiothetic directed walk (Kareiva & Shigesada, 1983), corresponding to an animal navigating without a compass (Cheung, Zhang, Stricker, & Srinivasan, 2008). If ``random`` is ``False``, it will be(np.ndarray) a directed walk/allothetic directed walk/oriented path, corresponding to an animal navigating with a compass (Cheung, Zhang, Stricker, & Srinivasan, 2007, 2008). By default, for both random and directed walks, errors are normally distributed, unbiased, and independent of each other, so are **simple directed walks** in the terminology of Cheung, Zhang, Stricker, & Srinivasan, (2008). This behaviour may be modified by specifying alternative values for the ``angular_error_dist`` and/or ``linear_error_dist`` parameters. The initial angle (for a random walk) or the intended direction (for a directed walk) is ``0`` radians. The starting position is ``(0, 0)``. Args: n (int): (Default value = 1000) random (bool): (Default value = True) step_length: (Default value = 2) angular_error_sd (float): (Default value = 0.5) angular_error_dist (Callable): (Default value = None) linear_error_sd (float): (Default value = 0.2) linear_error_dist (Callable): (Default value = None) fps (float): (Default value = 50) convex_hull (bool): (Default value = False) spatial_units: (Default value = 'm') **kwargs: Additional arguments Returns: trj (:class:`traja.frame.TrajaDataFrame`): Trajectory .. note:: Based on Jim McLean's `trajr <https://github.com/JimMcL/trajr>`_, ported to Python. **Reference**: McLean, D. J., & Skowron Volponi, M. A. (2018). trajr: An R package for characterisation of animal trajectories. Ethology, 124(6), 440-448. https://doi.org/10.1111/eth.12739. """ if seed is None: np.random.seed(0) else: np.random.seed(seed) if angular_error_dist is None: angular_error_dist = np.random.normal( loc=0.0, scale=angular_error_sd, size=n - 1 ) if linear_error_dist is None: linear_error_dist = np.random.normal(loc=0.0, scale=linear_error_sd, size=n - 1) angular_errors = angular_error_dist linear_errors = linear_error_dist step_lengths = step_length + linear_errors # Don't allow negative lengths step_lengths[step_lengths < 0] = 0 steps = polar_to_z(step_lengths, angular_errors) if random: # Accumulate angular errors coords = np.zeros(n, dtype=complex) angle = 0 for i in range(n - 1): angle += angular_errors[i] length = step_length + linear_errors[i] coords[i + 1] = coords[i] + polar_to_z(r=length, theta=angle) else: coords = np.concatenate(([complex(0)], np.cumsum(steps))) x = coords.real y = coords.imag df = traja.TrajaDataFrame(data={"x": x, "y": y}) if fps in (0, None): raise ValueError("fps must be greater than 0") df.fps = fps time = df.index / fps df["time"] = time df.spatial_units = spatial_units for key, value in kwargs.items(): df.__dict__[key] = value # Update metavars metavars = dict(angular_error_sd=angular_error_sd, linear_error_sd=linear_error_sd) df.__dict__.update(metavars) # Attribute convex hull to dataframe if convex_hull: df.convex_hull = df[["x", "y"]].values else: del df.convex_hull return df
def _resample_time( trj: TrajaDataFrame, step_time: Union[float, int, str], errors="coerce" ): if not is_datetime_or_timedelta_dtype(trj.index): raise TypeError(f"{trj.index.dtype} is not datetime or timedelta.") try: df = trj.resample(step_time).interpolate(method="spline", order=2) except ValueError as e: if len(e.args) > 0 and "cannot reindex from a duplicate axis" in e.args[0]: if errors == "coerce": logger.warning("Duplicate time indices, keeping first") trj = trj.loc[~trj.index.duplicated(keep="first")] df = ( trj.resample(step_time) .bfill(limit=1) .interpolate(method="spline", order=2) ) else: logger.error("Error: duplicate time indices") raise ValueError("Duplicate values in indices") return df
[docs] def resample_time(trj: TrajaDataFrame, step_time: str, new_fps: Optional[bool] = None): """Returns a ``TrajaDataFrame`` resampled to consistent `step_time` intervals. ``step_time`` should be expressed as a number-time unit combination, eg "2S" for 2 seconds and “2100L” for 2100 milliseconds. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory step_time (str): step time interval / offset string (eg, '2S' (seconds), '50L' (milliseconds), '50N' (nanoseconds)) new_fps (bool, optional): new fps Results: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory .. doctest:: >>> from traja import generate, resample_time >>> df = generate() >>> resampled = resample_time(df, '50L') # 50 milliseconds >>> resampled.head() # doctest: +NORMALIZE_WHITESPACE x y time 1970-01-01 00:00:00.000 0.000000 0.000000 1970-01-01 00:00:00.050 0.919113 4.022971 1970-01-01 00:00:00.100 -1.298510 5.423373 1970-01-01 00:00:00.150 -6.057524 4.708803 1970-01-01 00:00:00.200 -10.347759 2.108385 """ time_col = _get_time_col(trj) if time_col == "index" and is_datetime64_any_dtype(trj.index): _trj = _resample_time(trj, step_time) elif time_col == "index" and is_timedelta64_dtype(trj.index): trj.index = pd.to_datetime(trj.index) _trj = _resample_time(trj, step_time) _trj.index = pd.to_timedelta(_trj.index) elif time_col: if isinstance(step_time, str): try: if "." in step_time: raise NotImplementedError( """Fractional step time not implemented. For milliseconds/microseconds/nanoseconds use: L milliseonds U microseconds N nanoseconds eg, step_time='2100L'""" ) except Exception: raise NotImplementedError( f"Inferring from time format {step_time} not yet implemented." ) _trj = trj.set_index(time_col) time_units = _trj.__dict__.get("time_units", "s") _trj.index = pd.to_datetime(_trj.index, unit=time_units) _trj = _resample_time(_trj, step_time) else: raise NotImplementedError( f"Time column ({time_col}) not of expected dataset type." ) return _trj
def rotate(df, angle: Union[float, int] = 0, origin: tuple = None): """Returns a ``TrajaDataFrame`` Rotate a trajectory `angle` in radians. Args: trj (:class:`traja.frame.TrajaDataFrame`): Trajectory angle (float): angle in radians origin (tuple. optional): rotate around point (x,y) Returns: trj (:class:`traja.frame.TrajaDataFrame`): Trajectory .. note:: Based on Lyle Scott's `implementation <https://gist.github.com/LyleScott/e36e08bfb23b1f87af68c9051f985302>`_. """ trj = df.copy() # Calculate current orientation if isinstance(trj, traja.TrajaDataFrame): xy = df.traja.xy elif isinstance(trj, pd.DataFrame): trj = df[["x", "y"]] x, y = np.split(xy, [-1], axis=1) if origin is None: # Assume middle of x and y is origin origin = ((x.max() - x.min()) / 2, (y.max() - y.min()) / 2) offset_x, offset_y = origin new_coords = [] for x, y in xy: adjusted_x = x - offset_x adjusted_y = y - offset_y cos_rad = math.cos(angle) sin_rad = math.sin(angle) qx = offset_x + cos_rad * adjusted_x + sin_rad * adjusted_y qy = offset_y + -sin_rad * adjusted_x + cos_rad * adjusted_y new_coords.append((qx, qy)) new_xy = np.array(new_coords) x, y = np.split(new_xy, [-1], axis=1) trj["x"] = x trj["y"] = y return trj def rediscretize_points(trj: TrajaDataFrame, R: Union[float, int], time_out=False): """Returns a ``TrajaDataFrame`` rediscretized to a constant step length `R`. Args: trj (:class:`traja.frame.TrajaDataFrame`): Trajectory R (float): Rediscretized step length (eg, 0.02) time_out (bool): Include time corresponding to time intervals in output Returns: rt (:class:`numpy.ndarray`): rediscretized trajectory """ if not isinstance(R, (float, int)): raise TypeError(f"R should be float or int, but is {type(R)}") results = _rediscretize_points(trj, R, time_out) rt = results["rt"] if len(rt) < 2: raise RuntimeError( f"Step length {R} is too large for path (path length {len(trj)})" ) rt = traja.from_xy(rt) if time_out: rt["time"] = results["time"] return rt def _rediscretize_points( trj: TrajaDataFrame, R: Union[float, int], time_out=False ) -> dict: """Helper function for :func:`traja.trajectory.rediscretize`. Args: trj (:class:`traja.frame.TrajaDataFrame`): Trajectory R (float): Rediscretized step length (eg, 0.02) Returns: output (dict): Containing: result (:class:`numpy.ndarray`): Rediscretized coordinates time_vals (optional, list of floats or datetimes): Time points corresponding to result """ # TODO: Implement with complex numbers points = trj[["x", "y"]].dropna().values.astype("float64") n_points = len(points) result = np.empty((128, 2)) p0 = points[0] result[0] = p0 step_nr = 0 candidate_start = 1 # running index of candidate time_vals = [] if time_out: time_col = _get_time_col(trj) time = trj[time_col][0] time_vals.append(time) while candidate_start <= n_points: # Find the first point `curr_ind` for which |points[curr_ind] - p_0| >= R curr_ind = np.NaN for i in range( candidate_start, n_points ): # range of search space for next point d = np.linalg.norm(points[i] - result[step_nr]) if d >= R: curr_ind = i # curr_ind is in [candidate, n_points) if time_out: time = trj[time_col][i] time_vals.append(time) break if np.isnan(curr_ind): # End of path break # The next point may lie on the same segment candidate_start = curr_ind # The next point lies on the segment p[k-1], p[k] curr_result_x = result[step_nr][0] prev_x = points[curr_ind - 1, 0] curr_result_y = result[step_nr][1] prev_y = points[curr_ind - 1, 1] # a = 1 if points[k, 0] <= xk_1 else 0 lambda_ = np.arctan2( points[curr_ind, 1] - prev_y, points[curr_ind, 0] - prev_x ) # angle cos_l = np.cos(lambda_) sin_l = np.sin(lambda_) U = (curr_result_x - prev_x) * cos_l + (curr_result_y - prev_y) * sin_l V = (curr_result_y - prev_y) * cos_l - (curr_result_x - prev_x) * sin_l # Compute distance H between (X_{i+1}, Y_{i+1}) and (x_{k-1}, y_{k-1}) H = U + np.sqrt(abs(R**2 - V**2)) XIp1 = H * cos_l + prev_x YIp1 = H * sin_l + prev_y # Increase array size progressively to make the code run (significantly) faster if len(result) <= step_nr + 1: result = np.concatenate((result, np.empty_like(result))) # Save the point result[step_nr + 1] = np.array([XIp1, YIp1]) step_nr += 1 # Truncate result result = result[: step_nr + 1] output = {"rt": result} if time_out: output["time"] = time_vals return output def _has_cols(trj: TrajaDataFrame, cols: list): """Check if `trj` has `cols`.""" return set(cols).issubset(trj.columns) def calc_turn_angle(trj: TrajaDataFrame): """Return a ``Series`` of floats with turn angles. Args: trj (:class:`traja.frame.TrajaDataFrame`): Trajectory Returns: turn_angle (:class:`~pandas.Series`): Turn angle .. doctest:: >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]}) >>> traja.calc_turn_angle(df) 0 NaN 1 NaN 2 0.0 Name: turn_angle, dtype: float64 """ if "heading" not in trj: heading = calc_heading(trj) else: heading = trj.heading turn_angle = heading.diff().rename("turn_angle") # Correction for 360-degree angle range turn_angle.loc[turn_angle >= 180] -= 360 turn_angle.loc[turn_angle < -180] += 360 return turn_angle
[docs] def calc_angle(trj: TrajaDataFrame, unit: str = "degrees", lag: int = 1): """Returns a ``Series`` with angle between steps as a function of displacement with regard to x axis. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory unit (str): return angle in radians or degrees (Default value: 'degrees') lag (int) : time steps between angle calculation (Default value: 1) Returns: angle (:class:`pandas.Series`): Angle series. """ if not _has_cols(trj, ["displacement"]) or (lag != 1): displacement = calc_displacement(trj, lag) else: displacement = trj.displacement if unit == "degrees": angle = np.rad2deg(np.arccos(np.abs(trj.x.diff(lag)) / displacement)) elif unit == "radians": angle = np.arccos(np.abs(trj.x.diff()) / displacement) else: raise ValueError(f"The unit {unit} is not valid.") angle.unit = unit angle.name = "angle" return angle
[docs] def calc_displacement(trj: TrajaDataFrame, lag=1): """Returns a ``Series`` of ``float`` displacement between consecutive indices. Supports both 2D and 3D trajectories. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory lag (int) : time steps between displacement calculation Returns: displacement (:class:`pandas.Series`): Displacement series. .. doctest:: >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]}) >>> traja.calc_displacement(df) 0 NaN 1 1.414214 2 1.414214 Name: displacement, dtype: float64 """ # Support 3D trajectories if z column exists if "z" in trj.columns: displacement = np.sqrt( np.power(trj.x.shift(lag) - trj.x, 2) + np.power(trj.y.shift(lag) - trj.y, 2) + np.power(trj.z.shift(lag) - trj.z, 2) ) else: displacement = np.sqrt( np.power(trj.x.shift(lag) - trj.x, 2) + np.power(trj.y.shift(lag) - trj.y, 2) ) displacement.name = "displacement" return displacement
def calc_derivatives(trj: TrajaDataFrame): """Returns derivatives ``displacement`` and ``displacement_time`` as DataFrame. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: derivs (:class:`~pandas.DataFrame`): Derivatives. .. doctest:: >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3],'time':[0., 0.2, 0.4]}) >>> traja.calc_derivatives(df) displacement displacement_time 0 NaN 0.0 1 1.414214 0.2 2 1.414214 0.4 """ time_col = _get_time_col(trj) if time_col is None: raise ValueError("Missing time information in trajectory.") if "displacement" not in trj: displacement = calc_displacement(trj) else: displacement = trj.displacement # get cumulative seconds if is_datetime64_any_dtype(trj[time_col]): displacement_time = ( trj[time_col].astype(int).div(10**9).diff().fillna(0).cumsum() ) else: try: displacement_time = trj[time_col].diff().fillna(0).cumsum() except TypeError: raise TypeError( f"Format (example {trj[time_col][0]}) not recognized as datetime" ) # TODO: Create DataFrame directly derivs = pd.DataFrame( OrderedDict(displacement=displacement, displacement_time=displacement_time) ) return derivs
[docs] def calc_heading(trj: TrajaDataFrame): """Calculate trajectory heading. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: heading (:class:`pandas.Series`): heading as a ``Series`` ..doctest:: >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]}) >>> traja.calc_heading(df) 0 NaN 1 45.0 2 45.0 Name: heading, dtype: float64 """ if not _has_cols(trj, ["angle"]): angle = calc_angle(trj) else: angle = trj.angle if hasattr(angle, "unit"): if angle.unit == "radians": angle = np.rad2deg(angle) dx = trj.x.diff() dy = trj.y.diff() # Get heading from angle mask = (dx > 0) & (dy >= 0) trj.loc[mask, "heading"] = angle[mask] mask = (dx >= 0) & (dy < 0) trj.loc[mask, "heading"] = -angle[mask] mask = (dx < 0) & (dy <= 0) trj.loc[mask, "heading"] = -(180 - angle[mask]) mask = (dx <= 0) & (dy > 0) trj.loc[mask, "heading"] = 180 - angle[mask] return trj.heading
[docs] def speed_intervals( trj: TrajaDataFrame, faster_than: float = None, slower_than: float = None ) -> pd.DataFrame: """Calculate speed time intervals. Returns a dictionary of time intervals where speed is slower and/or faster than specified values. Args: faster_than (float, optional): Minimum speed threshold. (Default value = None) slower_than (float or int, optional): Maximum speed threshold. (Default value = None) Returns: result (:class:`~pd.DataFrame`) -- time intervals as dataframe .. note:: Implementation ported to Python, heavily inspired by Jim McLean's trajr package. .. doctest:: >> df = traja.generate() >> intervals = traja.speed_intervals(df, faster_than=100) >> intervals.head() start_frame start_time stop_frame stop_time duration 0 1 0.02 3 0.06 0.04 1 4 0.08 8 0.16 0.08 2 10 0.20 11 0.22 0.02 3 12 0.24 15 0.30 0.06 4 17 0.34 18 0.36 0.02 """ derivs = get_derivatives(trj) if faster_than is None and slower_than is None: raise ValueError( "Parameters faster_than and slower_than are both None, at least one must be provided." ) # Calculate trajectory speeds speed = derivs["speed"].values times = derivs["speed_times"].values times[0] = 0.0 flags = np.full(len(speed), 1) if faster_than is not None: flags = flags & (speed > faster_than) if slower_than is not None: flags = flags & (speed < slower_than) changes = np.diff(flags) stop_frames = np.where(changes == -1)[0] start_frames = np.where(changes == 1)[0] # Handle situation where interval begins or ends outside of trajectory if len(start_frames) > 0 or len(stop_frames) > 0: # Assume interval started at beginning of trajectory, since we don't know what happened before that if len(stop_frames) > 0 and ( len(start_frames) == 0 or stop_frames[0] < start_frames[0] ): start_frames = np.concatenate(([1], start_frames)) # Similarly, assume that interval can't extend past end of trajectory if ( len(stop_frames) == 0 or start_frames[len(start_frames) - 1] > stop_frames[len(stop_frames) - 1] ): stop_frames = np.concatenate((stop_frames, [len(speed) - 1])) stop_times = times[stop_frames] start_times = times[start_frames] durations = stop_times - start_times result = traja.TrajaDataFrame( OrderedDict( start_frame=start_frames, start_time=start_times, stop_frame=stop_frames, stop_time=stop_times, duration=durations, ) ) return result
[docs] def get_derivatives(trj: TrajaDataFrame): """Returns derivatives ``displacement``, ``displacement_time``, ``speed``, ``speed_times``, ``acceleration``, ``acceleration_times`` as dictionary. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: derivs (:class:`~pd.DataFrame`) : Derivatives .. doctest:: >> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3],'time':[0.,0.2,0.4]}) >> df.traja.get_derivatives() #doctest: +SKIP displacement displacement_time speed speed_times acceleration acceleration_times 0 NaN 0.0 NaN NaN NaN NaN 1 1.414214 0.2 7.071068 0.2 NaN NaN 2 1.414214 0.4 7.071068 0.4 0.0 0.4 """ if not _has_cols(trj, ["displacement", "displacement_time"]): derivs = calc_derivatives(trj) d = derivs["displacement"] t = derivs["displacement_time"] else: d = trj.displacement t = trj.displacement_time derivs = OrderedDict(displacement=d, displacement_time=t) if is_datetime_or_timedelta_dtype(t): # Convert to float divisible series # TODO: Add support for other time units t = t.dt.total_seconds() v = d[1 : len(d)] / t.diff() v = v.rename("speed") vt = t[1 : len(t)].rename("speed_times") # Calculate linear acceleration a = v.diff() / vt.diff() a = a.rename("acceleration") at = vt[1 : len(vt)].rename("accleration_times") data = dict(speed=v, speed_times=vt, acceleration=a, acceleration_times=at) derivs = derivs.merge(pd.DataFrame(data), left_index=True, right_index=True) # Replace infinite values derivs = derivs.replace([np.inf, -np.inf], np.nan) return derivs
def _get_xylim(trj: TrajaDataFrame) -> Tuple[Tuple, Tuple]: if ( "xlim" in trj.__dict__ and "ylim" in trj.__dict__ and isinstance(trj.xlim, (list, tuple)) ): return trj.xlim, trj.ylim else: xlim = trj.x.min(), trj.x.max() ylim = trj.y.min(), trj.y.max() return xlim, ylim def coords_to_flow(trj: TrajaDataFrame, bins: Union[int, tuple] = None): """Calculate grid cell flow from trajectory. Args: trj (trajectory) bins (int or tuple) Returns: X (:class:`~numpy.ndarray`): X coordinates of arrow locations Y (:class:`~numpy.ndarray`): Y coordinates of arrow locations U (:class:`~numpy.ndarray`): X component of vector dataset V (:class:`~numpy.ndarray`): Y component of vector dataset """ xlim, ylim = _get_xylim(trj) bins = _bins_to_tuple(trj, bins) X, Y = np.meshgrid( np.linspace(trj.x.min(), trj.x.max(), bins[0]), np.linspace(trj.y.min(), trj.y.max(), bins[1]), ) if "xbin" not in trj.columns or "ybin" not in trj.columns: grid_indices = traja.grid_coordinates(trj, bins=bins, xlim=xlim, ylim=ylim) else: grid_indices = trj[["xbin", "ybin"]] U, V = traja.calc_flow_angles(grid_indices.values) return X, Y, U, V def return_angle_to_point(p1: np.ndarray, p0: np.ndarray): """Calculate angle of points as coordinates in relation to each other. Designed to be broadcast across all trajectory points for a single origin point p0. Args: p1 (np.ndarray): Test point [x,y] p0 (np.ndarray): Origin/source point [x,y] Returns: r (float) """ r = math.degrees(math.atan2((p0[1] - p1[1]), (p0[0] - p1[0]))) return r def determine_colinearity(p0: np.ndarray, p1: np.ndarray, p2: np.ndarray): """Determine whether trio of points constitute a right turn, or whether they are left turns (or colinear/straight line). Args: p0 (:class:`~numpy.ndarray`): First point [x,y] in line p1 (:class:`~numpy.ndarray`): Second point [x,y] in line p2 (:class:`~numpy.ndarray`): Third point [x,y] in line Returns: (bool) """ cross_product = (p1[0] - p0[0]) * (p2[1] - p0[1]) - (p1[1] - p0[1]) * ( p2[0] - p0[0] ) if cross_product < 0: # Right turn return False else: # Points are colinear (if == 0) or left turn (if < 0) return True def inside( pt: np.ndarray, bounds_xs: list, bounds_ys: list, minx: float, maxx: float, miny: float, maxy: float, ): """Determine whether point lies inside or outside of polygon formed by "extrema" points - minx, maxx, miny, maxy. Optimized to be run as broadcast function in numpy along axis. Args: pt (:class:`~numpy.ndarray`): Point to test whether inside or outside polygon bounds_xs (list or tuple): x-coordinates of polygon vertices, in sequence bounds_ys (list or tuple): y-coordinates of polygon vertices, same sequence minx (float): minimum x coordinate value maxx (float): maximum x coordinate value miny (float): minimum y coordinate value maxy (float): maximum y coordinate value Returns: (bool) .. note:: Ported to Python from C implementation by W. Randolph Franklin (WRF): <https://wrf.ecse.rpi.edu/Research/Short_Notes/pnpoly.html> Boolean return "True" for OUTSIDE polygon, meaning it is within subset of possible convex hull coordinates. """ # Only theoretically possible, extrema polygon is actually a straight line if maxx == maxy and minx == miny: return True # No polygon to be within (considered outside) if pt[0] in [minx, maxx] or pt[1] in [miny, maxy]: return True # Extrema points are by definition part of convex hull poly_pts = len(bounds_xs) ct = 0 for i in range(poly_pts): if i == 0: j = poly_pts - 1 else: j = i - 1 # Test if horizontal trace from the point to infinity intersects the given polygon line segment if ((bounds_ys[i] > pt[1]) != (bounds_ys[j] > pt[1])) & ( pt[0] < ( (bounds_xs[j] - bounds_xs[i]) * (pt[1] - bounds_ys[i]) / (bounds_ys[j] - bounds_ys[i]) + bounds_xs[i] ) ): ct += 1 if ( ct % 2 == 0 ): # Number of intersections between point, polygon edge(s) and infinity point are odd: return True # Outside polygon else: return False # Inside polygon def calc_convex_hull(point_arr: np.array) -> np.array: """Identify containing polygonal convex hull for full Trajectory Interior points filtered with :meth:`traja.trajectory.inside` method, takes quadrilateral using extrema points `(minx, maxx, miny, maxy)` - convex hull points MUST all be outside such a polygon. Returns an array with all points in the convex hull. Implementation of Graham Scan `technique <https://en.wikipedia.org/wiki/Graham_scan>_`. Returns: point_arr (:class:`~numpy.ndarray`): n x 2 (x,y) array .. doctest:: >> #Quick visualizaation >> import matplotlib.pyplot as plt >> df = traja.generate(n=10000, convex_hull=True) >> xs, ys = [*zip(*df.convex_hull)] >> _ = plt.plot(df.x.values, df.y.values, 'o', 'blue') >> _ = plt.plot(xs, ys, '-o', color='red') >> _ = plt.show() .. note:: Incorporates Akl-Toussaint `method <http:/www-cgrl.cs.mcgill.ca/~godfried/publications/fast.convex.hull.algorithm.pdf>`_ for filtering interior points. .. note:: Performative loss beyond ~100,000-200,000 points, algorithm has O(nlogn) complexity. """ assert point_arr.shape[1] == 2, f"expected (n, 2) shape only, got {point_arr.shape}" # Find "extrema" points to form polygon (convex hull must be outside this polygon) minx = point_arr[:, 0].min() maxx = point_arr[:, 0].max() miny = point_arr[:, 1].min() maxy = point_arr[:, 1].max() min_x_pt = point_arr[np.where(point_arr[:, 0] == point_arr[:, 0].min())].tolist()[0] min_y_pt = point_arr[np.where(point_arr[:, 1] == point_arr[:, 1].min())].tolist()[0] max_x_pt = point_arr[np.where(point_arr[:, 0] == point_arr[:, 0].max())].tolist()[0] max_y_pt = point_arr[np.where(point_arr[:, 1] == point_arr[:, 1].max())].tolist()[0] extrema_pts = [min_x_pt, min_y_pt, max_x_pt, max_y_pt] extrema_xys = [*zip(*extrema_pts)] bounds_x, bounds_y = extrema_xys[0], extrema_xys[1] # Filter trajectory points to only include points "outside" of this extrema polygon convex_mask = np.apply_along_axis( inside, 1, point_arr, bounds_x, bounds_y, minx, maxx, miny, maxy ) point_arr = point_arr[convex_mask] # Find principal point (lowest y, lower x) from which to start p0 = point_arr[point_arr[:, 1] == point_arr[:, 1].min()].min(axis=0) point_arr = np.delete( point_arr, np.where((point_arr[:, 0] == p0[0]) & (point_arr[:, 1] == p0[1])), 0 ) # Sort remaining points point_arr = point_arr[np.lexsort((point_arr[:, 0], point_arr[:, 1]))] # Populate array with direction of each point in the trajectory to the principal (lowest, then leftmost) point point_arr_r_p0 = np.apply_along_axis(return_angle_to_point, 1, point_arr, p0) # Sort point array by radius sorted_ind = point_arr_r_p0.argsort() point_arr_r_p0 = point_arr_r_p0[sorted_ind] point_arr = point_arr[sorted_ind] # Check for points with duplicate angles from principal point, only keep furthest point unique_r = np.unique(point_arr_r_p0, return_index=True)[1] if ( unique_r.shape == point_arr_r_p0.shape ): # There are no two points at same angle from x axis pass else: point_arr_d_p0 = np.apply_along_axis( lambda x, p0=p0: np.linalg.norm(p0 - x), 1, point_arr ) # Identify duplicate angles unique, counts = np.unique(point_arr_r_p0, axis=0, return_counts=True) rep_angles = unique[counts > 1] duplicates = point_arr_r_p0[np.where(np.in1d(point_arr_r_p0, rep_angles))] duplicates = point_arr[np.where(np.in1d(point_arr_r_p0, rep_angles))] # Get indices of only the furthest point from origin at each unique angle dropped_pts = [] for dup_pt in duplicates: pt_idx = np.where( (point_arr[:, 0] == dup_pt[0]) & (point_arr[:, 1] == dup_pt[1]) )[0][0] r_val = point_arr_r_p0[pt_idx] ind_furthest = np.where( point_arr_d_p0 == point_arr_d_p0[np.where(point_arr_r_p0 == r_val)].max() )[0][0] if ( not pt_idx == ind_furthest ): # This is a "closer" point to origin, not in convex hull dropped_pts.append(pt_idx) point_arr = np.delete(point_arr, dropped_pts, axis=0) # Iterate through points. If a "right turn" is made, remove preceding point. point_arr = np.insert(point_arr, 0, p0, axis=0) for pt in point_arr: idx = np.where((point_arr[:, 0] == pt[0]) & (point_arr[:, 1] == pt[1]))[0][0] while True: # Skip/stop at first two points (3 points form line), after working backwards if idx <= 1: break # Continue working backwards until a left turn is made, or we reach the origin elif determine_colinearity( point_arr[idx - 2], point_arr[idx - 1], point_arr[idx] ): break else: # This is a right turn point_arr = np.delete(point_arr, idx - 1, 0) idx -= 1 point_arr = np.insert(point_arr, point_arr.shape[0], p0, axis=0) return point_arr def from_xy(xy: np.ndarray): """Convenience function for initializing :class:`~traja.frame.TrajaDataFrame` with x,y coordinates. Args: xy (:class:`numpy.ndarray`): x,y coordinates Returns: traj_df (:class:`~traja.frame.TrajaDataFrame`): Trajectory as dataframe .. doctest:: >>> import numpy as np >>> xy = np.array([[0,1],[1,2],[2,3]]) >>> traja.from_xy(xy) x y 0 0 1 1 1 2 2 2 3 """ df = traja.TrajaDataFrame.from_records(xy, columns=["x", "y"]) return df def fill_in_traj(trj: TrajaDataFrame): # FIXME: Implement return trj def _get_time_col(trj: TrajaDataFrame): # Check if saved in metadata time_col = trj.__dict__.get("time_col", None) if time_col: return time_col # Check if index is datetime if is_datetime64_any_dtype(trj.index) or is_datetime_or_timedelta_dtype(trj.index): return "index" # Check if any column contains 'time' time_cols = [col for col in trj if "time" in col.lower()] if time_cols: # Try first column time_col = time_cols[0] if is_datetime_or_timedelta_dtype(trj[time_col]): return time_col else: # Time column is float, etc. but not datetime64. # FIXME: Add conditional return, etc. return time_col else: # No time column found return None
[docs] def batch_process( trajectories: list, func: Callable, n_jobs: int = -1, **kwargs ) -> list: """Apply function to trajectories in parallel for performance. Args: trajectories (list): List of TrajaDataFrame trajectories func (Callable): Function to apply to each trajectory n_jobs (int): Number of parallel jobs. -1 means use all CPUs. Default -1. **kwargs: Additional arguments passed to func Returns: list: Results from applying func to each trajectory Example: >>> import traja >>> trajs = [traja.generate(n=100) for _ in range(100)] >>> # Normalize all trajectories in parallel >>> normalized = traja.trajectory.batch_process( ... trajs, ... lambda t: t.traja.normalize_trajectory() ... ) .. note:: Requires joblib for parallel processing: pip install joblib Falls back to sequential processing if joblib not available. """ try: from joblib import Parallel, delayed if n_jobs == -1: import multiprocessing n_jobs = multiprocessing.cpu_count() results = Parallel(n_jobs=n_jobs)( delayed(func)(traj, **kwargs) for traj in trajectories ) except ImportError: import warnings warnings.warn( "joblib not installed. Processing sequentially. " "Install joblib for parallel processing: pip install joblib" ) results = [func(traj, **kwargs) for traj in trajectories] return results
[docs] def train_test_split( trajectories: list, train_size: float = 0.7, val_size: float = 0.15, test_size: float = 0.15, shuffle: bool = True, random_state: Optional[int] = None, ) -> Tuple[list, list, list]: """Split trajectories into train, validation, and test sets for deep learning. Args: trajectories (list): List of TrajaDataFrame trajectories train_size (float): Proportion for training set. Default 0.7. val_size (float): Proportion for validation set. Default 0.15. test_size (float): Proportion for test set. Default 0.15. shuffle (bool): Whether to shuffle before splitting. Default True. random_state (int, optional): Random seed for reproducibility. Returns: tuple: (train_trajectories, val_trajectories, test_trajectories) Raises: ValueError: If sizes don't sum to 1.0 Example: >>> import traja >>> # Create sample trajectories >>> trajs = [traja.generate(n=100) for _ in range(50)] >>> train, val, test = traja.trajectory.train_test_split(trajs) >>> len(train), len(val), len(test) (35, 7, 8) .. note:: Essential for training and evaluating deep learning models on trajectory data. """ if not abs(train_size + val_size + test_size - 1.0) < 1e-6: raise ValueError( f"train_size ({train_size}) + val_size ({val_size}) + " f"test_size ({test_size}) must sum to 1.0" ) if not isinstance(trajectories, list): raise TypeError("trajectories must be a list of TrajaDataFrame objects") n = len(trajectories) if shuffle: if random_state is not None: np.random.seed(random_state) indices = np.random.permutation(n) else: indices = np.arange(n) # Calculate split points train_end = int(n * train_size) val_end = train_end + int(n * val_size) # Split indices train_indices = indices[:train_end] val_indices = indices[train_end:val_end] test_indices = indices[val_end:] # Create splits train_trajs = [trajectories[i] for i in train_indices] val_trajs = [trajectories[i] for i in val_indices] test_trajs = [trajectories[i] for i in test_indices] return train_trajs, val_trajs, test_trajs
if __name__ == "__main__": import doctest doctest.testmod()