Coverage for navipy/trajectories/__init__.py : 34%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
""" Trajectory in navipy """
triangle_mode, euler_axes)
triangle_mode, euler_axes)
homogeneous_markers, rotation_mode): row = trajectory.loc[index_i] angles = row.loc[rotation_mode].values translate = row.loc['location'].values trans_mat = htf.compose_matrix(angles=angles, translate=translate, axes=rotation_mode) tmarker = trans_mat.dot(homogeneous_markers) tmarker = pd.DataFrame(data=tmarker, index=homogeneous_markers.index, columns=homogeneous_markers.columns) # We do not need w tmarker = tmarker.loc[['x', 'y', 'z'], :].unstack() tmarker.name = index_i return tmarker
homogeneous_markers, rotation_mode): row = trajectory.loc[index_i] angles = row.loc[rotation_mode].values translate = row.loc['location'].values trans_mat = htf.compose_matrix(angles=angles, translate=translate, axes=rotation_mode) tmarker = np.linalg.inv(trans_mat).dot(homogeneous_markers) tmarker = pd.DataFrame(data=tmarker, index=homogeneous_markers.index, columns=homogeneous_markers.columns) # We do not need w tmarker = tmarker.loc[['x', 'y', 'z'], :].unstack() tmarker.name = index_i return tmarker
index = pd.MultiIndex.from_tuples( [('location', 'x'), ('location', 'y'), ('location', 'z'), (rotconv, 'q_0'), (rotconv, 'q_1'), (rotconv, 'q_2'), (rotconv, 'q_3')]) [('location', 'x'), ('location', 'y'), ('location', 'z'), (rotconv, 'alpha_0'), (rotconv, 'alpha_1'), (rotconv, 'alpha_2')]) else: msg = 'convention for rotation {} is not suppored\n' msg += msg.format(rotconv) msg += 'the following convention are supported\n:' for rconv in mconst._AXES2TUPLE.keys(): msg += '{}\n'.format(rconv) msg += 'quaternion\n' raise KeyError(msg)
def rotation_mode(self):
def rotation_mode(self, rotation_mode): """Convert current rotation_mode to a different one
:param rotation_mode: the new rotation mode to be assigned """ columns=self.__build_columns(rotation_mode), dtype=float) orient = htq.from_euler(row.angle_0, row.angle_1, row.angle_2, axes=oldrotmod) else: aj=row.loc[(self.rotation_mode, 'alpha_1')], ak=row.loc[(self.rotation_mode, 'alpha_2')], axes=oldrotmod)
""" Return facing vector """ facing = pd.DataFrame(index=self.index, columns=['x', 'y', 'z'], dtype=float) for i, row in self.iterrows(): if self.rotation_mode == 'quaternion': mat = htq.matrix(row.loc[self.rotation_mode].values) else: mat = hte.matrix(row.loc[(self.rotation_mode, 'alpha_0')], row.loc[(self.rotation_mode, 'alpha_1')], row.loc[(self.rotation_mode, 'alpha_2')], axes=self.rotation_mode)[:3, :3] orient = np.dot(mat, np.array([[1], [0], [0]]))[:, 0] facing.loc[i, ['x', 'y', 'z']] = orient return facing
def sampling_rate(self):
def sampling_rate(self, sampling_rate):
def x(self):
def x(self, x):
def y(self):
def y(self, y):
def z(self):
def z(self, z):
else: msg = 'alpha_{0:} does not exist for quaternion (try q_{0:})' raise ValueError(msg.format(alphai))
else: msg = 'alpha_{0:} does not exist for quaternion (try q_{0:})' raise ValueError(msg.format(alphai))
def alpha_0(self):
def alpha_0(self, alpha_0):
def alpha_1(self):
def alpha_1(self, alpha_1):
def alpha_2(self):
def alpha_2(self, alpha_2):
if self.__rotconv == 'quaternion': return self.loc[:, (self.__rotconv, 'q_{}'.format(qi))] else: msg = 'q_{0:} does not exist for none quaternion (try alpha_{0:})' raise ValueError(msg.format(qi))
if self.__rotconv != 'quaternion': self.loc[:, (self.__rotconv, 'q_{}'.format(qi))] = val else: msg = 'q_{0:} does not exist for none quaternion (try alpha_{0:})' raise ValueError(msg.format(qi))
def q_0(self): return self.__get_q_i(0)
def q_0(self, q_0): self.__set_q_i(0, q_0)
def q_1(self): return self.__get_q_i(1)
def q_1(self, q_1): self.__set_q_i(1, q_1)
def q_2(self): return self.__get_q_i(2)
def q_2(self, q_2): self.__set_q_i(2, q_2)
def q_3(self): return self.__get_q_i(3)
def q_3(self, q_3): self.__set_q_i(3, q_3)
# ------------------------------------------- # ---------------- IO ----------------------- # ------------------------------------------- """ Load from a hdf file """ df = pd.read_csv(filename, sep=sep, header=[0, 1], index_col=0) self.from_dataframe(df) return self
df = pd.read_hdf(filename) self.from_dataframe(df) return self
df = pd.DataFrame(self) df.to_hdf(filename, key='posorients')
df = pd.DataFrame(self) df.to_csv(filename)
# ------------------------------------------- # ---------------- INITS FROM VAR------------ # ------------------------------------------- """ Assign trajectory from a numpy array N x 6 (rotconv = Euler angles) N x 7 (rotconv = quaternion) """ # Check user input if not isinstance(nparray, np.ndarray): msg = 'nparray should be a np.ndarray and not {}' msg = msg.format(type(nparray)) raise TypeError(msg) if indeces is None: indeces = np.arange(0, nparray.shape[0]) if not isinstance(indeces, np.ndarray): msg = 'indeces should be a np.ndarray and not {}' msg = msg.format(type(indeces)) raise TypeError(msg) if indeces.shape[0] != nparray.shape[0]: msg = 'indeces and nparray should have same number of rows' msg += '{}!={}' msg = msg.format(indeces.shape[0], nparray.shape[0]) raise TypeError(msg) if rotconv == 'quaternion': if nparray.shape[1] != 7: msg = 'nparray should have size Nx7 and not {}' msg = msg.format(nparray.shape) raise ValueError(msg) elif rotconv in mconst._AXES2TUPLE.keys(): if nparray.shape[1] != 6: msg = 'nparray should have size Nx6 and not {}' msg = msg.format(nparray.shape) raise ValueError(msg) columns = self.__build_columns(rotconv) super().__init__(index=indeces, columns=columns) self.__rotconv = rotconv # Position self.x = nparray[:, 0] self.y = nparray[:, 1] self.z = nparray[:, 2] # Orientation if self.__rotconv == 'quaternion': self.q_0 = nparray[:, 3] self.q_1 = nparray[:, 4] self.q_2 = nparray[:, 5] self.q_3 = nparray[:, 6] else: self.alpha_0 = nparray[:, 3] self.alpha_1 = nparray[:, 4] self.alpha_2 = nparray[:, 5] return self
""" Assign trajectory from a dataframe """ raise ValueError('More than one rotconv detected') elif isinstance(df.columns, pd.MultiIndex): if 'location' in df.columns.levels[0]: rotconv = df.columns.levels[0].drop('location') if len(rotconv) == 1: rotconv = rotconv[0] else: msg = 'Could not determine rotconv from columns header' msg += '\n{}'.format(df.columns) raise ValueError(msg) else: msg = 'Could not determine rotconv from columns header' msg += '\n{}'.format(df.columns) raise ValueError(msg)
elif rotconv is None: msg = 'When dataframe does not contains rotconv_id,' msg += 'a convention should be given' raise ValueError(msg)
# Position self.x = df.loc[:, ('location', 'x')] self.y = df.loc[:, ('location', 'y')] self.z = df.loc[:, ('location', 'z')] else: # Orientation if isinstance(df.columns, pd.MultiIndex): self.q_0 = df.loc[:, (rotconv, 'q_0')] self.q_1 = df.loc[:, (rotconv, 'q_1')] self.q_2 = df.loc[:, (rotconv, 'q_2')] self.q_3 = df.loc[:, (rotconv, 'q_3')] else: self.q_0 = df.q_0 self.q_1 = df.q_1 self.q_2 = df.q_2 self.q_3 = df.q_3 else: elif 'alpha_0' in df.columns: self.alpha_0 = df.alpha_0 elif isinstance(df.columns, pd.MultiIndex): if 'q_0' in df.columns.levels[1]: self.alpha_0 = df.loc[:, (rotconv, 'q_0')] elif 'alpha_0' in df.columns.levels[1]: self.alpha_0 = df.loc[:, (rotconv, 'alpha_0')] else: msg = 'df should contains q_0 or alpha_0' msg += 'columns are:\n{}'.format(df.columns) raise KeyError(msg) else: raise KeyError('df should contains q_0 or alpha_0')
elif 'alpha_1' in df.columns: self.alpha_1 = df.alpha_1 elif isinstance(df.columns, pd.MultiIndex): if 'q_1' in df.columns.levels[1]: self.alpha_1 = df.loc[:, (rotconv, 'q_1')] elif 'alpha_1' in df.columns.levels[1]: self.alpha_1 = df.loc[:, (rotconv, 'alpha_1')] else: msg = 'df should contains q_1 or alpha_1' msg += 'columns are:\n{}'.format(df.columns) raise KeyError(msg) else: raise KeyError('df should contains q_1 or alpha_1')
elif 'alpha_2' in df.columns: self.alpha_2 = df.alpha_2 elif isinstance(df.columns, pd.MultiIndex): if 'q_2' in df.columns.levels[1]: self.alpha_2 = df.loc[:, (rotconv, 'q_2')] elif 'alpha_2' in df.columns.levels[1]: self.alpha_2 = df.loc[:, (rotconv, 'alpha_2')] else: msg = 'df should contains q_2 or alpha_2' msg += 'columns are:\n{}'.format(df.columns) raise KeyError(msg) else: raise KeyError('df should contains q_2 or alpha_2')
error=None, markers_labels=[0, 1, 2]): # Reinit the pandas dataframe super class # because we now know the indeces # If error is provided, we can propagate the error self.trajectory_error = pd.DataFrame(data=np.nan, index=self.index, columns=self.columns) 'euler_axes': self.rotation_mode} # Assign mark to pos # Calculate position and orientation # propagate error euclidian_error = error.loc[index_i] if not np.isnan(euclidian_error): covar = euclidian_error * np.eye(9) err_pos = propagate_error(_markers2position, x, covar, args=kwargs, epsilon=euclidian_error / 10)
err_angle = propagate_error(_markers2angles, x, covar, args=kwargs, epsilon=euclidian_error / 10) self.trajectory_error.loc[index_i, 'location'] = \ np.diagonal(err_pos) self.trajectory_error.loc[index_i, self.rotation_mode] \ = np.diagonal(err_angle)
# ----------------------------------------------- # ---------------- TRANSFORM -------------------- # ----------------------------------------------- """ Transform markers in body coordinate to world coordinate """ msg = 'markers should be of type pd.Series and not' msg += ' {}'.format(type(markers)) raise TypeError(msg) msg = 'markers should have a multiIndex index \n' msg += ' (i,"x"), (i,"y"),(i,"z")\n' msg += 'here i is the index of the marker' raise TypeError(msg) # Looping through each time point along the trajectory # More than one marker may be transformed # The marker are assume to be a multiIndex dataframe # Make sure that columns are correctly ordered # Transpose because we apply homogeneous transformation # on the marker, and thus a 4x4 matrix on a 4xN matrix # here N is the number of markers # Looping throught the indeces # to get the homogeneous transformation from the position orientation # and then apply the transformed to the marker position partial(_markerstransform, trajectory=self, homogeneous_markers=homogeneous_markers, rotation_mode=self.rotation_mode), indeces) # unwrap results index=indeces, columns=markers.index, dtype=float)
""" Transform markers in world coordinate to body coordinate """ msg = 'Prior to 12/09/2018:\n' msg += 'world2body was doing a reverse transformed\n' msg += 'Please use body2world instead' warnings.warn(msg) if not isinstance(markers, pd.Series): msg = 'markers should be of type pd.Series and not' msg += ' {}'.format(type(markers)) raise TypeError(msg) if not isinstance(markers.index, pd.MultiIndex): msg = 'markers should have a multiIndex index \n' msg += ' (i,"x"), (i,"y"),(i,"z")\n' msg += 'here i is the index of the marker' raise TypeError(msg) if indeces is None: # Looping through each time point along the trajectory indeces = self.index # More than one marker may be transformed # The marker are assume to be a multiIndex dataframe homogeneous_markers = markers.unstack() homogeneous_markers['w'] = 1 # Make sure that columns are correctly ordered homogeneous_markers = homogeneous_markers[['x', 'y', 'z', 'w']] # Transpose because we apply homogeneous transformation # on the marker, and thus a 4x4 matrix on a 4xN matrix # here N is the number of markers homogeneous_markers = homogeneous_markers.transpose() # Looping throught the indeces # to get the homogeneous transformation from the position orientation # and then apply the transformed to the marker position with Pool() as p: result = p.map( partial(_invmarkerstransform, trajectory=self, homogeneous_markers=homogeneous_markers, rotation_mode=self.rotation_mode), indeces) # unwrap results indeces = [res.name for res in result] transformed_markers = pd.DataFrame(data=result, index=indeces, columns=markers.index, dtype=float) return transformed_markers
"""differentiate the trajectory and rename columns as d+col
:param periods: periods as in pd.diff() :returns: Diff of the trajectory :rtype: pd.DataFrame with MultiIndex
""" mytrajdiff = self.diff(periods=1) d = dict(zip(mytrajdiff.columns.levels[1], ['d' + col for col in mytrajdiff.columns.levels[1]])) mytrajdiff = mytrajdiff.rename(columns=d, level=1) mytrajdiff.dropna().head() return mytrajdiff
""" Calculate the velocity on a trajectory """ velocity = pd.DataFrame(index=self.index, columns=['dx', 'dy', 'dz', 'dalpha_0', 'dalpha_1', 'dalpha_2'], dtype=float) diffrow = self.diff() velocity.loc[:, ['dx', 'dy', 'dz']] = diffrow.loc[:, 'location'].values for index_i, row in self.iterrows(): if self.rotation_mode == 'quaternion': raise NameError('Not implemented') else: rot = hte.angular_velocity( ai=row.loc[(self.rotation_mode, 'alpha_0')], aj=row.loc[(self.rotation_mode, 'alpha_1')], ak=row.loc[(self.rotation_mode, 'alpha_2')], dai=diffrow.loc[index_i, (self.rotation_mode, 'alpha_0')], daj=diffrow.loc[index_i, (self.rotation_mode, 'alpha_1')], dak=diffrow.loc[index_i, (self.rotation_mode, 'alpha_2')], axes=self.rotation_mode) velocity.loc[index_i, ['dalpha_0', 'dalpha_1', 'dalpha_2']] = rot.squeeze() return velocity
# -------------------------------------------- # ---------------- FILTER -------------------- # -------------------------------------------- """ Filter the trajectory with order and cutoff by using a lowpass filter twice (forward and backward) to correct for phase shift
:param order: the order of the lowpass filter. Either a number \ or a pandas series. The series should be multiindexed as the columns of \ the trajectory. :param cutoff: cut off frequency in Hz if sampling rate is known\ otherwise relative to the Nyquist frequency. Either a number or a pandas \ series. """ if self.sampling_rate > 0: nyquist = self.__sampling_rate / 2 cutoff /= nyquist if isinstance(order, (int, float)): order = pd.Series(data=order, index=self.columns) if isinstance(cutoff, (int, float)): cutoff = pd.Series(data=cutoff, index=self.columns) subtraj = self.consecutive_blocks() for trajno_nan in subtraj: indeces = trajno_nan.index for col in self.columns: b, a = signal.butter(order.loc[col], cutoff.loc[col]) if padlen is None: padlen = 3 * max(len(a), len(b)) if trajno_nan.shape[0] <= padlen: self.loc[indeces, col] *= np.nan else: if col[0] == 'location': self.loc[indeces, col] = signal.filtfilt( b, a, trajno_nan.loc[:, col], padlen=padlen).astype(float) else: self.loc[indeces, col] = signal.filtfilt( b, a, np.unwrap(trajno_nan.loc[:, col]), padlen=padlen).astype(float)
""" fillna with a given method """ customs_method = ['Cubic'] if not (method in customs_method): # fall back to pandas fillna function return self.fillna(method) # Start implementing customs_method if method == 'Cubic': for col in self.loc[:, 'location'].columns: values = self.loc[:, ('location', col)] validtime = values.dropna().index validvalues = values.dropna().values cs = CubicSpline(validtime, validvalues) time = self.index self.loc[:, ('location', col)] = cs(time) # for the angles we first do a ffill and then # unwrap and interpolate on the unwrap angles rotconv = self.rotation_mode for col in self.loc[:, rotconv].columns: values = self.loc[:, (rotconv, col)] validtime = values.dropna().index unwrapvalues = np.unwrap(values.fillna(method='ffill')) validvalues = unwrapvalues[validtime] cs = CubicSpline(validtime, validvalues) time = self.index self.loc[:, (rotconv, col)] = cs(time) return self
else: msg = 'Method {} is not supported.' msg += 'please use method supported by pd.fillna' msg += ' or one of the following methods {}' msg = msg.format(method, customs_method) raise NameError(msg)
# -------------------------------------------- # ---------------- EXTRACT ------------------- # --------------------------------------------
""" Return a list of subtrajectory withtout nans """ # get a numpy array from the trajectory, # because we are using numpy arrays later np_traj = self.values np_traj = np.hstack([self.index[:, np.newaxis], np_traj]) # Look for row containing at least one nan nonans = np.any(np.isnan(np_traj), axis=1) # spliting the trajectory according to nan location events = np.split(np_traj, np.where(nonans)[0])
# removing NaN entries events = [ev[~np.any(np.isnan(ev), axis=1)] for ev in events if isinstance(ev, np.ndarray)] # removing empty DataFrames subtraj = [Trajectory().from_dataframe(self.loc[ev[:, 0]]) for ev in events if ev.size > 0] return subtraj # ------------------------------------------- # ---------------- PLOTS -------------------- # -------------------------------------------
colors=None, step_lollipop=1, offset_lollipop=0, lollipop_marker='o', linewidth=1, lollipop_tail_width=1, lollipop_tail_length=1, lollipop_head_size=1, stickdir='backward', plotcoords=['x', 'y', 'z'] ): """ lollipops plot
create a lollipop plot for the trajectory with its associated \ direction. Handels missing frames by leaving gaps in the lollipop plot. However indices of the colors, trajectory
**Note** Gap in the index of the trajectory dataframe, will \ lead to gap in the plotted trajectory and the color bar will \ therefore miss some values (otherwise the color-coded frames \ will not be linear)
:param ax: is a matplotlib axes with 3d projection :param trajectory: is a pandas dataframe with columns \ ['x','y','z', 'euler_0','euler_1','euler_2'] :param euler_axes: the axes rotation convention \ (see homogenous.transformations for details) :param colors: is a pandas dataframe with columns \ ['r','g','b','a'] and indexed as trajectory. \ (default: time is color coded) :param step_lollipop: number of frames between two lollipops :param offset_lollipop: the first lollipop to be plotted :param lollipop_marker: the head of the lollipop :param linewidth: The width of the line connecting lollipops :param lollipop_tail_width: The width of the lollipop stick :param lollipop_tail_length: The length of the lollipop stick :param lollipop_head_size: The size of the lollipop :param stickdir: The direction of the stick of the animal \ (backward or forward) :param plotcoords: the dimension to plots, e.g. ['x','y','z'] for 3d plots \ ['x','y'] for a 2d plot """ # import time t_start = time.time() if ax is None: if len(plotcoords) == 3: fig = plt.figure() ax = fig.add_subplot(111, projection='3d') elif len(plotcoords) == 2: fig = plt.figure() ax = fig.add_subplot(111) if (len(plotcoords) != 2) and (len(plotcoords) != 3): msg = 'plotcoords need to contains 2 or 3 elements' msg += ' for 2d and 3d plots respectively' raise ValueError(msg) if ax.name == '3d': plotcoords = ['x', 'y', 'z'] elif len(plotcoords) > 2: plotcoords = plotcoords[:2]
# Start computing for direction direction = self.facing_direction() if colors is None: timeseries = pd.Series(data=self.index, index=self.index) colors, sm = get_color_dataframe(timeseries) # Create a continuous index from trajectory frames = np.arange(self.index.min(), self.index.max() + 1) # Select indeces to save time indeces = frames[offset_lollipop::step_lollipop] # Calculate agent tail direction # use function in trajectory to get any point bodyref to worldref tailmarker = pd.Series(data=0, index=pd.MultiIndex.from_product( [[0], ['x', 'y', 'z']])) if stickdir == 'forward': tailmarker.loc[0, 'x'] = lollipop_tail_length else: tailmarker.loc[0, 'x'] = -lollipop_tail_length tail = self.body2world(tailmarker, indeces=indeces) tail = tail.loc[:, 0] # Plot the agent trajectory # - loop through consecutive point # - Two consecutive point are associated with the color # of the first point x = self.loc[:, ('location', 'x')] y = self.loc[:, ('location', 'y')] z = self.loc[:, ('location', 'z')] print(time.time() - t_start) t_start = time.time() line = dict() line['x'] = self.x line['y'] = self.y line['z'] = self.z if isinstance(colors, pd.DataFrame): # Each segment will be plotted with a different color # we therefore need to loop through all points # in the trajectory, a rather long process for frame_i, frame_j in zip(frames[:-1], frames[1:]): # Frames may be missing in trajectory, # and therefore can not be plotted if (frame_i in self.index) and \ (frame_j in self.index) and \ (frame_i in self.index): color = [colors.r[frame_i], colors.g[frame_i], colors.b[frame_i], colors.a[frame_i]] # Create the line to plot line['x'] = [x[frame_i], x[frame_j]] line['y'] = [y[frame_i], y[frame_j]] line['z'] = [z[frame_i], z[frame_j]] # Actual plot command if len(plotcoords) == 3: ax.plot(xs=line['x'], ys=line['y'], zs=line['z'], color=color, linewidth=linewidth) else: # len(plotcoords) == 2 because check earlier ax.plot(line[plotcoords[0]], line[plotcoords[1]], color=color, linewidth=linewidth)
else: # Actual plot command if len(plotcoords) == 3: ax.plot(xs=line['x'], ys=line['y'], zs=line['z'], color=colors, linewidth=linewidth) else: # len(plotcoords) == 2 because check earlier ax.plot(line[plotcoords[0]], line[plotcoords[1]], color=colors, linewidth=linewidth) print(time.time() - t_start) t_start = time.time() # Plot the lollipop # - loop through the frames with a step of step_lollipop # - The lollipop is colored with the color of this frame # - Each lollipop is composed of a marker, # a point on the agent trajectory # and a line representing the body (anti facing direction) for frame_i in indeces: # Frames may be missing in trajectory, # and therefore can not be plotted if (frame_i in self.index) and \ (frame_i in direction.index): if isinstance(colors, pd.DataFrame): if frame_i not in colors.index: continue color = [colors.r[frame_i], colors.g[frame_i], colors.b[frame_i], colors.a[frame_i]] else: color = colors # Create the line to plot line['x'] = [self.x[frame_i], tail.x[frame_i]] line['y'] = [self.y[frame_i], tail.y[frame_i]] line['z'] = [self.z[frame_i], tail.z[frame_i]] # Actual plot command if len(plotcoords) == 3: ax.plot(xs=line['x'], ys=line['y'], zs=line['z'], color=color, linewidth=lollipop_tail_width) ax.plot(xs=[line['x'][0]], ys=[line['y'][0]], zs=[line['z'][0]], color=color, marker=lollipop_marker, markersize=lollipop_head_size) else: # len(plotcoords) == 2 because check earlier ax.plot(line[plotcoords[0]], line[plotcoords[1]], color=color, linewidth=lollipop_tail_width) ax.plot([line[plotcoords[0]][0]], [line[plotcoords[1]][0]], color=color, marker=lollipop_marker, markersize=lollipop_head_size) print(time.time() - t_start) |