diff --git a/navipy/markers/__init__.py b/navipy/markers/__init__.py deleted file mode 100644 index 5bb534f795ae0f566ba9f57c31944d8c6f45284c..0000000000000000000000000000000000000000 --- a/navipy/markers/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# package diff --git a/navipy/tools/tools.py b/navipy/tools/tools.py new file mode 100644 index 0000000000000000000000000000000000000000..3636b8828e7bfc14a354ae960b6b0d7a9c567a6f --- /dev/null +++ b/navipy/tools/tools.py @@ -0,0 +1,68 @@ +import pandas as pd +import numpy as np + + +def extract_block(data, thresholds): + """ + data is a pandas series with integer consecutive indexes + + return a dataframe, index by block containing block start and end + a block is defined as: + - data are greater than threshold_1 + - extend to the last value of data below threshold_2 \ +before the block + - extend to the first value of data below threshold_2 \ +after the block + + threshold_1 should be higher than threshold_2 + """ + threshold_2 = min(thresholds) + threshold_1 = max(thresholds) + + # create a dataframe containing: + # in column th_1: 1 for data above th_1, nan other wise + # in column th_2: 1 for data below th_2, nan other wise + treshold_df = pd.DataFrame(index=data.index, + columns=['th_1', 'th_2']) + treshold_df.th_1[data > threshold_1] = 1 + treshold_df.th_2[data <= threshold_2] = 1 + treshold_df['frame'] = treshold_df.index + # Calculate unextended block + subdf = treshold_df[['th_1', 'frame']].dropna(how='any') + start_block = subdf.frame[(subdf.frame.diff() > 1) + | (subdf.frame.diff().isnull())] + end_block = subdf.frame[(subdf.frame[::-1].diff() < -1) + | (subdf.frame[::-1].diff().isnull())] + + block_df = pd.DataFrame({'start_th1': start_block.reset_index().frame, + 'end_th1': end_block.reset_index().frame}) + # extend block based on th_2 + block_df['end_th2'] = np.nan + block_df['start_th2'] = np.nan + for i, row in block_df.iterrows(): + point_below_th2 = treshold_df.loc[:row.start_th1, + 'th_2'].dropna() + # check if a point is below th2 + if len(point_below_th2) > 0: + start_th2 = point_below_th2.index[-1] + else: + start_th2 = row.start_th1 + # Check if the point is before th1 + if start_th2 <= row.start_th1: + block_df.loc[i, 'start_th2'] = start_th2 + + point_below_th2 = treshold_df.loc[row.start_th1:, 'th_2'].dropna() + # check if a point is below th2 + if len(point_below_th2) > 0: + end_th2 = point_below_th2.index[0] + else: + end_th2 = row.end_th1 + # Check if the point is after th1 + if end_th2 >= row.end_th1: + block_df.loc[i, 'end_th2'] = end_th2 + + return block_df + + +def extract_block_nonans(data): + return extract_block(data.isnull() == 0, thresholds=[0.4, 0.5]) diff --git a/navipy/trajectories/__init__.py b/navipy/trajectories/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..c9851b3938a5f8a169a63fa4359f0fcc53307fd2 --- /dev/null +++ b/navipy/trajectories/__init__.py @@ -0,0 +1,423 @@ +""" + Trajectory in navipy +""" +import pandas as pd +import numpy as np +import navipy.maths.constants as mconst +import navipy.maths.quaternion as htq +import navipy.maths.euler as hte +from navipy.errorprop import propagate_error +import navipy.trajectories.transformations as mtf + + +def _markers2position(x, kwargs): + mark0 = pd.Series(x[:3], index=['x', 'y', 'z']) + mark1 = pd.Series(x[3:6], index=['x', 'y', 'z']) + mark2 = pd.Series(x[6:], index=['x', 'y', 'z']) + triangle_mode = kwargs['triangle_mode'] + euler_axes = kwargs['euler_axes'] + return mtf.markers2translate(mark0, mark1, mark2, + triangle_mode, euler_axes) + + +def _markers2angles(x, kwargs): + mark0 = pd.Series(x[:3], index=['x', 'y', 'z']) + mark1 = pd.Series(x[3:6], index=['x', 'y', 'z']) + mark2 = pd.Series(x[6:], index=['x', 'y', 'z']) + triangle_mode = kwargs['triangle_mode'] + euler_axes = kwargs['euler_axes'] + return mtf.markers2euler(mark0, mark1, mark2, + triangle_mode, euler_axes) + + +class Trajectory(pd.DataFrame): + def __init__(self, rotconv='rzyx', indeces=np.arange(1)): + columns = self.__build_columns(rotconv) + super().__init__(index=indeces, columns=columns) + self.__rotconv = rotconv + self.__sampling_rate = np.nan + + def __build_columns(self, rotconv): + if rotconv == 'quaternion': + index = pd.MultiIndex.from_tuples( + [('location', 'x'), ('location', 'y'), + ('location', 'z'), (rotconv, 'q_0'), + (rotconv, 'q_1'), (rotconv, 'q_2'), + (rotconv, 'q_3')]) + elif rotconv in mconst._AXES2TUPLE.keys(): + index = pd.MultiIndex.from_tuples( + [('location', 'x'), ('location', 'y'), + ('location', 'z'), (rotconv, 'alpha_0'), + (rotconv, 'alpha_1'), (rotconv, 'alpha_2')]) + else: + msg = 'convention for rotation {} is not suppored\n' + msg += msg.format(rotconv) + msg += 'the following convention are supported\n:' + for rconv in mconst._AXES2TUPLE.keys(): + msg += '{}\n'.format(rconv) + msg += 'quaternion\n' + raise KeyError(msg) + return index + + @property + def rotation_mode(self): + return self.__rotconv + + @rotation_mode.setter + def rotation_mode(self, rotation_mode): + oldrotmod = self.rotation_mode + neworient = pd.DataFrame(index=self.index, + columns=self.__build_columns(rotation_mode), + dtype=float) + neworient.drop(inpalce=True, labels='location', level=0, axis=1) + for index_i, row in self.iterrows(): + if rotation_mode == 'quaternion': + orient = htq.from_euler(row.angle_0, + row.angle_1, + row.angle_2, + axes=oldrotmod) + else: + m = hte.matrix(ai=row.loc[(self.rotation_mode, 'angle_0')], + aj=row.loc[(self.rotation_mode, 'angle_1')], + ak=row.loc[(self.rotation_mode, 'angle_2')], + axes=oldrotmod) + orient = hte.from_matrix(m, axes=rotation_mode) + neworient.loc[index_i, rotation_mode] = orient + self.drop(inplace=True, labels=oldrotmod, level=0, axis=1) + for col in neworient.columns: + self[(rotation_mode, col)] = neworient.loc[:, col] + self.__rotconv = rotation_mode + + def facing_direction(self): + """ + Return facing vector + """ + facing = pd.DataFrame(index=self.index, + columns=['x', 'y', 'z'], + dtype=float) + for i, row in self.iterrows(): + if self.rotation_mode == 'quaternion': + mat = htq.matrix(row.loc[self.rotation_mode].values) + else: + mat = hte.matrix(row.loc[(self.rotation_mode, 'alpha_0')], + row.loc[(self.rotation_mode, 'alpha_1')], + row.loc[(self.rotation_mode, 'alpha_2')], + axes=self.rotation_mode)[:3, :3] + orient = np.dot(mat, + np.array([[1], [0], [0]]))[:, 0] + facing.loc[i, ['x', 'y', 'z']] = orient + return facing + + @property + def sampling_rate(self): + if np.isnan(self.__sampling_rate): + raise NameError('Sampling rate has not be set') + else: + return self.index.name + + @sampling_rate.setter + def sampling_rate(self, sampling_rate): + self.__sampling_rate = sampling_rate + + @property + def x(self): + return self.loc[:, ('location', 'x')] + + @x.setter + def x(self, x): + self.loc[:, ('location', 'x')] = x + + @property + def y(self): + return self.loc[:, ('location', 'y')] + + @y.setter + def y(self, y): + self.loc[:, ('location', 'y')] = y + + @property + def z(self): + return self.loc[:, ('location', 'z')] + + @z.setter + def z(self, z): + self.loc[:, ('location', 'z')] = z + + def __get_alpha_i(self, alphai): + if self.__rotconv != 'quaternion': + return self.loc[:, (self.__rotconv, 'alpha_{}'.format(alphai))] + else: + msg = 'alpha_{0:} does not exist for quaternion (try q_{0:})' + raise ValueError(msg.format(alphai)) + + def __set_alpha_i(self, alphai, val): + if self.__rotconv != 'quaternion': + self.loc[:, (self.__rotconv, 'alpha_{}'.format(alphai))] = val + else: + msg = 'alpha_{0:} does not exist for quaternion (try q_{0:})' + raise ValueError(msg.format(alphai)) + + @property + def alpha_0(self): + return self.__get_alpha_i(0) + + @alpha_0.setter + def alpha_0(self, alpha_0): + self.__set_alpha_i(0, alpha_0) + + @property + def alpha_1(self): + return self.__get_alpha_i(1) + + @alpha_1.setter + def alpha_1(self, alpha_1): + self.__set_alpha_i(1, alpha_1) + + @property + def alpha_2(self): + return self.__get_alpha_i(2) + + @alpha_2.setter + def alpha_2(self, alpha_2): + self.__set_alpha_i(2, alpha_2) + + def __get_q_i(self, qi): + if self.__rotconv == 'quaternion': + return self.loc[:, (self.__rotconv, 'q_{}'.format(qi))] + else: + msg = 'q_{0:} does not exist for none quaternion (try alpha_{0:})' + raise ValueError(msg.format(qi)) + + def __set_q_i(self, qi, val): + if self.__rotconv != 'quaternion': + self.loc[:, (self.__rotconv, 'q_{}'.format(qi))] = val + else: + msg = 'q_{0:} does not exist for none quaternion (try alpha_{0:})' + raise ValueError(msg.format(qi)) + + @property + def q_0(self): + return self.__get_q_i(0) + + @q_0.setter + def q_0(self, q_0): + self.__set_q_i(0, q_0) + + @property + def q_1(self): + return self.__get_q_i(1) + + @q_1.setter + def q_1(self, q_1): + self.__set_q_i(1, q_1) + + @property + def q_2(self): + return self.__get_q_i(2) + + @q_2.setter + def q_2(self, q_2): + self.__set_q_i(2, q_2) + + @property + def q_3(self): + return self.__get_q_i(3) + + @q_3.setter + def q_3(self, q_3): + self.__set_q_i(3, q_3) + + def read_csv(self, filename, sep=',', header=[0, 1], index_col=0): + """ Load from a hdf file + """ + df = pd.read_csv(filename, sep=sep, + header=[0, 1], index_col=0) + self.from_dataframe(df) + return self + + def read_hdf(self, filename): + raise NameError('Not implemented') + + def from_array(self, nparray, rotconv): + """ Assign trajectory from a numpy array + N x 6 (rotconv = Euler angles) + N x 7 (rotconv = quaternion) + """ + # Check user input + if not isinstance(nparray, np.ndarray): + msg = 'nparray should be a np.ndarray and not {}' + msg = msg.format(type(nparray)) + raise TypeError(msg) + indeces = np.arange(0, nparray.shape[0]) + if rotconv == 'quaternion': + if nparray.shape[1] != 7: + msg = 'nparray should have size Nx7 and not {}' + msg = msg.format(nparray.shape) + raise ValueError(msg) + elif rotconv in mconst._AXES2TUPLE.keys(): + if nparray.shape[1] != 6: + msg = 'nparray should have size Nx6 and not {}' + msg = msg.format(nparray.shape) + raise ValueError(msg) + columns = self.__build_columns(rotconv) + super().__init__(index=indeces, columns=columns) + self.__rotconv = rotconv + # Position + self.x = nparray[:, 0] + self.y = nparray[:, 1] + self.z = nparray[:, 2] + # Orientation + if self.__rotconv == 'quaternion': + self.q_0 = nparray[:, 3] + self.q_1 = nparray[:, 4] + self.q_2 = nparray[:, 5] + self.q_3 = nparray[:, 6] + else: + self.alpha_0 = nparray[:, 3] + self.alpha_1 = nparray[:, 4] + self.alpha_2 = nparray[:, 5] + return self + + def from_dataframe(self, df, rotconv=None): + """ Assign trajectory from a dataframe + """ + if 'rotconv_id' in df.columns: + rotconv = df.loc[:, 'rotconv_id'] + if not np.all(rotconv == rotconv.iloc[0]): + raise ValueError('More than one rotconv detected') + rotconv = rotconv.iloc[0] # They are all the same :) + elif isinstance(df.columns, pd.MultiIndex): + if 'location' in df.columns.levels[0]: + rotconv = df.columns.levels[0].drop('location') + if len(rotconv) == 1: + rotconv = rotconv[0] + else: + msg = 'Could not determine rotconv from columns header' + msg += '\n{}'.format(df.columns) + raise ValueError(msg) + else: + msg = 'Could not determine rotconv from columns header' + msg += '\n{}'.format(df.columns) + raise ValueError(msg) + + elif rotconv is None: + msg = 'When dataframe does not contains rotconv_id,' + msg += 'a convention should be given' + raise ValueError(msg) + + indeces = df.index + columns = self.__build_columns(rotconv) + super().__init__(index=indeces, columns=columns) + self.__rotconv = rotconv + # Position + if isinstance(df.columns, pd.MultiIndex): + self.x = df.loc[:, ('location', 'x')] + self.y = df.loc[:, ('location', 'y')] + self.z = df.loc[:, ('location', 'z')] + else: + self.x = df.x + self.y = df.y + self.z = df.z + # Orientation + if self.__rotconv == 'quaternion': + if isinstance(df.columns, pd.MultiIndex): + self.q_0 = df.loc[:, (rotconv, 'q_0')] + self.q_1 = df.loc[:, (rotconv, 'q_1')] + self.q_2 = df.loc[:, (rotconv, 'q_2')] + self.q_3 = df.loc[:, (rotconv, 'q_3')] + else: + self.q_0 = df.q_0 + self.q_1 = df.q_1 + self.q_2 = df.q_2 + self.q_3 = df.q_3 + else: + if 'q_0' in df.columns: + self.alpha_0 = df.q_0 + elif 'alpha_0' in df.columns: + self.alpha_0 = df.alpha_0 + elif isinstance(df.columns, pd.MultiIndex): + if 'q_0' in df.columns.levels[1]: + self.alpha_0 = df.loc[:, (rotconv, 'q_0')] + elif 'alpha_0' in df.columns.levels[1]: + self.alpha_0 = df.loc[:, (rotconv, 'alpha_0')] + else: + msg = 'df should contains q_0 or alpha_0' + msg += 'columns are:\n{}'.format(df.columns) + raise KeyError(msg) + else: + raise KeyError('df should contains q_0 or alpha_0') + + if 'q_1' in df.columns: + self.alpha_1 = df.q_1 + elif 'alpha_1' in df.columns: + self.alpha_1 = df.alpha_1 + elif isinstance(df.columns, pd.MultiIndex): + if 'q_1' in df.columns.levels[1]: + self.alpha_1 = df.loc[:, (rotconv, 'q_1')] + elif 'alpha_1' in df.columns.levels[1]: + self.alpha_1 = df.loc[:, (rotconv, 'alpha_1')] + else: + msg = 'df should contains q_1 or alpha_1' + msg += 'columns are:\n{}'.format(df.columns) + raise KeyError(msg) + else: + raise KeyError('df should contains q_1 or alpha_1') + + if 'q_2' in df.columns: + self.alpha_2 = df.q_2 + elif 'alpha_2' in df.columns: + self.alpha_2 = df.alpha_2 + elif isinstance(df.columns, pd.MultiIndex): + if 'q_2' in df.columns.levels[1]: + self.alpha_2 = df.loc[:, (rotconv, 'q_2')] + elif 'alpha_2' in df.columns.levels[1]: + self.alpha_2 = df.loc[:, (rotconv, 'alpha_2')] + else: + msg = 'df should contains q_2 or alpha_2' + msg += 'columns are:\n{}'.format(df.columns) + raise KeyError(msg) + else: + raise KeyError('df should contains q_2 or alpha_2') + return self + + def from_markers(self, markers, triangle_mode, error=None): + if error is not None: + self.trajectory_error = pd.DataFrame(data=np.nan, + index=self.index, + columns=self.columns) + mark0 = markers.loc[:, 0] + mark1 = markers.loc[:, 1] + mark2 = markers.loc[:, 2] + x = np.zeros(9) # 3points with x,y,z + kwargs = {'triangle_mode': triangle_mode, + 'euler_axes': self.rotation_mode} + for index_i in self.index: + # Assign mark to pos + x[0:3] = mark0.loc[index_i, ['x', 'y', 'z']].values + x[3:6] = mark1.loc[index_i, ['x', 'y', 'z']].values + x[6:] = mark2.loc[index_i, ['x', 'y', 'z']].values + # Calculate position and orientation + position = _markers2position(x, kwargs) + orientation = _markers2angles(x, kwargs) + # propagate error + if error is not None: + euclidian_error = error.loc[index_i] + if not np.isnan(euclidian_error): + covar = euclidian_error*np.eye(9) + err_pos = propagate_error(_markers2position, x, + covar, args=kwargs, + epsilon=euclidian_error/10) + + err_angle = propagate_error(_markers2angles, x, + covar, args=kwargs, + epsilon=euclidian_error/10) + self.trajectory_error.loc[index_i, 'location'] = \ + np.diagonal(err_pos) + self.trajectory_error.loc[index_i, self.rotation_mode] \ + = np.diagonal(err_angle) + + self.loc[index_i, 'location'] = position + self.loc[index_i, self.rotation_mode] = orientation + + def lollipops(self): + raise NameError('Not implemented') diff --git a/navipy/trajectories/plot.py b/navipy/trajectories/plot.py new file mode 100644 index 0000000000000000000000000000000000000000..bb96ca671c68cd4cf70ffc34322e6cf4f45f568f --- /dev/null +++ b/navipy/trajectories/plot.py @@ -0,0 +1,182 @@ +""" +""" +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +from tra3dpy.trajectory.transform import agent_facing_vector, world2body + + +def get_color_frame_dataframe(frame_range=[0, 100], cmap=plt.get_cmap('jet')): + """ + +Get a colorframe from a range + + +Return a pandas dataframe indexed by frame number,here colors are \ +calculated from a matplotlib cmap generates a pandas data frame \ +representing the data frames specified in frame_range, where all \ +frames with an index between the maximum and minimum number given \ +in frame_range is considered. This pandas data frame is then used \ +to call get_color_dataframe() + +calls get_color_dataframe() + +Arguments + + - Input: + - frame_range: (default: 0 to 100), 1 dimensional \ +array of integers + - cmap: (default: jet colormap) colormap to be used + + - Output: + - a color dataframe (pandas table) and the \ +corresponding color-map (scalar mappable) + """ + + frame_series = pd.Series(index=np.arange( + min(frame_range), max(frame_range)+1)) # +1 include the last frame + frame_series[:] = np.linspace(0, 1, frame_series.shape[0]) + return get_color_dataframe(frame_series, cmap=cmap) + + +def get_color_dataframe(series, cmap=plt.get_cmap('jet')): + """ +Get a color Frame from a series + +Return a color dataframe from a series. Each value in the series \ +table gets a corresponding rbga color value in the returned color \ +dataframe with the same index. + +>>> df_colors,sm = get_color_dataframe(series,cmap=plt.get_cmap('jet')) + +Arguments +--------- + +Input: + - series: pandas frame that contains indexed values; \ +indices do not need to be in order values in in series is used \ +to generate the color map +Output: + - df_colors: pandas data-frame that contains the colormap; \ +the color that corresponds to the value in series has the same \ +index as in series. + - sm: the actual color map + """ + + # The values in the serie need to be normalized between 0 and 1 first + # We do not want to affect the series + normalised_values = series.values.copy() + # substract offset from values, so the smalles one is zero + normalised_values -= normalised_values.min() + # all values are zero and have the same value + assert normalised_values.max() != 0, 'series.values are constant' + # actually normalize the values + normalised_values /= normalised_values.max() + colors = cmap(normalised_values) + # create the dataframe from color + df_colors = pd.DataFrame( + data=colors, index=series.index, columns=['r', 'g', 'b', 'a']) + + # Create data for colorbar + sm = plt.cm.ScalarMappable(cmap=cmap, + norm=plt.Normalize( + vmin=series.values.min(), + vmax=series.values.max())) + # fake up the array of the scalar mappable. Urgh... + sm._A = [] + + return df_colors, sm + + +def lollipops(ax, trajectory, + colors=None, step_lollipop=1, + offset_lollipop=0, lollipop_marker='o', + linewidth=1, lollipop_tail_width=1, + lollipop_head_size=1): + """ + create a lollipop plot for the trajectory with its associated \ +direction. Handels missing frames by leaving + gaps in the lollipop plot. However indices of the colors, trajectory + + **Note** Gap in the index of the trajectory dataframe, will \ +lead to gap in the plotted trajectory and the color bar will \ +therefore miss some values (otherwise the color-coded frames \ +will not be linear) + + :param ax: is a matplotlib axes with 3d projection + :param trajectory: is a pandas dataframe with columns \ +['x','y','z', 'euler_0','euler_1','euler_2'] + :param euler_axes: the axes rotation convention \ +(see homogenous.transformations for details) + :param colors: is a pandas dataframe with columns \ +['r','g','b','a'] and indexed as trajectory. (default: time is color coded) + :param step_lollipop: number of frames between two lollipops + :param offset_lollipop: the first lollipop to be plotted + :param lollipop_marker: the head of the lollipop + :param linewidth: + :param lollipop_tail_width: + :param lollipop_head_size: + + """ + direction = agent_facing_vector(trajectory) + if colors is None: + timeseries = pd.Series(data=trajectory.index, + index=trajectory.index) + colors, sm = get_color_dataframe(timeseries) + # Create a continuous index from colors + frames = np.arange(colors.index.min(), colors.index.max() + 1) + + # Calculate agent tail direction + # use function in trajectory to get any point bodyref to worldref + tailmarker = pd.Series(data=0, + index=pd.MultiIndex.from_product( + [[0], + ['x', 'y', 'z']])) + tailmarker.loc[0, 'x'] = 1 + tail = world2body(tailmarker, trajectory) + tail = tail.loc[:, 0] + # Plot the agent trajectory + # - loop through consecutive point + # - Two consecutive point are associated with the color of the first point + for frame_i, frame_j in zip(frames[:-1], frames[1:]): + # Frames may be missing in trajectory, and therefore can not be plotted + if (frame_i in trajectory.index) and \ + (frame_j in trajectory.index) and \ + (frame_i in colors.index): + color = [colors.r[frame_i], colors.g[frame_i], + colors.b[frame_i], colors.a[frame_i]] + # Create the line to plot + line_x = [trajectory.x[frame_i], trajectory.x[frame_j]] + line_y = [trajectory.y[frame_i], trajectory.y[frame_j]] + line_z = [trajectory.z[frame_i], trajectory.z[frame_j]] + # Actual plot command + ax.plot(xs=line_x, ys=line_y, zs=line_z, + color=color, linewidth=linewidth) + # Plot the lollipop + # - loop through the frames with a step of step_lollipop + # - The lollipop is colored with the color of this frame + # - Each lollipop is composed of a marker, a point on the agent trajectory + # and a line representing the body (anti facing direction) + for frame_i in frames[offset_lollipop::step_lollipop]: + # Frames may be missing in trajectory, and therefore can not be plotted + if (frame_i in trajectory.index) and \ + (frame_i in direction.index) and \ + (frame_i in colors.index): + color = [colors.r[frame_i], colors.g[frame_i], + colors.b[frame_i], colors.a[frame_i]] + # Create the line to plot + line_x = [trajectory.x[frame_i], + tail.x[frame_i]] + line_y = [trajectory.y[frame_i], + tail.y[frame_i]] + line_z = [trajectory.z[frame_i], + tail.z[frame_i]] + # Actual plot command + ax.plot(xs=line_x, ys=line_y, zs=line_z, + color=color, linewidth=lollipop_tail_width) + ax.plot(xs=[line_x[0]], + ys=[line_y[0]], + zs=[line_z[0]], + color=color, + marker=lollipop_marker, + markersize=lollipop_head_size) diff --git a/navipy/trajectories/random.py b/navipy/trajectories/random.py new file mode 100644 index 0000000000000000000000000000000000000000..cf1631e966314908f990b98e8e61238f08c09795 --- /dev/null +++ b/navipy/trajectories/random.py @@ -0,0 +1,219 @@ +""" +Generate a trajectory + +# An agent trajectory is defined by the position and orientation of \ +the agent along the time t. The agent is flying in the direction of \ +its orientation at a given speed + +The trajectory can then be used to test: + +* saccade extraction +* optic-flow calculation +* point of pivoting, ... + +For practical purposes, the trajectory will stored in a pandas \ +DataFrame, index by frame number (equivalent to time), and \ +having for columns: + +* 'x','y','z', representating the agent position +* 'alpha_0','alpha_1','alpha_2', the three euler angles +* 'facing_x','facing_x','facing_y', the facing direction of the agent + +""" + +from tra3dpy.maths.euler import matrix +from scipy.stats import norm +import numpy as np +import pandas as pd +from tra3dpy.trajectory import Trajectory + + +def yawpitchroll(yaw, pitch, roll): + return matrix(yaw, pitch, roll, axes='rzyx')[:3, :3] + + +def generate_trajectory(starting_point, speed, yaw, pitch, roll): + """ + generate a trajectory starting from the starting_point, and \ +flying at speed (plausibly changing over time), + and rotating in the world according to yaw,pitch,roll + + starting_point is 1x3 vector + speed is a Nx1 vector + yaw is a Nx1 vector + pitch is a Nx1 vector + roll is a Nx1 vector + + here N is the number of time point + """ + assert starting_point.shape[1] == 3, \ + 'starting_point should have a size of 1x3' + assert speed.shape[0] == yaw.shape[0], \ + 'speed and yaw should have the same number of point' + assert speed.shape[0] == pitch.shape[0], \ + 'speed and pitch should have the same number of point' + assert speed.shape[0] == roll.shape[0], \ + 'speed and roll should have the same number of point' + + trajectory = Trajectory(data=np.nan, + index=np.arange(speed.shape[0]), + rotation_mode='rzyx', + sampling_rate=100) + trajectory.loc[0, 'position'] = starting_point + for i in trajectory.index[1:]: + speed_orient = np.dot(yawpitchroll( + yaw[i-1], pitch[i-1], roll[i-1]), + np.array([[speed[i-1]], [0], [0]]))[:, 0] + next_point = trajectory.loc[i-1, 'position']+speed_orient + trajectory.loc[i, 'position'] = next_point + trajectory.loc[:, (trajectory.rotation_mode, 'angle_0')] = yaw + trajectory.loc[:, (trajectory.rotation_mode, 'angle_1')] = pitch + trajectory.loc[:, (trajectory.rotation_mode, 'angle_2')] = roll + return trajectory + + +def generate_saccade(sac_amplitude): + """ + return a list of angle, here the sum is the saccade amplitude + + % original cyberfly uses 8 templates derived from behavioural data. + width of the template: linear regression on the sacc templates used + in original cyberfly + + see Jens Lindemann 2005 + """ + # Calculate the saccade length + sac_len = np.abs(sac_amplitude)*28.6+23.4 + + # create a gaussian velocity profile. sigma=0.35 fits the original + # templates + gw = norm.pdf(np.linspace(-1, 1, sac_len), 0, 0.35) + # scale to the angular amplitude + saccade_seq = (gw/np.sum(gw))*sac_amplitude + + return saccade_seq + + +def saccadic_data(intersac_length_f=lambda n: 50+np.floor( + 10*np.random.rand(n)), + intersac_drifty_f=lambda n: ( + 3*np.pi/180)*(2*(np.random.rand(n)-0.5)), + intersac_driftp_f=lambda n: ( + 0*np.pi/180)*(2*(np.random.rand(n)-0.5)), + intersac_driftr_f=lambda n: ( + 0*np.pi/180)*(2*(np.random.rand(n)-0.5)), + sac_y_f=lambda n: ( + np.pi/2)*(2*(np.random.rand(n)-0.5)), + sac_p_f=lambda n: ( + np.pi/2)*(2*(np.random.rand(n)-0.5)), + sac_r_f=lambda n: ( + np.pi/2)*(2*(np.random.rand(n)-0.5)), + number_sac=10): + """ + generate a yaw pitch, roll from random saccadic input + + intersac_length_f a function of n (number of saccade), \ +being the length of the intersaccade + intersac_drifty_f a function of n, being the residual yaw-rotation + intersac_driftp_f a function of n, being the residual pitch-rotation + intersac_driftr_f a function of n, being the residual roll-rotation + + sac_y_f a function of n, being the yaw amplitude of the saccade + sac_p_f a function of n, being the pitch amplitude of the saccade + sac_r_f a function of n, being the rol amplitude of the saccade + """ + # create data frame sumarising saccadic data + saccade_df = pd.DataFrame(index=np.arange(number_sac), + columns=['intersac_length', + 'intersac_drifty_f', + 'intersac_driftp_f', + 'intersac_driftr_f', + 'sac_y_f', + 'sac_p_f', + 'sac_r_f']) + # populate intersaccadic length + saccade_df.intersac_length = intersac_length_f(number_sac) + # populate intersaccadic residual rotation + saccade_df.intersac_drifty_f = intersac_drifty_f(number_sac) + saccade_df.intersac_driftp_f = intersac_driftp_f(number_sac) + saccade_df.intersac_driftr_f = intersac_driftr_f(number_sac) + # populate saccade properties + saccade_df.sac_y_f = sac_y_f(number_sac) + saccade_df.sac_p_f = sac_p_f(number_sac) + saccade_df.sac_r_f = sac_r_f(number_sac) + + yaw = np.zeros(1) + pitch = np.zeros(1) + roll = np.zeros(1) + + for i, row in saccade_df.iterrows(): + # calculate intersaccadic residual rotation + yaw_inter = np.linspace(0, row.intersac_drifty_f, row.intersac_length) + pitch_inter = np.linspace( + 0, row.intersac_driftp_f, row.intersac_length) + roll_inter = np.linspace(0, row.intersac_driftr_f, row.intersac_length) + # update yaw,pitch,roll + yaw = np.hstack((yaw, yaw[-1] + yaw_inter)) + pitch = np.hstack((pitch, pitch[-1] + pitch_inter)) + roll = np.hstack((roll, roll[-1] + roll_inter)) + # calculate saccade + yaw_sac = generate_saccade(row.sac_y_f) + pitch_sac = generate_saccade(row.sac_p_f) + roll_sac = generate_saccade(row.sac_r_f) + # find longest + if (len(yaw_sac) >= len(pitch_sac)) and\ + (len(yaw_sac) >= len(roll_sac)): + # yaw is longest + pitch_sac = yaw_sac*np.sum(pitch_sac)/np.sum(yaw_sac) + roll_sac = yaw_sac*np.sum(roll_sac)/np.sum(yaw_sac) + elif (len(pitch_sac) >= len(yaw_sac)) and\ + (len(pitch_sac) >= len(roll_sac)): + # pitch is longest + yaw_sac = pitch_sac*np.sum(yaw_sac)/np.sum(pitch_sac) + roll_sac = pitch_sac*np.sum(roll_sac)/np.sum(pitch_sac) + elif (len(roll_sac) >= len(yaw_sac)) and\ + (len(roll_sac) >= len(pitch_sac)): + # roll is longest + yaw_sac = roll_sac*np.sum(yaw_sac)/np.sum(roll_sac) + pitch_sac = roll_sac*np.sum(pitch_sac)/np.sum(roll_sac) + else: + mgs = 'Error in saccade generation, ' + mgs += 'can not find the longest saccade' + raise NameError(mgs) + # update yaw,pitch,roll + yaw = np.hstack((yaw, yaw[-1]+np.cumsum(yaw_sac))) + pitch = np.hstack((pitch, pitch[-1]+np.cumsum(pitch_sac))) + roll = np.hstack((roll, roll[-1]+np.cumsum(roll_sac))) + + return yaw, pitch, roll, saccade_df + + +def saccadic_traj(intersac_length_f=lambda n: 50+np.floor( + 10*np.random.rand(n)), + intersac_drifty_f=lambda n: ( + 3*np.pi/180)*(2*(np.random.rand(n)-0.5)), + intersac_driftp_f=lambda n: ( + 0*np.pi/180)*(2*(np.random.rand(n)-0.5)), + intersac_driftr_f=lambda n: ( + 0*np.pi/180)*(2*(np.random.rand(n)-0.5)), + sac_y_f=lambda n: (np.pi/2) * + (2*(np.random.rand(n)-0.5)), + sac_p_f=lambda n: (np.pi/2) * + (2*(np.random.rand(n)-0.5)), + sac_r_f=lambda n: (np.pi/2) * + (2*(np.random.rand(n)-0.5)), + number_sac=10): + # generate fake saccadic data + yaw, pitch, roll, saccade_df = saccadic_data(intersac_length_f, + intersac_drifty_f, + intersac_driftp_f, + intersac_driftr_f, + sac_y_f, + sac_p_f, sac_r_f, + number_sac) + speed = 1+np.zeros(len(yaw)) + starting_point = np.array([[0, 0, 0]]) + # calculate trajectory + trajectory = generate_trajectory(starting_point, + speed, yaw, pitch, roll) + return trajectory, saccade_df diff --git a/navipy/markers/test_markers.py b/navipy/trajectories/test_markers.py similarity index 100% rename from navipy/markers/test_markers.py rename to navipy/trajectories/test_markers.py diff --git a/navipy/trajectories/test_trajectory.py b/navipy/trajectories/test_trajectory.py new file mode 100644 index 0000000000000000000000000000000000000000..755c012dfcc5b8cec8cf393226f3233fc272ba26 --- /dev/null +++ b/navipy/trajectories/test_trajectory.py @@ -0,0 +1,55 @@ +import unittest +import numpy as np +import pandas as pd +import tra3dpy.maths.constants as htconst +from tra3dpy.trajectory import Trajectory + + +class TestTrajectoryTransform(unittest.TestCase): + def test_forwardreverse(self): + # Build an equilateral triangle + markers = pd.Series(data=0, + index=pd.MultiIndex.from_product([ + [0, 1, 2], + ['x', 'y', 'z']])) + markers.loc[(0, 'x')] = -1 + markers.loc[(2, 'y')] = np.sin(np.pi / 3) + markers.loc[(1, 'y')] = -np.sin(np.pi / 3) + markers.loc[(1, 'x')] = np.cos(np.pi / 3) + markers.loc[(2, 'x')] = np.cos(np.pi / 3) + markers.index.name = None + # Create a random trajectory + trajectory = Trajectory(data=np.nan, + index=np.arange(10), + rotation_mode='sxyz', + sampling_rate=100) + traj_test = Trajectory(data=np.nan, + index=np.arange(10), + rotation_mode='sxyz', + sampling_rate=100) + trajectory.loc[:, 'position'] = \ + 100 * (np.random.rand(trajectory.shape[0], 3) - 0.5) + trajectory.loc[:, trajectory.rotation_mode] = \ + 2 * np.pi * (np.random.rand(trajectory.shape[0], 3) - 0.5) + col = (trajectory.rotation_mode, 'angle_1') + trajectory.loc[:, col] = trajectory.loc[:, col] / 2 + # forward + for euler_axes in list(htconst._AXES2TUPLE.keys()): + trajectory.rotation_mode = euler_axes + traj_test.rotation_mode = euler_axes + transformed_markers = trajectory.world2body(markers) + # reverse + for triangle_mode in ['x-axis=median-from-0', + 'y-axis=1-2']: + traj_test.from_markers(transformed_markers, + triangle_mode) + np.testing.assert_array_almost_equal( + trajectory.astype(float), + traj_test.astype(float)) + + def test_velocity(self): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/navipy/markers/test_triangle.py b/navipy/trajectories/test_triangle.py similarity index 100% rename from navipy/markers/test_triangle.py rename to navipy/trajectories/test_triangle.py diff --git a/navipy/markers/tools.py b/navipy/trajectories/tools.py similarity index 100% rename from navipy/markers/tools.py rename to navipy/trajectories/tools.py diff --git a/navipy/markers/transformations.py b/navipy/trajectories/transformations.py similarity index 100% rename from navipy/markers/transformations.py rename to navipy/trajectories/transformations.py diff --git a/navipy/markers/triangle.py b/navipy/trajectories/triangle.py similarity index 100% rename from navipy/markers/triangle.py rename to navipy/trajectories/triangle.py