Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

""" 

Tools for markers 

""" 

from scipy import signal 

from scipy.interpolate import interp1d 

import pandas as pd 

import numpy as np 

from navipy import tools as tratools 

import fastdtw 

from navipy.trajectories import Trajectory 

 

 

def averaged_trajectory(alltraj): 

""" Calculate an average trajectory 

 

Trajectories are aligned by using dynamic time wraping (dtw). \ 

The alignment is relative to the first trajectory in the list. Therefore \ 

the average trajectory will have the same length that the first trajectory. 

 

Aligned trajectory are aligned as follow. The mean of average position \ 

corresponding to the time of the first trajectory devided by the number of \ 

trajectory are summed over all trajectory. For the orientation, the circular \ 

mean is used according to `Statistics on Sphere` Geoffrey S. Watson 1983. 

 

:param alltraj: A list of trajectories 

:returns: An average trajectory 

""" 

rotconv = alltraj[0].rotation_mode 

traj_1 = alltraj[0].dropna().values 

avg_traj = traj_1.copy() 

avg_traj[:, 0:3] /= len(alltraj) 

for traj_2 in alltraj[1:]: 

traj_2 = traj_2.dropna().values 

x = np.array(traj_1[:, [0, 1, 2]], dtype='float') 

y = np.array(traj_2[:, [0, 1, 2]], dtype='float') 

test = fastdtw.fastdtw(x, y) 

 

# average 

for ii, group in pd.DataFrame(test[1]).groupby(0): 

# average position 

avg_traj[ii, 0:3] += np.mean(traj_2[group.loc[:, 1], 

0:3], axis=0) / len(alltraj) 

# average angles 

# see Statistics On Spheres", Geoffrey S. Watson, 

# University of Arkansas Lecture Notes 

# in the Mathematical Sciences, 1983 John Wiley & Son 

for kk in range(3, 6): 

sinsum = np.sum(np.sin(traj_2[group.loc[:, 1], kk]), axis=0) 

cossum = np.sum(np.cos(traj_2[group.loc[:, 1], kk]), axis=0) 

sinsum += np.sin(avg_traj[ii, kk]) 

cossum += np.cos(avg_traj[ii, kk]) 

avg_traj[ii, kk] = np.arctan2(sinsum, cossum) 

 

return Trajectory().from_array(avg_traj, rotconv=rotconv) 

 

 

def interpolate_markers(markers, kind='cubic'): 

""" 

Interpolate marker position where Nan are 

""" 

columns = markers.columns 

markers_inter = markers.copy() 

for col in columns: 

y = markers_inter.loc[:, col] 

valid_y = y.dropna() 

valid_t = valid_y.index 

nan_y = y.isnull().nonzero()[0] 

 

func = interp1d(valid_t, valid_y, kind='cubic') 

markers_inter.loc[nan_y, col] = func(nan_y) 

return markers_inter 

 

 

def filter_markers(markers, cutfreq, order): 

if cutfreq < order: 

raise ValueError( 

'cutoffrequency {} can not be lower than order {}'.format( 

cutfreq, order)) 

nyquist = markers.index.name / 2 

if cutfreq > nyquist: 

raise ValueError( 

'cutoffrequency {} can not be higher than Nyquist freq {}'.format( 

cutfreq, nyquist)) 

b, a = signal.butter(order, cutfreq / nyquist) 

 

columns = markers.columns 

markers_filt = markers.copy() 

for col in columns: 

markers_filt.loc[:, col] = signal.filtfilt(b, a, markers.loc[:, col]) 

return markers_filt 

 

 

def extract_sacintersac(trajectory, thresholds): 

""" 

add a intersaccade and saccade index the trajectory 

therefore intersaccade, and saccade columns will be added 

to the trajectory dataframe 

""" 

# assign 

saccade_intersaccde = pd.DataFrame(index=trajectory.index, 

columns=['intersaccade', 

'saccade', 

'smooth_angvel']) 

 

# calculate velocity 

velocity = trajectory.velocity() 

angvel = np.sqrt( 

np.sum(velocity.loc[:, ['dalpha_0', 

'dalpha_1', 

'dalpha_2']]**2, 

axis=1)) 

# find block of nonan 

block_nonandf = tratools.extract_block_nonans(angvel) 

block_nonandf = block_nonandf.astype(int) 

# print(block_nonandf) 

 

sac_number = 0 

intersac_number = 0 

for i, curr_blocknonan in block_nonandf.iterrows(): 

current_item = np.arange( 

curr_blocknonan.start_th1, curr_blocknonan.end_th1) 

if len(current_item) < 1: 

continue 

# extract block 

data = angvel[current_item].abs() 

block_df = tratools.extract_block(data, thresholds) 

 

sac_e_p = current_item[0] 

for _, row in block_df.iterrows(): 

sac_s = row.start_th2 

sac_e = row.end_th2 

# check boundary 

if sac_s < current_item[0]: 

sac_s = current_item[0] 

if sac_e > current_item[-1]: 

sac_e = current_item[-1] 

 

frame_s = np.arange(sac_s, sac_e).astype(int) 

frame_i = np.arange(sac_e_p, sac_s).astype(int) 

if len(frame_s) > 0: 

saccade_intersaccde.loc[frame_s, 

'saccade'] = sac_number 

sac_number += 1 

if len(frame_i) > 0: 

saccade_intersaccde.loc[frame_i, 

'intersaccade'] = intersac_number 

intersac_number += 1 

 

sac_e_p = sac_e 

 

frame_i = np.arange(sac_e_p, current_item[-1]).astype(int) 

if len(frame_i) > 0: 

saccade_intersaccde.loc[frame_i, 

'intersaccade'] = intersac_number 

intersac_number += 1 

 

return saccade_intersaccde, angvel