Source code for traja.trajectory

import logging
import math
from collections import OrderedDict
from typing import Callable, Optional, Union, Tuple

import numpy as np
import pandas as pd
from pandas.core.dtypes.common import (
    is_datetime_or_timedelta_dtype,
    is_datetime64_any_dtype,
    is_timedelta64_dtype,
)
from scipy import signal
from scipy.spatial.distance import directed_hausdorff, euclidean

import traja
from traja import TrajaDataFrame

__all__ = [
    "_bins_to_tuple",
    "_get_time_col",
    "_get_xylim",
    "_grid_coords1D",
    "_has_cols",
    "_rediscretize_points",
    "_resample_time",
    "calc_angle",
    "calc_convex_hull",
    "calc_derivatives",
    "calc_displacement",
    "calc_heading",
    "calc_laterality",
    "calc_turn_angle",
    "calc_flow_angles",
    "cartesian_to_polar",
    "coords_to_flow",
    "determine_colinearity",
    "distance_between",
    "distance",
    "euclidean",
    "expected_sq_displacement",
    "fill_in_traj",
    "from_xy",
    "inside",
    "generate",
    "get_derivatives",
    "grid_coordinates",
    "length",
    "polar_to_z",
    "rediscretize_points",
    "resample_time",
    "return_angle_to_point",
    "rotate",
    "smooth_sg",
    "speed_intervals",
    "step_lengths",
    "to_shapely",
    "traj_from_coords",
    "transition_matrix",
    "transitions",
]

logger = logging.getLogger("traja")


[docs]def smooth_sg(trj: TrajaDataFrame, w: int = None, p: int = 3): """Returns ``DataFrame`` of trajectory after Savitzky-Golay filtering. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory w (int): window size (Default value = None) p (int): polynomial order (Default value = 3) Returns: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory .. doctest:: >> df = traja.generate() >> traja.smooth_sg(df, w=101).head() x y time 0 -11.194803 12.312742 0.00 1 -10.236337 10.613720 0.02 2 -9.309282 8.954952 0.04 3 -8.412910 7.335925 0.06 4 -7.546492 5.756128 0.08 """ if w is None: w = p + 3 - p % 2 if w % 2 != 1: raise Exception(f"Invalid smoothing parameter w ({w}): n must be odd") _trj = trj.copy() _trj.x = signal.savgol_filter(_trj.x, window_length=w, polyorder=p, axis=0) _trj.y = signal.savgol_filter(_trj.y, window_length=w, polyorder=p, axis=0) _trj = fill_in_traj(_trj) return _trj
def apply_all(trj: TrajaDataFrame, method: Callable, id_col: str, **kwargs): """Applies method to all trajectories""" return trj.groupby(by=id_col).apply(method, **kwargs) def step_lengths(trj: TrajaDataFrame): """Length of the steps of ``trj``. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory """ displacement = traja.trajectory.calc_displacement(trj) return displacement def polar_to_z(r: float, theta: float) -> complex: """Converts polar coordinates ``r`` and ``theta`` to complex number ``z``. Args: r (float): step size theta (float): angle Returns: z (complex): complex number z """ return r * np.exp(1j * theta) def cartesian_to_polar(xy: np.ndarray) -> (float, float): """Convert :class:`numpy.ndarray` ``xy`` to polar coordinates ``r`` and ``theta``. Args: xy (:class:`numpy.ndarray`): x,y coordinates Returns: r, theta (tuple of float): step-length and angle """ assert xy.ndim == 2, f"Dimensions are {xy.ndim}, expecting 2" x, y = np.split(xy, [-1], axis=1) x, y = np.squeeze(x), np.squeeze(y) r = np.sqrt(x * x + y * y) theta = np.arctan2(y, x) return r, theta
[docs]def distance(trj: TrajaDataFrame) -> float: """Calculates the distance from start to end of trajectory, also called net distance, displacement, or bee-line from start to finish. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: distance (float) .. doctest:: >> df = traja.generate() >> traja.distance(df) 117.01507823153617 """ start = trj.iloc[0][["x", "y"]].values end = trj.iloc[-1][["x", "y"]].values return np.linalg.norm(end - start)
[docs]def length(trj: TrajaDataFrame) -> float: """Calculates the cumulative length of a trajectory. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: length (float) .. doctest:: >> df = traja.generate() >> traja.length(df) 2001.142339606066 """ displacement = trj.traja.calc_displacement() return displacement.sum()
def expected_sq_displacement( trj: TrajaDataFrame, n: int = 0, eqn1: bool = True ) -> float: """Expected displacement. .. note:: This method is experimental and needs testing. """ sl = traja.step_lengths(trj) ta = traja.calc_angle(trj) l1 = np.mean(sl) l2 = np.mean(sl ** 2) c = np.mean(np.cos(ta)) s = np.mean(np.sin(ta)) s2 = s ** 2 if eqn1: # Eqn 1 alpha = np.arctan2(s, c) gamma = ((1 - c) ** 2 - s2) * np.cos((n + 1) * alpha) - 2 * s * ( 1 - c ) * np.sin((n + 1) * alpha) esd = ( n * l2 + 2 * l1 ** 2 * ((c - c ** 2 - s2) * n - c) / ((1 - c) ** 2 + s2) + 2 * l1 ** 2 * ((2 * s2 + (c + s2) ** ((n + 1) / 2)) / ((1 - c) ** 2 + s2) ** 2) * gamma ) return abs(esd) else: logger.info("This method is experimental and requires testing.") # Eqn 2 esd = n * l2 + 2 * l1 ** 2 * c / (1 - c) * (n - (1 - c ** n) / (1 - c)) return esd def traj_from_coords( track: Union[np.ndarray, pd.DataFrame], x_col=1, y_col=2, time_col: Optional[str] = None, fps: Union[float, int] = 4, spatial_units: str = "m", time_units: str = "s", ) -> TrajaDataFrame: """Create TrajaDataFrame from coordinates. Args: track: N x 2 numpy array or pandas DataFrame with x and y columns x_col: column index or x column name y_col: column index or y column name time_col: name of time column fps: Frames per seconds spatial_units: default m, optional time_units: default s, optional Returns: trj: TrajaDataFrame .. doctest:: >> xy = np.random.random((1000, 2)) >> trj = traja.traj_from_coord(xy) >> assert trj.shape == (1000,4) # columns x, y, time, dt """ if not isinstance(track, traja.TrajaDataFrame): if isinstance(track, np.ndarray) and track.shape[1] == 2: trj = traja.from_xy(track) elif isinstance(track, pd.DataFrame): trj = traja.TrajaDataFrame(track) else: trj = track trj.traja.spatial_units = spatial_units trj.traja.time_units = time_units def rename(col, name, trj): if isinstance(col, int): trj.rename(columns={col: name}) else: if col not in trj: raise Exception(f"Missing column {col}") trj.rename(columns={col: name}) return trj # Ensure column names are as expected trj = rename(x_col, "x", trj) trj = rename(y_col, "y", trj) if time_col is not None: trj = rename(time_col, "time", trj) # Allocate times if they aren't already known if "time" not in trj: if fps is None: raise Exception( ( "Cannot create a trajectory without times: either fps or a time column must be specified" ) ) # Assign times to each frame, starting at 0 trj["time"] = pd.Series(np.arange(0, len(trj)) / fps) # Get displacement time for each coordinate, with the first point at time 0 trj["dt"] = trj.time - trj.time.iloc[0] return trj def distance_between(A: traja.TrajaDataFrame, B: traja.TrajaDataFrame, method="dtw"): """Returns distance between two trajectories. Args: A (:class:`~traja.frame.TrajaDataFrame`) : Trajectory 1 B (:class:`~traja.frame.TrajaDataFrame`) : Trajectory 2 method (str): ``dtw`` for dynamic time warping, ``hausdorff`` for Hausdorff Returns: distance (float): Distance """ if method == "hausdorff": dist0 = directed_hausdorff(A, B)[0] dist1 = directed_hausdorff(B, A)[0] symmetric_dist = max(dist0, dist1) return symmetric_dist elif method == "dtw": try: from fastdtw import fastdtw except ImportError: raise ImportError( """ Missing optional dependency 'fastdtw'. Install fastdtw for dynamic time warping distance with pip install fastdtw. """ ) distance, path = fastdtw(A, B, dist=euclidean) return distance def to_shapely(trj): """Returns shapely object for area, bounds, etc. functions. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: shapely.geometry.linestring.LineString -- Shapely shape. .. doctest:: >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]}) >>> shape = traja.to_shapely(df) >>> shape.is_closed False """ from shapely.geometry import shape coords = trj[["x", "y"]].values tracks_obj = {"type": "LineString", "coordinates": coords} tracks_shape = shape(tracks_obj) return tracks_shape def transition_matrix(grid_indices1D: np.ndarray): """Returns ``np.ndarray`` of Markov transition probability matrix for grid cell transitions. Args: grid_indices1D (:class:`np.ndarray`) Returns: M (:class:`numpy.ndarray`) """ if not isinstance(grid_indices1D, np.ndarray): raise TypeError(f"Expected np.ndarray, got {type(grid_indices1D)}") n = 1 + max(grid_indices1D.flatten()) # number of states M = [[0] * n for _ in range(n)] for (i, j) in zip(grid_indices1D, grid_indices1D[1:]): M[i][j] += 1 # Convert to probabilities for row in M: s = sum(row) if s > 0: row[:] = [f / s for f in row] return np.array(M) def _bins_to_tuple(trj, bins: Union[int, Tuple[int, int]] = 10): """Returns tuple of x, y bins Args: trj: Trajectory bins: The bin specification: If int, the number of bins for the smallest of the two dimensions such that (min(nx,ny)=bins). If [int, int], the number of bins in each dimension (nx, ny = bins). Returns: bins (Sequence[int,int]): Bins (nx, ny) """ if bins is None: bins = 10 if isinstance(bins, int): # make aspect equal xlim, ylim = _get_xylim(trj) aspect = (ylim[1] - ylim[0]) / (xlim[1] - xlim[0]) if aspect >= 1: bins = (bins, int(bins * aspect)) else: bins = (int(bins / aspect), bins) assert len(bins) == 2, f"bins should be length 2 but is {len(bins)}" return bins def calc_laterality( trj: TrajaDataFrame, dist_thresh: float, angle_thresh: float = 30, ): """Calculate laterality of a trajectory. Laterality is the preference for left or right turning. It is calculated with the number of left and right turns. Args: trj: Trajectory dist_thresh: distance for a step to count as a turn angle_thresh: angle threshold (from angle to 90 degrees) Returns: right_turns (int) left_turns (int) """ # get turn angle with regard to x axis if "turn_angle" not in trj.columns: turn_angle = calc_turn_angle(trj) else: turn_angle = trj.turn_agle distance = step_lengths(trj) distance_mask = distance > dist_thresh angle_mask = ((turn_angle > angle_thresh) & (turn_angle < 90)) | ( (turn_angle < -angle_thresh) & (turn_angle > -90) ) turns = turn_angle[distance_mask & angle_mask].dropna() left_turns = turns[turn_angle > 0].shape[0] right_turns = turns[turn_angle < 0].shape[0] return right_turns, left_turns def calc_flow_angles(grid_indices: np.ndarray): """Calculate average flow between grid indices.""" bins = (grid_indices[:, 0].max(), grid_indices[:, 1].max()) M = np.empty((bins[1], bins[0]), dtype=np.ndarray) for (i, j) in zip(grid_indices, grid_indices[1:]): # Account for fact that grid indices uses 1-base indexing ix = i[0] - 1 iy = i[1] - 1 jx = j[0] - 1 jy = j[1] - 1 if np.array_equal(i, j): angle = None elif ix == jx and iy > jy: # move towards y origin (down by default) angle = 3 * np.pi / 2 elif ix == jx and iy < jy: # move towards y origin (up by default) angle = np.pi / 2 elif ix < jx and iy == jy: # move right angle = 0 elif ix > jx and iy == jy: # move left angle = np.pi elif ix > jx and iy > jy: # move towards y origin (top left) angle = 3 * np.pi / 4 elif ix > jx and iy < jy: # move away from y origin (bottom left) angle = 5 * np.pi / 4 elif ix < jx and iy < jy: # move away from y origin (bottom right) angle = 7 * np.pi / 4 elif ix < jx and iy > jy: # move towards y origin (top right) angle = np.pi / 4 if angle is not None: M[iy, ix] = np.append(M[iy, ix], angle) U = np.ones_like(M) # x component of arrow V = np.empty_like(M) # y component of arrow for i, row in enumerate(M): for j, angles in enumerate(row): x = y = 0 # average_angle = None if angles is not None and len(angles) > 1: for angle in angles: if angle is None: continue x += np.cos(angle) y += np.sin(angle) # average_angle = np.arctan2(y, x) U[i, j] = x V[i, j] = y else: U[i, j] = 0 V[i, j] = 0 return U.astype(float), V.astype(float) def _grid_coords1D(grid_indices: np.ndarray): """Convert 2D grid indices to 1D indices.""" if isinstance(grid_indices, pd.DataFrame): grid_indices = grid_indices.values grid_indices1D = [] nr_cols = int(grid_indices[:, 0].max()) + 1 for coord in grid_indices: grid_indices1D.append( coord[1] * nr_cols + coord[0] ) # nr_rows * col_length + nr_cols return np.array(grid_indices1D, dtype=int) def transitions(trj: TrajaDataFrame, **kwargs): """Get first-order Markov model for transitions between grid cells. Args: trj (trajectory) kwargs: kwargs to :func:`traja.grid_coordinates` """ if "xbin" not in trj.columns or "ybin" not in trj.columns: grid_indices = grid_coordinates(trj, **kwargs) else: grid_indices = trj[["xbin", "ybin"]] # Drop nan for converting to int grid_indices.dropna(subset=["xbin", "ybin"], inplace=True) grid_indices1D = _grid_coords1D(grid_indices) transitions_matrix = transition_matrix(grid_indices1D) return transitions_matrix def grid_coordinates( trj: TrajaDataFrame, bins: Union[int, tuple] = None, xlim: tuple = None, ylim: tuple = None, assign: bool = False, ): """Returns ``DataFrame`` of trajectory discretized into 2D lattice grid coordinates. Args: trj (~`traja.frame.TrajaDataFrame`): Trajectory bins (tuple or int) xlim (tuple) ylim (tuple) assign (bool): Return updated original dataframe Returns: trj (~`traja.frame.TrajaDataFrame`): Trajectory is assign=True otherwise pd.DataFrame """ # Drop nan for converting to int trj.dropna(subset=["x", "y"], inplace=True) xmin = trj.x.min() if xlim is None else xlim[0] xmax = trj.x.max() if xlim is None else xlim[1] ymin = trj.y.min() if ylim is None else ylim[0] ymax = trj.y.max() if ylim is None else ylim[1] bins = _bins_to_tuple(trj, bins) if not xlim: xbin = pd.cut(trj.x, bins[0], labels=False) else: xmin, xmax = xlim xbinarray = np.linspace(xmin, xmax, bins[0]) xbin = np.digitize(trj.x, xbinarray) if not ylim: ybin = pd.cut(trj.y, bins[1], labels=False) else: ymin, ymax = ylim ybinarray = np.linspace(ymin, ymax, bins[1]) ybin = np.digitize(trj.y, ybinarray) if assign: trj["xbin"] = xbin trj["ybin"] = ybin return trj return pd.DataFrame({"xbin": xbin, "ybin": ybin})
[docs]def generate( n: int = 1000, random: bool = True, step_length: int = 2, angular_error_sd: float = 0.5, angular_error_dist: Callable = None, linear_error_sd: float = 0.2, linear_error_dist: Callable = None, fps: float = 50, spatial_units: str = "m", seed: int = None, convex_hull: bool = False, **kwargs, ): """Generates a trajectory. If ``random`` is ``True``, the trajectory will be a correlated random walk/idiothetic directed walk (Kareiva & Shigesada, 1983), corresponding to an animal navigating without a compass (Cheung, Zhang, Stricker, & Srinivasan, 2008). If ``random`` is ``False``, it will be(np.ndarray) a directed walk/allothetic directed walk/oriented path, corresponding to an animal navigating with a compass (Cheung, Zhang, Stricker, & Srinivasan, 2007, 2008). By default, for both random and directed walks, errors are normally distributed, unbiased, and independent of each other, so are **simple directed walks** in the terminology of Cheung, Zhang, Stricker, & Srinivasan, (2008). This behaviour may be modified by specifying alternative values for the ``angular_error_dist`` and/or ``linear_error_dist`` parameters. The initial angle (for a random walk) or the intended direction (for a directed walk) is ``0`` radians. The starting position is ``(0, 0)``. Args: n (int): (Default value = 1000) random (bool): (Default value = True) step_length: (Default value = 2) angular_error_sd (float): (Default value = 0.5) angular_error_dist (Callable): (Default value = None) linear_error_sd (float): (Default value = 0.2) linear_error_dist (Callable): (Default value = None) fps (float): (Default value = 50) convex_hull (bool): (Default value = False) spatial_units: (Default value = 'm') **kwargs: Additional arguments Returns: trj (:class:`traja.frame.TrajaDataFrame`): Trajectory .. note:: Based on Jim McLean's `trajr <https://github.com/JimMcL/trajr>`_, ported to Python. **Reference**: McLean, D. J., & Skowron Volponi, M. A. (2018). trajr: An R package for characterisation of animal trajectories. Ethology, 124(6), 440-448. https://doi.org/10.1111/eth.12739. """ if seed is None: np.random.seed(0) else: np.random.seed(seed) if angular_error_dist is None: angular_error_dist = np.random.normal( loc=0.0, scale=angular_error_sd, size=n - 1 ) if linear_error_dist is None: linear_error_dist = np.random.normal(loc=0.0, scale=linear_error_sd, size=n - 1) angular_errors = angular_error_dist linear_errors = linear_error_dist step_lengths = step_length + linear_errors # Don't allow negative lengths step_lengths[step_lengths < 0] = 0 steps = polar_to_z(step_lengths, angular_errors) if random: # Accumulate angular errors coords = np.zeros(n, dtype=np.complex) angle = 0 for i in range(n - 1): angle += angular_errors[i] length = step_length + linear_errors[i] coords[i + 1] = coords[i] + polar_to_z(r=length, theta=angle) else: coords = np.append(complex(0), np.cumsum(steps)) x = coords.real y = coords.imag df = traja.TrajaDataFrame(data={"x": x, "y": y}) if fps in (0, None): raise Exception("fps must be greater than 0") df.fps = fps time = df.index / fps df["time"] = time df.spatial_units = spatial_units for key, value in kwargs.items(): df.__dict__[key] = value # Update metavars metavars = dict(angular_error_sd=angular_error_sd, linear_error_sd=linear_error_sd) df.__dict__.update(metavars) # Attribute convex hull to dataframe if convex_hull: df.convex_hull = df[["x", "y"]].values else: del df.convex_hull return df
def _resample_time( trj: TrajaDataFrame, step_time: Union[float, int, str], errors="coerce" ): if not is_datetime_or_timedelta_dtype(trj.index): raise Exception(f"{trj.index.dtype} is not datetime or timedelta.") try: df = trj.resample(step_time).interpolate(method="spline", order=2) except ValueError as e: if len(e.args) > 0 and "cannot reindex from a duplicate axis" in e.args[0]: if errors == "coerce": logger.warning("Duplicate time indices, keeping first") trj = trj.loc[~trj.index.duplicated(keep="first")] df = ( trj.resample(step_time) .bfill(limit=1) .interpolate(method="spline", order=2) ) else: logger.error("Error: duplicate time indices") raise ValueError("Duplicate values in indices") return df
[docs]def resample_time(trj: TrajaDataFrame, step_time: str, new_fps: Optional[bool] = None): """Returns a ``TrajaDataFrame`` resampled to consistent `step_time` intervals. ``step_time`` should be expressed as a number-time unit combination, eg "2S" for 2 seconds and ā€œ2100Lā€ for 2100 milliseconds. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory step_time (str): step time interval / offset string (eg, '2S' (seconds), '50L' (milliseconds), '50N' (nanoseconds)) new_fps (bool, optional): new fps Results: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory .. doctest:: >>> from traja import generate, resample_time >>> df = generate() >>> resampled = resample_time(df, '50L') # 50 milliseconds >>> resampled.head() # doctest: +NORMALIZE_WHITESPACE x y time 1970-01-01 00:00:00.000 0.000000 0.000000 1970-01-01 00:00:00.050 0.919113 4.022971 1970-01-01 00:00:00.100 -1.298510 5.423373 1970-01-01 00:00:00.150 -6.057524 4.708803 1970-01-01 00:00:00.200 -10.347759 2.108385 """ time_col = _get_time_col(trj) if time_col == "index" and is_datetime64_any_dtype(trj.index): _trj = _resample_time(trj, step_time) elif time_col == "index" and is_timedelta64_dtype(trj.index): trj.index = pd.to_datetime(trj.index) _trj = _resample_time(trj, step_time) _trj.index = pd.to_timedelta(_trj.index) elif time_col: if isinstance(step_time, str): try: if "." in step_time: raise NotImplementedError( """Fractional step time not implemented. For milliseconds/microseconds/nanoseconds use: L milliseonds U microseconds N nanoseconds eg, step_time='2100L'""" ) except Exception: raise NotImplementedError( f"Inferring from time format {step_time} not yet implemented." ) _trj = trj.set_index(time_col) time_units = _trj.__dict__.get("time_units", "s") _trj.index = pd.to_datetime(_trj.index, unit=time_units) _trj = _resample_time(_trj, step_time) else: raise NotImplementedError( f"Time column ({time_col}) not of expected dataset type." ) return _trj
def rotate(df, angle: Union[float, int] = 0, origin: tuple = None): """Returns a ``TrajaDataFrame`` Rotate a trajectory `angle` in radians. Args: trj (:class:`traja.frame.TrajaDataFrame`): Trajectory angle (float): angle in radians origin (tuple. optional): rotate around point (x,y) Returns: trj (:class:`traja.frame.TrajaDataFrame`): Trajectory .. note:: Based on Lyle Scott's `implementation <https://gist.github.com/LyleScott/e36e08bfb23b1f87af68c9051f985302>`_. """ trj = df.copy() # Calculate current orientation if isinstance(trj, traja.TrajaDataFrame): xy = df.traja.xy elif isinstance(trj, pd.DataFrame): trj = df[["x", "y"]] x, y = np.split(xy, [-1], axis=1) if origin is None: # Assume middle of x and y is origin origin = ((x.max() - x.min()) / 2, (y.max() - y.min()) / 2) offset_x, offset_y = origin new_coords = [] for x, y in xy: adjusted_x = x - offset_x adjusted_y = y - offset_y cos_rad = math.cos(angle) sin_rad = math.sin(angle) qx = offset_x + cos_rad * adjusted_x + sin_rad * adjusted_y qy = offset_y + -sin_rad * adjusted_x + cos_rad * adjusted_y new_coords.append((qx, qy)) new_xy = np.array(new_coords) x, y = np.split(new_xy, [-1], axis=1) trj["x"] = x trj["y"] = y return trj def rediscretize_points(trj: TrajaDataFrame, R: Union[float, int], time_out=False): """Returns a ``TrajaDataFrame`` rediscretized to a constant step length `R`. Args: trj (:class:`traja.frame.TrajaDataFrame`): Trajectory R (float): Rediscretized step length (eg, 0.02) time_out (bool): Include time corresponding to time intervals in output Returns: rt (:class:`numpy.ndarray`): rediscretized trajectory """ if not isinstance(R, (float, int)): raise TypeError(f"R should be float or int, but is {type(R)}") results = _rediscretize_points(trj, R, time_out) rt = results["rt"] if len(rt) < 2: raise RuntimeError( f"Step length {R} is too large for path (path length {len(trj)})" ) rt = traja.from_xy(rt) if time_out: rt["time"] = results["time"] return rt def _rediscretize_points( trj: TrajaDataFrame, R: Union[float, int], time_out=False ) -> dict: """Helper function for :func:`traja.trajectory.rediscretize`. Args: trj (:class:`traja.frame.TrajaDataFrame`): Trajectory R (float): Rediscretized step length (eg, 0.02) Returns: output (dict): Containing: result (:class:`numpy.ndarray`): Rediscretized coordinates time_vals (optional, list of floats or datetimes): Time points corresponding to result """ # TODO: Implement with complex numbers points = trj[["x", "y"]].dropna().values.astype("float64") n_points = len(points) result = np.empty((128, 2)) p0 = points[0] result[0] = p0 step_nr = 0 candidate_start = 1 # running index of candidate time_vals = [] if time_out: time_col = _get_time_col(trj) time = trj[time_col][0] time_vals.append(time) while candidate_start <= n_points: # Find the first point `curr_ind` for which |points[curr_ind] - p_0| >= R curr_ind = np.NaN for i in range( candidate_start, n_points ): # range of search space for next point d = np.linalg.norm(points[i] - result[step_nr]) if d >= R: curr_ind = i # curr_ind is in [candidate, n_points) if time_out: time = trj[time_col][i] time_vals.append(time) break if np.isnan(curr_ind): # End of path break # The next point may lie on the same segment candidate_start = curr_ind # The next point lies on the segment p[k-1], p[k] curr_result_x = result[step_nr][0] prev_x = points[curr_ind - 1, 0] curr_result_y = result[step_nr][1] prev_y = points[curr_ind - 1, 1] # a = 1 if points[k, 0] <= xk_1 else 0 lambda_ = np.arctan2( points[curr_ind, 1] - prev_y, points[curr_ind, 0] - prev_x ) # angle cos_l = np.cos(lambda_) sin_l = np.sin(lambda_) U = (curr_result_x - prev_x) * cos_l + (curr_result_y - prev_y) * sin_l V = (curr_result_y - prev_y) * cos_l - (curr_result_x - prev_x) * sin_l # Compute distance H between (X_{i+1}, Y_{i+1}) and (x_{k-1}, y_{k-1}) H = U + np.sqrt(abs(R ** 2 - V ** 2)) XIp1 = H * cos_l + prev_x YIp1 = H * sin_l + prev_y # Increase array size progressively to make the code run (significantly) faster if len(result) <= step_nr + 1: result = np.concatenate((result, np.empty_like(result))) # Save the point result[step_nr + 1] = np.array([XIp1, YIp1]) step_nr += 1 # Truncate result result = result[: step_nr + 1] output = {"rt": result} if time_out: output["time"] = time_vals return output def _has_cols(trj: TrajaDataFrame, cols: list): """Check if `trj` has `cols`.""" return set(cols).issubset(trj.columns) def calc_turn_angle(trj: TrajaDataFrame): """Return a ``Series`` of floats with turn angles. Args: trj (:class:`traja.frame.TrajaDataFrame`): Trajectory Returns: turn_angle (:class:`~pandas.Series`): Turn angle .. doctest:: >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]}) >>> traja.calc_turn_angle(df) 0 NaN 1 NaN 2 0.0 Name: turn_angle, dtype: float64 """ if "heading" not in trj: heading = calc_heading(trj) else: heading = trj.heading turn_angle = heading.diff().rename("turn_angle") # Correction for 360-degree angle range turn_angle.loc[turn_angle >= 180] -= 360 turn_angle.loc[turn_angle < -180] += 360 return turn_angle
[docs]def calc_angle(trj: TrajaDataFrame, unit: str = "degrees", lag: int = 1): """Returns a ``Series`` with angle between steps as a function of displacement with regard to x axis. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory unit (str): return angle in radians or degrees (Default value: 'degrees') lag (int) : time steps between angle calculation (Default value: 1) Returns: angle (:class:`pandas.Series`): Angle series. """ if not _has_cols(trj, ["displacement"]) or (lag != 1): displacement = calc_displacement(trj, lag) else: displacement = trj.displacement if unit == "degrees": angle = np.rad2deg(np.arccos(np.abs(trj.x.diff(lag)) / displacement)) elif unit == "radians": angle = np.arccos(np.abs(trj.x.diff()) / displacement) else: raise ValueError(f"The unit {unit} is not valid.") angle.unit = unit angle.name = "angle" return angle
[docs]def calc_displacement(trj: TrajaDataFrame, lag=1): """Returns a ``Series`` of ``float`` displacement between consecutive indices. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory lag (int) : time steps between displacement calculation Returns: displacement (:class:`pandas.Series`): Displacement series. .. doctest:: >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]}) >>> traja.calc_displacement(df) 0 NaN 1 1.414214 2 1.414214 Name: displacement, dtype: float64 """ displacement = np.sqrt( np.power(trj.x.shift(lag) - trj.x, 2) + np.power(trj.y.shift(lag) - trj.y, 2) ) displacement.name = "displacement" return displacement
def calc_derivatives(trj: TrajaDataFrame): """Returns derivatives ``displacement`` and ``displacement_time`` as DataFrame. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: derivs (:class:`~pandas.DataFrame`): Derivatives. .. doctest:: >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3],'time':[0., 0.2, 0.4]}) >>> traja.calc_derivatives(df) displacement displacement_time 0 NaN 0.0 1 1.414214 0.2 2 1.414214 0.4 """ time_col = _get_time_col(trj) if time_col is None: raise Exception("Missing time information in trajectory.") if "displacement" not in trj: displacement = calc_displacement(trj) else: displacement = trj.displacement # get cumulative seconds if is_datetime64_any_dtype(trj[time_col]): displacement_time = ( trj[time_col].astype(int).div(10 ** 9).diff().fillna(0).cumsum() ) else: try: displacement_time = trj[time_col].diff().fillna(0).cumsum() except TypeError: raise Exception( f"Format (example {trj[time_col][0]}) not recognized as datetime" ) # TODO: Create DataFrame directly derivs = pd.DataFrame( OrderedDict(displacement=displacement, displacement_time=displacement_time) ) return derivs
[docs]def calc_heading(trj: TrajaDataFrame): """Calculate trajectory heading. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: heading (:class:`pandas.Series`): heading as a ``Series`` ..doctest:: >>> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3]}) >>> traja.calc_heading(df) 0 NaN 1 45.0 2 45.0 Name: heading, dtype: float64 """ if not _has_cols(trj, ["angle"]): angle = calc_angle(trj) else: angle = trj.angle if hasattr(angle, "unit"): if angle.unit == "radians": angle = np.rad2deg(angle) dx = trj.x.diff() dy = trj.y.diff() # Get heading from angle mask = (dx > 0) & (dy >= 0) trj.loc[mask, "heading"] = angle[mask] mask = (dx >= 0) & (dy < 0) trj.loc[mask, "heading"] = -angle[mask] mask = (dx < 0) & (dy <= 0) trj.loc[mask, "heading"] = -(180 - angle[mask]) mask = (dx <= 0) & (dy > 0) trj.loc[mask, "heading"] = 180 - angle[mask] return trj.heading
[docs]def speed_intervals( trj: TrajaDataFrame, faster_than: float = None, slower_than: float = None ) -> pd.DataFrame: """Calculate speed time intervals. Returns a dictionary of time intervals where speed is slower and/or faster than specified values. Args: faster_than (float, optional): Minimum speed threshold. (Default value = None) slower_than (float or int, optional): Maximum speed threshold. (Default value = None) Returns: result (:class:`~pd.DataFrame`) -- time intervals as dataframe .. note:: Implementation ported to Python, heavily inspired by Jim McLean's trajr package. .. doctest:: >> df = traja.generate() >> intervals = traja.speed_intervals(df, faster_than=100) >> intervals.head() start_frame start_time stop_frame stop_time duration 0 1 0.02 3 0.06 0.04 1 4 0.08 8 0.16 0.08 2 10 0.20 11 0.22 0.02 3 12 0.24 15 0.30 0.06 4 17 0.34 18 0.36 0.02 """ derivs = get_derivatives(trj) if faster_than is None and slower_than is None: raise Exception( "Parameters faster_than and slower_than are both None, at least one must be provided." ) # Calculate trajectory speeds speed = derivs["speed"].values times = derivs["speed_times"].values times[0] = 0.0 flags = np.full(len(speed), 1) if faster_than is not None: flags = flags & (speed > faster_than) if slower_than is not None: flags = flags & (speed < slower_than) changes = np.diff(flags) stop_frames = np.where(changes == -1)[0] start_frames = np.where(changes == 1)[0] # Handle situation where interval begins or ends outside of trajectory if len(start_frames) > 0 or len(stop_frames) > 0: # Assume interval started at beginning of trajectory, since we don't know what happened before that if len(stop_frames) > 0 and ( len(start_frames) == 0 or stop_frames[0] < start_frames[0] ): start_frames = np.append(1, start_frames) # Similarly, assume that interval can't extend past end of trajectory if ( len(stop_frames) == 0 or start_frames[len(start_frames) - 1] > stop_frames[len(stop_frames) - 1] ): stop_frames = np.append(stop_frames, len(speed) - 1) stop_times = times[stop_frames] start_times = times[start_frames] durations = stop_times - start_times result = traja.TrajaDataFrame( OrderedDict( start_frame=start_frames, start_time=start_times, stop_frame=stop_frames, stop_time=stop_times, duration=durations, ) ) return result
[docs]def get_derivatives(trj: TrajaDataFrame): """Returns derivatives ``displacement``, ``displacement_time``, ``speed``, ``speed_times``, ``acceleration``, ``acceleration_times`` as dictionary. Args: trj (:class:`~traja.frame.TrajaDataFrame`): Trajectory Returns: derivs (:class:`~pd.DataFrame`) : Derivatives .. doctest:: >> df = traja.TrajaDataFrame({'x':[0,1,2],'y':[1,2,3],'time':[0.,0.2,0.4]}) >> df.traja.get_derivatives() #doctest: +SKIP displacement displacement_time speed speed_times acceleration acceleration_times 0 NaN 0.0 NaN NaN NaN NaN 1 1.414214 0.2 7.071068 0.2 NaN NaN 2 1.414214 0.4 7.071068 0.4 0.0 0.4 """ if not _has_cols(trj, ["displacement", "displacement_time"]): derivs = calc_derivatives(trj) d = derivs["displacement"] t = derivs["displacement_time"] else: d = trj.displacement t = trj.displacement_time derivs = OrderedDict(displacement=d, displacement_time=t) if is_datetime_or_timedelta_dtype(t): # Convert to float divisible series # TODO: Add support for other time units t = t.dt.total_seconds() v = d[1 : len(d)] / t.diff() v.rename("speed") vt = t[1 : len(t)].rename("speed_times") # Calculate linear acceleration a = v.diff() / vt.diff().rename("acceleration") at = vt[1 : len(vt)].rename("accleration_times") data = dict(speed=v, speed_times=vt, acceleration=a, acceleration_times=at) derivs = derivs.merge(pd.DataFrame(data), left_index=True, right_index=True) # Replace infinite values derivs.replace([np.inf, -np.inf], np.nan) return derivs
def _get_xylim(trj: TrajaDataFrame) -> Tuple[Tuple, Tuple]: if ( "xlim" in trj.__dict__ and "ylim" in trj.__dict__ and isinstance(trj.xlim, (list, tuple)) ): return trj.xlim, trj.ylim else: xlim = trj.x.min(), trj.x.max() ylim = trj.y.min(), trj.y.max() return xlim, ylim def coords_to_flow(trj: TrajaDataFrame, bins: Union[int, tuple] = None): """Calculate grid cell flow from trajectory. Args: trj (trajectory) bins (int or tuple) Returns: X (:class:`~numpy.ndarray`): X coordinates of arrow locations Y (:class:`~numpy.ndarray`): Y coordinates of arrow locations U (:class:`~numpy.ndarray`): X component of vector dataset V (:class:`~numpy.ndarray`): Y component of vector dataset """ xlim, ylim = _get_xylim(trj) bins = _bins_to_tuple(trj, bins) X, Y = np.meshgrid( np.linspace(trj.x.min(), trj.x.max(), bins[0]), np.linspace(trj.y.min(), trj.y.max(), bins[1]), ) if "xbin" not in trj.columns or "ybin" not in trj.columns: grid_indices = traja.grid_coordinates(trj, bins=bins, xlim=xlim, ylim=ylim) else: grid_indices = trj[["xbin", "ybin"]] U, V = traja.calc_flow_angles(grid_indices.values) return X, Y, U, V def return_angle_to_point(p1: np.ndarray, p0: np.ndarray): """Calculate angle of points as coordinates in relation to each other. Designed to be broadcast across all trajectory points for a single origin point p0. Args: p1 (np.ndarray): Test point [x,y] p0 (np.ndarray): Origin/source point [x,y] Returns: r (float) """ r = math.degrees(math.atan2((p0[1] - p1[1]), (p0[0] - p1[0]))) return r def determine_colinearity(p0: np.ndarray, p1: np.ndarray, p2: np.ndarray): """Determine whether trio of points constitute a right turn, or whether they are left turns (or colinear/straight line). Args: p0 (:class:`~numpy.ndarray`): First point [x,y] in line p1 (:class:`~numpy.ndarray`): Second point [x,y] in line p2 (:class:`~numpy.ndarray`): Third point [x,y] in line Returns: (bool) """ cross_product = (p1[0] - p0[0]) * (p2[1] - p0[1]) - (p1[1] - p0[1]) * ( p2[0] - p0[0] ) if cross_product < 0: # Right turn return False else: # Points are colinear (if == 0) or left turn (if < 0) return True def inside( pt: np.ndarray, bounds_xs: list, bounds_ys: list, minx: float, maxx: float, miny: float, maxy: float, ): """Determine whether point lies inside or outside of polygon formed by "extrema" points - minx, maxx, miny, maxy. Optimized to be run as broadcast function in numpy along axis. Args: pt (:class:`~numpy.ndarray`): Point to test whether inside or outside polygon bounds_xs (list or tuple): x-coordinates of polygon vertices, in sequence bounds_ys (list or tuple): y-coordinates of polygon vertices, same sequence minx (float): minimum x coordinate value maxx (float): maximum x coordinate value miny (float): minimum y coordinate value maxy (float): maximum y coordinate value Returns: (bool) .. note:: Ported to Python from C implementation by W. Randolph Franklin (WRF): <https://wrf.ecse.rpi.edu/Research/Short_Notes/pnpoly.html> Boolean return "True" for OUTSIDE polygon, meaning it is within subset of possible convex hull coordinates. """ # Only theoretically possible, extrema polygon is actually a straight line if maxx == maxy and minx == miny: return True # No polygon to be within (considered outside) if pt[0] in [minx, maxx] or pt[1] in [miny, maxy]: return True # Extrema points are by definition part of convex hull poly_pts = len(bounds_xs) ct = 0 for i in range(poly_pts): if i == 0: j = poly_pts - 1 else: j = i - 1 # Test if horizontal trace from the point to infinity intersects the given polygon line segment if ((bounds_ys[i] > pt[1]) != (bounds_ys[j] > pt[1])) & ( pt[0] < ( (bounds_xs[j] - bounds_xs[i]) * (pt[1] - bounds_ys[i]) / (bounds_ys[j] - bounds_ys[i]) + bounds_xs[i] ) ): ct += 1 if ( ct % 2 == 0 ): # Number of intersections between point, polygon edge(s) and infinity point are odd: return True # Outside polygon else: return False # Inside polygon def calc_convex_hull(point_arr: np.array) -> np.array: """Identify containing polygonal convex hull for full Trajectory Interior points filtered with :meth:`traja.trajectory.inside` method, takes quadrilateral using extrema points `(minx, maxx, miny, maxy)` - convex hull points MUST all be outside such a polygon. Returns an array with all points in the convex hull. Implementation of Graham Scan `technique <https://en.wikipedia.org/wiki/Graham_scan>_`. Returns: point_arr (:class:`~numpy.ndarray`): n x 2 (x,y) array .. doctest:: >> #Quick visualizaation >> import matplotlib.pyplot as plt >> df = traja.generate(n=10000, convex_hull=True) >> xs, ys = [*zip(*df.convex_hull)] >> _ = plt.plot(df.x.values, df.y.values, 'o', 'blue') >> _ = plt.plot(xs, ys, '-o', color='red') >> _ = plt.show() .. note:: Incorporates Akl-Toussaint `method <http:/www-cgrl.cs.mcgill.ca/~godfried/publications/fast.convex.hull.algorithm.pdf>`_ for filtering interior points. .. note:: Performative loss beyond ~100,000-200,000 points, algorithm has O(nlogn) complexity. """ assert point_arr.shape[1] == 2, f"expected (n, 2) shape only, got {point_arr.shape}" # Find "extrema" points to form polygon (convex hull must be outside this polygon) minx = point_arr[:, 0].min() maxx = point_arr[:, 0].max() miny = point_arr[:, 1].min() maxy = point_arr[:, 1].max() min_x_pt = point_arr[np.where(point_arr[:, 0] == point_arr[:, 0].min())].tolist()[0] min_y_pt = point_arr[np.where(point_arr[:, 1] == point_arr[:, 1].min())].tolist()[0] max_x_pt = point_arr[np.where(point_arr[:, 0] == point_arr[:, 0].max())].tolist()[0] max_y_pt = point_arr[np.where(point_arr[:, 1] == point_arr[:, 1].max())].tolist()[0] extrema_pts = [min_x_pt, min_y_pt, max_x_pt, max_y_pt] extrema_xys = [*zip(*extrema_pts)] bounds_x, bounds_y = extrema_xys[0], extrema_xys[1] # Filter trajectory points to only include points "outside" of this extrema polygon convex_mask = np.apply_along_axis( inside, 1, point_arr, bounds_x, bounds_y, minx, maxx, miny, maxy ) point_arr = point_arr[convex_mask] # Find principal point (lowest y, lower x) from which to start p0 = point_arr[point_arr[:, 1] == point_arr[:, 1].min()].min(axis=0) point_arr = np.delete( point_arr, np.where((point_arr[:, 0] == p0[0]) & (point_arr[:, 1] == p0[1])), 0 ) # Sort remaining points point_arr = point_arr[np.lexsort((point_arr[:, 0], point_arr[:, 1]))] # Populate array with direction of each point in the trajectory to the principal (lowest, then leftmost) point point_arr_r_p0 = np.apply_along_axis(return_angle_to_point, 1, point_arr, p0) # Sort point array by radius sorted_ind = point_arr_r_p0.argsort() point_arr_r_p0 = point_arr_r_p0[sorted_ind] point_arr = point_arr[sorted_ind] # Check for points with duplicate angles from principal point, only keep furthest point unique_r = np.unique(point_arr_r_p0, return_index=True)[1] if ( unique_r.shape == point_arr_r_p0.shape ): # There are no two points at same angle from x axis pass else: point_arr_d_p0 = np.apply_along_axis( lambda x, p0=p0: np.linalg.norm(p0 - x), 1, point_arr ) # Identify duplicate angles unique, counts = np.unique(point_arr_r_p0, axis=0, return_counts=True) rep_angles = unique[counts > 1] duplicates = point_arr_r_p0[np.where(np.in1d(point_arr_r_p0, rep_angles))] duplicates = point_arr[np.where(np.in1d(point_arr_r_p0, rep_angles))] # Get indices of only the furthest point from origin at each unique angle dropped_pts = [] for dup_pt in duplicates: pt_idx = np.where( (point_arr[:, 0] == dup_pt[0]) & (point_arr[:, 1] == dup_pt[1]) )[0][0] r_val = point_arr_r_p0[pt_idx] ind_furthest = np.where( point_arr_d_p0 == point_arr_d_p0[np.where(point_arr_r_p0 == r_val)].max() )[0][0] if ( not pt_idx == ind_furthest ): # This is a "closer" point to origin, not in convex hull dropped_pts.append(pt_idx) point_arr = np.delete(point_arr, dropped_pts, axis=0) # Iterate through points. If a "right turn" is made, remove preceding point. point_arr = np.insert(point_arr, 0, p0, axis=0) for pt in point_arr: idx = np.where((point_arr[:, 0] == pt[0]) & (point_arr[:, 1] == pt[1]))[0][0] while True: # Skip/stop at first two points (3 points form line), after working backwards if idx <= 1: break # Continue working backwards until a left turn is made, or we reach the origin elif determine_colinearity( point_arr[idx - 2], point_arr[idx - 1], point_arr[idx] ): break else: # This is a right turn point_arr = np.delete(point_arr, idx - 1, 0) idx -= 1 point_arr = np.insert(point_arr, point_arr.shape[0], p0, axis=0) return point_arr def from_xy(xy: np.ndarray): """Convenience function for initializing :class:`~traja.frame.TrajaDataFrame` with x,y coordinates. Args: xy (:class:`numpy.ndarray`): x,y coordinates Returns: traj_df (:class:`~traja.frame.TrajaDataFrame`): Trajectory as dataframe .. doctest:: >>> import numpy as np >>> xy = np.array([[0,1],[1,2],[2,3]]) >>> traja.from_xy(xy) x y 0 0 1 1 1 2 2 2 3 """ df = traja.TrajaDataFrame.from_records(xy, columns=["x", "y"]) return df def fill_in_traj(trj: TrajaDataFrame): # FIXME: Implement return trj def _get_time_col(trj: TrajaDataFrame): # Check if saved in metadata time_col = trj.__dict__.get("time_col", None) if time_col: return time_col # Check if index is datetime if is_datetime64_any_dtype(trj.index) or is_datetime_or_timedelta_dtype(trj.index): return "index" # Check if any column contains 'time' time_cols = [col for col in trj if "time" in col.lower()] if time_cols: # Try first column time_col = time_cols[0] if is_datetime_or_timedelta_dtype(trj[time_col]): return time_col else: # Time column is float, etc. but not datetime64. # FIXME: Add conditional return, etc. return time_col else: # No time column found return None if __name__ == "__main__": import doctest doctest.testmod()