From 634c0c26d0566049d398a969c7870633d481fcb9 Mon Sep 17 00:00:00 2001 From: "Olivier J.N. Bertrand" <olivier.bertrand@uni-bielefeld.de> Date: Thu, 20 Sep 2018 13:47:35 +0200 Subject: [PATCH] split OF functions --- navipy/maths/coordinates.py | 14 +- navipy/processing/mcode.py | 417 ++++++++++------------------ navipy/processing/test_OpticFlow.py | 251 +++++------------ 3 files changed, 222 insertions(+), 460 deletions(-) diff --git a/navipy/maths/coordinates.py b/navipy/maths/coordinates.py index 2e22bb6..764ba55 100644 --- a/navipy/maths/coordinates.py +++ b/navipy/maths/coordinates.py @@ -3,8 +3,6 @@ """ import numpy as np from navipy.scene import is_numeric_array -from navipy.maths.homogeneous_transformations\ - import compose_matrix def cartesian_to_spherical(x, y, z): @@ -23,8 +21,7 @@ def spherical_to_cartesian(elevation, azimuth, radius=1): return x, y, z -def cartesian_to_spherical_vectors(cart_vec, angles, - viewing_direction, axes='zyx'): +def cartesian_to_spherical_vectors(cart_vec, viewing_direction): """Now we need the cartesian vector as a spherical vecotr. A vector in cartesian coordinates can be transform as one in the spherical coordinates following the transformation: @@ -42,8 +39,6 @@ def cartesian_to_spherical_vectors(cart_vec, angles, the orientation Yaw=pitch=roll=0""" if cart_vec is None: raise ValueError("cartesian vector must not be None") - if angles is None: - raise ValueError("angles must not be None") if viewing_direction is None: raise ValueError("viewing direction must not be None") if (not isinstance(cart_vec, np.ndarray)): @@ -62,10 +57,6 @@ def cartesian_to_spherical_vectors(cart_vec, angles, raise Exception("first dimension of viewing\ direction must be of size two") - M = compose_matrix(scale=None, shear=None, angles=angles, translate=None, - perspective=None, axes=axes)[:3, :3] - cart_vec = np.dot(np.transpose(M), cart_vec) - SPH_x = cart_vec[0] SPH_y = cart_vec[1] SPH_z = cart_vec[2] @@ -73,9 +64,6 @@ def cartesian_to_spherical_vectors(cart_vec, angles, epsilon = viewing_direction[1] phi = viewing_direction[0] - # print("OFTs", opticFlow) - # print("ep,phi", epsilon, phi) - rofterm1 = +SPH_x*np.cos(epsilon)*np.cos(phi) rofterm2 = +SPH_y*np.cos(epsilon)*np.sin(phi) rofterm3 = +SPH_z*np.sin(epsilon) diff --git a/navipy/processing/mcode.py b/navipy/processing/mcode.py index 2f57d16..68ee588 100644 --- a/navipy/processing/mcode.py +++ b/navipy/processing/mcode.py @@ -1,7 +1,7 @@ """ Motion code """ -from navipy.scene import check_scene +from navipy.scene import check_scene, is_ibpc from navipy.scene import __spherical_indeces__ from navipy.scene import is_numeric_array from navipy.maths.homogeneous_transformations\ @@ -14,184 +14,8 @@ from navipy.maths.euler import angular_velocity from navipy.maths.constants import _AXES2TUPLE -def spherical_to_Vector(sp): - """ translate 2D azimuth and elevation vector to 3D vector - :param sp: np array to be translated from spherical to euclidean - coordinates - :returns vector - :rtype np.ndarray """ - if sp is None: - raise ValueError("sp must not be None") - if not isinstance(sp, np.ndarray): - raise TypeError("vector must be list or array") - if len(sp) != 2: - raise ValueError("vector must contain two elements") - if (not isinstance(sp[0], float)) and (not isinstance(sp[0], int)): - raise ValueError("azimuth must be float or integer") - if (not isinstance(sp[1], float)) and (not isinstance(sp[1], int)): - raise ValueError("elevation must be float or integer") - vector = np.array([np.dot(np.cos(sp[1]), np.cos(sp[0])), - np.dot(np.cos(sp[1]), np.sin(sp[0])), - np.sin(sp[1])]) - - return vector - - -def OFSubroutineSpToVector(sphericCoord): - """ -OFSUBROUTINESPTOVECTOR recieves a pair of spheric coordinates and -converts them into a 3dim vector. - - :param sphericCoord - a Sx2 matrix with spheric coordinates - - :return vector a Sx3 matrix with the transformed coordinates - :rtype np.ndarray - -NOTE: this is NOT the normal spheic coordinate system! - spheric coordinates given as azimuth (Longitude) and - zenith (Latitude) given in geographic coordinate system) in radian - [pi/2 0]= [0 1 0] left, [3*pi/2 0]= [0 -1 0] right, - [0 pi/2]= [0 0 1] top, [0 -pi/2]= [0 0 -1] bottom - - - ################################################################# - Copyright (C) 2009-2014 J.P. Lindemann, C. Strub - - This file is part of the ivtools. - https://opensource.cit-ec.de/projects/ivtools - - the Optic-Flow toolbox is free software: you can redistribute it - and/or modify it under the terms of the GNU General Public License - as published by the Free Software Foundation, either version 3 of - the License, or (at your option) any later version. - - the Optic-Flow toolbox is distributed in the hope that it will be - useful, but WITHOUT ANY WARRANTY; without even the implied warranty - of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with Foobar. If not, see <http://www.gnu.org/licenses/>. - ################################################################## -""" - sp = sphericCoord - if sp is None: - raise ValueError("sp must not be None") - if not isinstance(sp, list): - raise TypeError("vector must be list or array") - if not isinstance(sp, np.ndarray) and not isinstance(sp, list): - raise TypeError("vector must be list or array") - if len(sp) != 2: - raise ValueError("vector must contain two elements") - if (not isinstance(sp[0], float)) and (not isinstance(sp[0], int)): - raise ValueError("azimuth must be float or integer") - if (not isinstance(sp[1], float)) and (not isinstance(sp[1], int)): - raise ValueError("elevation must be float or integer") - """ - vector = np.zeros((3,)) - vector = [np.dot(np.cos(sp[1]), np.cos(sp[0])), - np.dot(np.cos(sp[1]), np.sin(sp[0])), - np.sin(sp[1])] - """ - vector = np.array(spherical_to_cartesian(sp[1], sp[0])) - - return vector - - -def OFSubroutineRotate(vec, rotAx, alpha): - """ - OFSUBROUTINEROTATE rotates the given vector through the rotAxis as - UNIT-vector - - :param vec - Vx3 matrix with vectors to rotate - :param rotAx - the axis to rotate around - :param alpha - the angle to rotate in radian - - :return rotVec - the rotated vectors in the input matrix - :rtype np.ndarray - - - - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - % Copyright (C) 2009-2014 J.P. Lindemann, C. Strub - % - % This file is part of the ivtools. - % https://opensource.cit-ec.de/projects/ivtools - % - % the Optic-Flow toolbox is free software: you can redistribute it - % and/or modify it under the terms of the GNU General Public License - % as published by the Free Software Foundation, either version 3 of - % the License, or (at your option) any later version. - % - % the Optic-Flow toolbox is distributed in the hope that it will be - % useful, but WITHOUT ANY WARRANTY; without even the implied warranty - % of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - % GNU General Public License for more details. - % - % You should have received a copy of the GNU General Public License - % along with Foobar. If not, see <http://www.gnu.org/licenses/>. - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - """ - if vec is None: - raise ValueError("vector must not be None") - if rotAx is None: - raise ValueError("rotation axis must not be None") - if not isinstance(vec, list) and (not isinstance(vec, np.ndarray)): - raise TypeError("vector must be list or array") - if not isinstance(rotAx, np.ndarray) and (not isinstance(rotAx, list)): - raise TypeError("rotation Axis must be list or array") - if len(vec) != 3: - raise ValueError("vector must contain two elements") - if (not isinstance(rotAx[0], float)) and (not isinstance(rotAx[0], int)): - raise ValueError("rotation Axis must be float or integer") - if (not isinstance(rotAx[1], float)) and (not isinstance(rotAx[1], int)): - raise ValueError("rotation Axis must be float or integer") - if (not isinstance(rotAx[2], float)) and (not isinstance(rotAx[2], int)): - raise ValueError("rotation Axis must be float or integer") - if (not isinstance(alpha, float)) and (not isinstance(alpha, int)): - raise ValueError("rotation vector must be float or integer") - if (not isinstance(vec, np.ndarray)) and (not isinstance(vec, list)): - raise TypeError("vector must be of type np.ndarray or list") - if isinstance(vec, np.ndarray): - if vec.shape[0] != 3: - raise Exception("second dimension of vector must have size three") - if not is_numeric_array(vec): - raise TypeError("vec must be of numerical type") - # scaling the axis to unit vector - rotAx = rotAx / np.linalg.norm(rotAx) - - # this is an rotation matrix: - Rot = [[np.cos(alpha) + np.power(rotAx[0], 2) * (1 - np.cos(alpha)), - np.dot(rotAx[0], rotAx[1]) * - (1 - np.cos(alpha)) - rotAx[2] * np.sin(alpha), - np.dot(rotAx[0], rotAx[2]) * - (1-np.cos(alpha)) + rotAx[1] * np.sin(alpha)], - [np.dot(rotAx[1], rotAx[0]) * - (1-np.cos(alpha)) + rotAx[2] * np.sin(alpha), - np.cos(alpha)+np.power(rotAx[1], 2) * - (1-np.cos(alpha)), np.dot(rotAx[1], - rotAx[2]) * (1-np.cos(alpha))-rotAx[0] * np.sin(alpha)], - [np.dot(rotAx[2], rotAx[0]) * - (1-np.cos(alpha))-rotAx[1] * np.sin(alpha), - np.dot(rotAx[2], rotAx[1]) * - (1-np.cos(alpha)) + rotAx[0] * np.sin(alpha), - np.cos(alpha) + np.power(rotAx[2], 2) * (1-np.cos(alpha))]] - - rotVec = np.transpose(np.dot(Rot, np.transpose(vec))) - return rotVec - - -def optic_flow(scene, viewing_directions, velocity, distance_channel=3): - """ optic flow - :param scene: ibpc - :param viewing_directions: viewing direction of each pixel - (azimuth,elevation) - :param velocity: pandas series - (x,y,z,alpha,beta,gamma,dx,dy,dz,dalpha,dbeta,dgamma) - :distance channel: distance""" - check_scene(scene) - if distance_channel not in range(4): - raise ValueError('distance channel out of range') +def _check_optic_flow_param(viewing_directions, + velocity): if not isinstance(velocity, pd.Series): raise TypeError('velocity should be a pandas Series') if velocity is None: @@ -201,6 +25,7 @@ def optic_flow(scene, viewing_directions, velocity, distance_channel=3): if not isinstance(velocity.index, pd.core.index.MultiIndex): raise Exception('velocity must have a multiindex containing \ the convention used') + index = velocity.index convention = index.get_level_values(0)[-1] if convention not in _AXES2TUPLE.keys(): @@ -226,25 +51,83 @@ def optic_flow(scene, viewing_directions, velocity, distance_channel=3): raise TypeError("angels must be list or np.ndarray") if not is_numeric_array(viewing_directions): raise TypeError("viewing_direction must be of numerical type") - if viewing_directions.shape[1] != 2: - raise Exception("second dimension of viewing\ - direction must be of size two") - distance = np.squeeze(scene[:, :, distance_channel, 0]) - # distance += distance - elevation = viewing_directions[..., __spherical_indeces__['elevation']] - azimuth = viewing_directions[..., __spherical_indeces__['azimuth']] + + return index, convention + + +def optic_flow_rotationonal(viewing_directions, velocity): + """ rotational optic flow + :param viewing_directions: viewing direction of each pixel + (azimuth,elevation) + :param velocity: pandas series + (x,y,z,alpha,beta,gamma,dx,dy,dz,dalpha,dbeta,dgamma) + """ + passindex, convention = _check_optic_flow_param(viewing_directions, + velocity) + # Warning ONLY for ibpc + elevation = viewing_directions[:, 0, + __spherical_indeces__['elevation']] + azimuth = viewing_directions[0, :, __spherical_indeces__['azimuth']] + yaw = velocity[convention]['alpha_0'] pitch = velocity[convention]['alpha_1'] roll = velocity[convention]['alpha_2'] dyaw = velocity[convention]['dalpha_0'] dpitch = velocity[convention]['dalpha_1'] droll = velocity[convention]['dalpha_2'] - + # Check if rotation are not too large + # because we assume small rotation + # according to Koenderink van Dorn if ((np.abs(dyaw) > np.pi/2 and 2*np.pi - np.abs(dyaw) > np.pi/2) or - (np.abs(dpitch) > np.pi/2 and 2*np.pi - np.abs(dpitch) > np.pi/2) or - (np.abs(droll) > np.pi/2 and 2*np.pi - np.abs(droll) > np.pi/2)): + (np.abs(dpitch) > np.pi/2 and 2*np.pi - np.abs(dpitch) > np.pi/2) or + (np.abs(droll) > np.pi/2 and 2*np.pi - np.abs(droll) > np.pi/2)): raise ValueError('rotation exceeds 90°, computation aborted') + # we init a matrix for rot + rof = np.zeros_like(velocity.shape[..., 0]) + hof = np.zeros_like(rof) + vof = np.zeros_like(rof) + # Calculate the angular velocities + opticFlowR = angular_velocity(yaw, pitch, roll, + dyaw, dpitch, droll, convention) + # project it on the eye + for i, a in enumerate(azimuth): + for j, e in enumerate(elevation): + # Transform OF from Cartesian coordinates to Spherical coordinates + # according to method + (OF_rho, OF_phi, OF_epsilon) = \ + cartesian_to_spherical_vectors(opticFlowR, [a, e]) + rof[j, i] = OF_rho + hof[j, i] = OF_phi + vof[j, i] = OF_epsilon + return rof, hof, vof + +def optic_flow_translational(scene, viewing_directions, + velocity, distance_channel=3): + """ translational optic flow + : param scene: ibpc + : param viewing_directions: viewing direction of each pixel + (azimuth, elevation) + : param velocity: pandas series + (x, y, z, alpha, beta, gamma, dx, + dy, dz, dalpha, dbeta, dgamma) + : distance channel: distance""" + check_scene(scene) + passindex, convention = _check_optic_flow_param(viewing_directions, + velocity) + if is_ibpc(scene): + elevation = viewing_directions[:, 0, + __spherical_indeces__['elevation']] + azimuth = viewing_directions[0, :, __spherical_indeces__['azimuth']] + else: + raise NameError('Only ibpc are at the moment supported') + yaw = velocity[convention]['alpha_0'] + pitch = velocity[convention]['alpha_1'] + roll = velocity[convention]['alpha_2'] + # optic flow depnd of distance + distance = scene[..., distance_channel, 0] + distance[distance == 0] = np.nan # Contact with object + # and translational velocity u = [velocity['location']['dx'], velocity['location']['dy'], velocity['location']['dz']] @@ -253,61 +136,61 @@ def optic_flow(scene, viewing_directions, velocity, distance_channel=3): u = [0, 0, 0] else: u = u/np.linalg.norm(u) - - rof = np.zeros((azimuth.shape[0], elevation.shape[0])) - hof = np.zeros((azimuth.shape[0], elevation.shape[0])) - vof = np.zeros((azimuth.shape[0], elevation.shape[0])) + # we init a matrix for rot + rof = np.zeros_like(viewing_directions[..., 0]) + hof = np.zeros_like(rof) + vof = np.zeros_like(rof) for i, a in enumerate(azimuth): for j, e in enumerate(elevation): - spline = OFSubroutineSpToVector([a, e]) + spline = np.array(spherical_to_cartesian(e, a)) M = compose_matrix(angles=[yaw, pitch, roll], translate=None, perspective=None, axes=convention)[:3, :3] spline = np.dot(M, spline) - p = distance[j, i] - if(p == 0): - # if object touches the enviroment, OpticFlow dosnt need to be - # scaled -> distance=1 - p = 1 - # the Translation-part of the Optic Flow: dotvu = np.dot(u, v) splinetrans = np.transpose(spline) - opticFlowT = -(dotvu-(np.dot(dotvu, splinetrans))*spline)/p - # check if there actualy is a rotation and if compute - # surface-normal and scale it with angle between vectors - # negative because flow relative to observer - - # combine the rotations - opticFlowR = angular_velocity(yaw, pitch, roll, - dyaw, dpitch, droll, convention) - # and add Translation and Rotation to get the Optic Flow - opticFlow = opticFlowT+opticFlowR - - # Transform OF from Cartesian coordinates to Spherical coordinates - # according to method - (OF_rho, OF_phi, OF_epsilon) =\ - cartesian_to_spherical_vectors(opticFlow, - [yaw, pitch, roll], - [a, e]) + opticFlowT = -(dotvu-(np.dot(dotvu, splinetrans))*spline) + (OF_rho, OF_phi, OF_epsilon) = \ + cartesian_to_spherical_vectors(opticFlowT, [a, e]) rof[j, i] = OF_rho hof[j, i] = OF_phi vof[j, i] = OF_epsilon + rof /= distance + hof /= distance + vof /= distance return rof, hof, vof +def optic_flow(scene, viewing_directions, + velocity, distance_channel=3): + """ optic flow + : param scene: ibpc + : param viewing_directions: viewing direction of each pixel + (azimuth, elevation) + : param velocity: pandas series + (x, y, z, alpha, beta, gamma, dx, + dy, dz, dalpha, dbeta, dgamma) + : distance channel: distance""" + rofr, hofr, vofr = optic_flow_rotationonal(viewing_directions, velocity) + roft, hoft, voft = optic_flow_translational((scene, viewing_directions, + velocity, distance_channel)) + return rofr+roft, hofr+hoft, vofr+voft + + class Module(): """ This class represents a Module that functions as a storage """ + def __init__(self, size=(180, 360)): """ initializes the storage as an np.ndarray containing zeros of size size - :param size: the tupel containing the size of the - storage (Input) + : param size: the tupel containing the size of the + storage(Input) """ if size is None: raise ValueError("size must not be None") @@ -322,8 +205,8 @@ class Module(): def size(self): """ getter for the the size field - :returns size: size of the Input field - :rtype tuple + : returns size: size of the Input field + : rtype tuple """ return self.__size @@ -331,7 +214,7 @@ class Module(): def size(self, size): """ setter for the size of the storage - :param size: tuple that contains the size of the storage + : param size: tuple that contains the size of the storage """ if size is None: raise ValueError("size must not be None") @@ -345,8 +228,8 @@ class Module(): def Input(self): """ getter for the Input field - :returns Input - :rtype np.ndarray + : returns Input + : rtype np.ndarray """ return self.__Input @@ -355,7 +238,7 @@ class Module(): """ setter for the Input field, automaticaly sets the the size field to the shape of the Input - :param Input + : param Input """ if Input is None: raise ValueError("Input must not be None") @@ -381,13 +264,14 @@ class lp(Module): """ Implementation of a low pass filter """ + def __init__(self, tau, inM): """ Initializes the lowpass filter, the size of the output is - set to the size of the input signal (inM) - :param tau: time constant of the filter - :param freq: cut off frequence of the filter - :param inM: Module that stores and represents the input signal + set to the size of the input signal(inM) + : param tau: time constant of the filter + : param freq: cut off frequence of the filter + : param inM: Module that stores and represents the input signal """ if not isinstance(tau, float) and isinstance(tau, int): raise ValueError("tau must be of type float or integer") @@ -412,7 +296,7 @@ class lp(Module): def itau(self, itau): """ setter of the time constant - :param itau: time constant + : param itau: time constant """ if not isinstance(itau, float) and isinstance(itau, int): raise ValueError("itau must be of type float or integer") @@ -422,8 +306,8 @@ class lp(Module): def inM(self): """ setter of the input Module - :returns inM - :rtype Module + : returns inM + : rtype Module """ return self.__inM @@ -431,7 +315,7 @@ class lp(Module): def inM(self, inM): """ setter of the input Module - :param inM + : param inM """ if inM is None: raise ValueError("input Module must not be None") @@ -442,7 +326,7 @@ class lp(Module): def update(self,): """ update functions, updates the filtered signal for the - the current input signal. out_t+1 +=tau*(input-out_t) + the current input signal. out_t+1 += tau*(input-out_t) """ In = self.inM.Input for i in range(self.size[0]): @@ -454,12 +338,13 @@ class hp(Module): # for second order just take hp for inM """ Implements a high pass filter """ + def __init__(self, tau, inM): """ Initializes the high pass filter - :param tau: time constant - :param freq: cut off frequency - :param inM: Module that stores the input signal + : param tau: time constant + : param freq: cut off frequency + : param inM: Module that stores the input signal """ if not isinstance(tau, float) and isinstance(tau, int): raise ValueError("tau must be of type float or integer") @@ -476,8 +361,8 @@ class hp(Module): # for second order just take hp for inM def inM(self): """ getter for the input Module - :returns inM - :rtype Module + : returns inM + : rtype Module """ return self.__inM @@ -485,7 +370,7 @@ class hp(Module): # for second order just take hp for inM def inM(self, inM): """ setter for the input Module - :param inM: input Module for the input signal + : param inM: input Module for the input signal """ if inM is None: raise ValueError("input Module must not be None") @@ -496,7 +381,7 @@ class hp(Module): # for second order just take hp for inM def update(self,): """ updates the output signal with the current input signal - out_t+1=Input-lowpass(Input) + out_t+1 = Input-lowpass(Input) """ self.inM.update() self.lowpass.update() @@ -511,11 +396,12 @@ class mul(Module): """ Implements the multiplication of two Modules """ + def __init__(self, inM1, inM2): """ Initializes the multiplication module - :param inM1: first input Module - :param inM2: second input Module + : param inM1: first input Module + : param inM2: second input Module """ if inM1 is None: raise ValueError("input Module must not be None") @@ -533,8 +419,8 @@ class mul(Module): def inM1(self): """ getter for the first input Module - :returns inM1 - :rtype Module + : returns inM1 + : rtype Module """ return self.__inM1 @@ -542,7 +428,7 @@ class mul(Module): def inM(self, inM1): """ setter for the first input Module - :param inM1 + : param inM1 """ if inM1 is None: raise ValueError("input Module must not be None") @@ -554,8 +440,8 @@ class mul(Module): def inM2(self): """ getter for the second input Module - :returns inM2 - :rtype Module + : returns inM2 + : rtype Module """ return self.__inM2 @@ -563,7 +449,7 @@ class mul(Module): def inM2(self, inM2): """ setter for the first input Module - :param inM1 + : param inM1 """ if inM2 is None: raise ValueError("input Module must not be None") @@ -573,11 +459,11 @@ class mul(Module): def update(self, shift, axis=None): """ - updates the output (multiplication of the two input Modules) - for the current input (see numpy roll) - :param shift: shifts the Input provided by the first module + updates the output(multiplication of the two input Modules) + for the current input(see numpy roll) + : param shift: shifts the Input provided by the first module by the given amount - :param axis: shifts the Input of the first module along the + : param axis: shifts the Input of the first module along the provided axis """ if not isinstance(shift, int): @@ -597,11 +483,12 @@ class div(Module): """ Implements the division of two Modules """ + def __init__(self, inM1, inM2): """ Initializes the multiplication module - :param inM1: first input Module - :param inM2: second input Module + : param inM1: first input Module + : param inM2: second input Module """ if inM1 is None: raise ValueError("input Module must not be None") @@ -619,8 +506,8 @@ class div(Module): def inM1(self): """ getter for the first input Module - :returns inM1 - :rtype Module + : returns inM1 + : rtype Module """ return self.__inM1 @@ -628,7 +515,7 @@ class div(Module): def inM(self, inM1): """ setter for the first input Module - :param inM1 + : param inM1 """ if inM1 is None: raise ValueError("input Module must not be None") @@ -640,8 +527,8 @@ class div(Module): def inM2(self): """ getter for the second input Module - :returns inM2 - :rtype Module + : returns inM2 + : rtype Module """ return self.__inM2 @@ -649,7 +536,7 @@ class div(Module): def inM2(self, inM2): """ setter for the first input Module - :param inM1 + : param inM1 """ if inM2 is None: raise ValueError("input Module must not be None") @@ -659,11 +546,11 @@ class div(Module): def update(self, shift, axis=None): """ - updates the output (division of the two input Modules) - for the current input (see numpy roll) - :param shift: shifts the Input provided by the first module + updates the output(division of the two input Modules) + for the current input(see numpy roll) + : param shift: shifts the Input provided by the first module by the given amount - :param axis: shifts the Input of the first module along the + : param axis: shifts the Input of the first module along the provided axis """ if not isinstance(shift, int): diff --git a/navipy/processing/test_OpticFlow.py b/navipy/processing/test_OpticFlow.py index c3b7a3b..d5033c8 100644 --- a/navipy/processing/test_OpticFlow.py +++ b/navipy/processing/test_OpticFlow.py @@ -2,6 +2,9 @@ import unittest import navipy.processing.mcode as mcode import pandas as pd import numpy as np +from navipy.moving.agent import posorient_columns +from navipy.moving.agent import velocities_columns +from navipy.scene import __spherical_indeces__ def Scale(data, oldmin, oldmax, mini, maxi, ran): @@ -256,108 +259,60 @@ class TestCase(unittest.TestCase): the bee moves only in x-direction keeping the other parameters (y,z,yaw,pitch,roll) constant """ - positions = np.array([[-20, -20, 2.6, 1.57079633, 0, 0], - [-19.8, -20, 2.6, 1.57079633, 0, 0]]) - - x = positions[0, 0] - y = positions[0, 1] - z = positions[0, 2] - yaw = positions[0, 3] - pitch = positions[0, 4] - roll = positions[0, 5] - - dx = positions[1, 0] - positions[0, 0] - dy = positions[1, 1] - positions[0, 1] - dz = positions[1, 2] - positions[0, 2] - dyaw = positions[1, 3] - positions[0, 3] - dpitch = positions[1, 4] - positions[0, 4] - droll = positions[1, 5] - positions[0, 5] - - self.velocity['location']['x'] = x - self.velocity['location']['y'] = y - self.velocity['location']['z'] = z - self.velocity[self.convention]['alpha_0'] = yaw - self.velocity[self.convention]['alpha_1'] = pitch - self.velocity[self.convention]['alpha_2'] = roll - self.velocity['location']['dx'] = dx - self.velocity['location']['dy'] = dy - self.velocity['location']['dz'] = dz - self.velocity[self.convention]['dalpha_0'] = dyaw - self.velocity[self.convention]['dalpha_1'] = dpitch - self.velocity[self.convention]['dalpha_2'] = droll - - rof, hof, vof = mcode.optic_flow(self.scene, - self.viewing_directions, - self.velocity) - - testrof = [[2.88404299e-34, 8.22008280e-20, 1.64376617e-19], - [2.34252646e-05, 4.64310635e-05, 6.94159777e-05], - [9.36297157e-05, 1.38760866e-04, 1.83822856e-04]] - testhof = [[0.07692013, 0.07690842, 0.07687327], - [0.0768967, 0.07686158, 0.07680307], - [0.07682644, 0.07676796, 0.07668619]] - testvof = [[2.46536984e-10, 1.34244164e-03, 2.68447412e-03], - [1.34203275e-03, 2.68345936e-03, 4.02366094e-03], - [2.68120450e-03, 4.02043495e-03, 5.35762781e-03]] - - assert np.all(np.isclose(rof, testrof)) - assert np.all(np.isclose(hof, testhof)) - assert np.all(np.isclose(vof, testvof)) - - def test_only_yaw_new_vs_old(self): - """ - this test checks for the correct response if for example - the bee rotates only around the yaw axis keeping the other - parameters (x,y,z,pitch,roll) constant - """ - # yaw only everything zero - # only vertical should change, horizontal stays same - positions = np.array([[-20, -20, 2.6, 1.57079633, 0, 0], - [-20, -20, 2.6, 1.90079633, 0, 0]]) - - x = positions[0, 0] - y = positions[0, 1] - z = positions[0, 2] - yaw = positions[0, 3] - pitch = positions[0, 4] - roll = positions[0, 5] - - dx = positions[1, 0]-positions[0, 0] - dy = positions[1, 1]-positions[0, 1] - dz = positions[1, 2]-positions[0, 2] - dyaw = positions[1, 3]-positions[0, 3] - dpitch = positions[1, 4]-positions[0, 4] - droll = positions[1, 5]-positions[0, 5] - - self.velocity['location']['x'] = x - self.velocity['location']['y'] = y - self.velocity['location']['z'] = z - self.velocity[self.convention]['alpha_0'] = yaw - self.velocity[self.convention]['alpha_1'] = pitch - self.velocity[self.convention]['alpha_2'] = roll - self.velocity['location']['dx'] = dx - self.velocity['location']['dy'] = dy - self.velocity['location']['dz'] = dz - self.velocity[self.convention]['dalpha_0'] = dyaw - self.velocity[self.convention]['dalpha_1'] = dpitch - self.velocity[self.convention]['dalpha_2'] = droll - - rof, hof, vof = mcode.optic_flow(self.scene, self.viewing_directions, - self.velocity) - - testrof = [[-6.47644748e-26, -3.52655120e-19, -7.05202754e-19], - [-1.84591335e-11, -1.00513560e-04, -2.00996485e-04], - [-3.69126442e-11, -2.00996503e-04, -4.01931744e-04]] - testhof = [[-0.33, -0.32994974, -0.32979897], - [-0.33, -0.32994974, -0.32979897], - [-0.33, -0.32994974, -0.32979897]] - testvof = [[-1.05768414e-09, -5.75929518e-03, -1.15168350e-02], - [-1.05752305e-09, -5.75841801e-03, -1.15150809e-02], - [-1.05703983e-09, -5.75578677e-03, -1.15098192e-02]] - - assert np.all(np.isclose(rof, testrof)) - assert np.all(np.isclose(hof, testhof)) - assert np.all(np.isclose(vof, testvof)) + convention = 'zyx' + tuples_posvel = posorient_columns(convention) + tuples_posvel.extend(velocities_columns(convention)) + index_posvel = pd.MultiIndex.from_tuples(tuples_posvel, + names=['position', + 'orientation']) + velocity = pd.Series(index=index_posvel, data=0) + velocity.loc[('location', 'dx')] = np.random.randn() + + elevation = np.linspace(-np.pi/2, np.pi/2, 5) + azimuth = np.linspace(-np.pi, np.pi, 11) + [ma, me] = np.meshgrid(azimuth, elevation) + imshape = me.shape + vdir = np.zeros((ma.shape[0], ma.shape[1], 2)) + vdir[..., __spherical_indeces__['elevation']] = me + vdir[..., __spherical_indeces__['azimuth']] = ma + + scene = np.random.rand(imshape[0], imshape[1]) + scene = np.tile(scene[..., np.newaxis], 4) + scene = scene[..., np.newaxis] + + rof, hof, vof =\ + mcode.optic_flow_translational(scene, vdir, + velocity, distance_channel=3) + hnorm = np.sqrt(hof**2 + vof ** 2) + # Add abs tol because we compare to zero + np.testing.assert_allclose(rof, np.zeros_like(rof), atol=1e-7) + # The motion is in the plane x,y. + # so no vof at null elevation + vof_el = vof[elevation == 0, :] + np.testing.assert_allclose(vof_el, np.zeros_like(vof_el), atol=1e-7) + + # The translational optic flow norm should be small + # for further away objects + # except for the focus of contraction + # i.e. aximuth np.pi and el = 0 + # and focus of expension + # i.e. aximuth 0 and el = 0 + valid = (vdir[..., __spherical_indeces__['elevation']] == 0) \ + & (vdir[..., __spherical_indeces__['azimuth']] == 0) + valid = valid | ( + (vdir[..., __spherical_indeces__['elevation']] == 0) + & (vdir[..., __spherical_indeces__['azimuth']] == np.pi)) + valid = valid | ( + (vdir[..., __spherical_indeces__['elevation']] == 0) + & (vdir[..., __spherical_indeces__['azimuth']] == -np.pi)) + valid = not valid + + rof_further, hof_further, vof_further =\ + mcode.optic_flow_translational(scene+1, vdir, + velocity, distance_channel=3) + hnorm_further = np.sqrt(hof_further**2 + vof_further**2) + + np.testing.assert_array_less(hnorm_further[valid], hnorm[valid]) def test_only_yaw(self): """ @@ -410,9 +365,9 @@ class TestCase(unittest.TestCase): [-1.05752305e-09, -5.75841801e-03, -1.15150809e-02], [-1.05703983e-09, -5.75578677e-03, -1.15098192e-02]] - assert np.all(np.isclose(rof, testrof)) - assert np.all(np.isclose(hof, testhof)) - assert np.all(np.isclose(vof, testvof)) + np.testing.assert_allclose(rof, testrof) + np.testing.assert_allclose(hof, testhof) + np.testing.assert_allclose(vof, testvof) def test_only_yaw_big(self): """ @@ -481,7 +436,7 @@ class TestCase(unittest.TestCase): if not has_zeros: factor = np.array(hof[:, 0]/cosel) - assert np.all(factor == factor[0]) + self.assertAlmostEquals(factor, factor[0]) def test_only_pitch(self): """ @@ -531,9 +486,9 @@ class TestCase(unittest.TestCase): [-1.74524096e-03, -1.74524096e-03, -1.74524096e-03], [-3.48994999e-03, -3.48994999e-03, -3.48994999e-03]] - assert np.all(np.isclose(rof, testrof)) - assert np.all(np.isclose(hof, testhof)) - assert np.all(np.isclose(vof, testvof)) + np.testing.assert_allclose(rof, testrof) + np.testing.assert_allclose(hof, testhof) + np.testing.assert_allclose(vof, testvof) def test_only_pitch_big(self): """ @@ -654,10 +609,10 @@ class TestCase(unittest.TestCase): factorsin[has_zerossin] = factorsin[0] for i in range(1, len(factor)): - assert np.isclose(factor[1], factor[i]) + self.assertAlmostEqual(np.isclose(factor[1], factor[i])) if i == 90: continue - assert np.isclose(factorsin[0], factorsin[i]) + self.assertAlmostEqual(factorsin[0], factorsin[i]) # print(i) # assert np.all(np.isclose(hof, testhof)) # assert np.all(np.isclose(vof, testvof)) @@ -709,77 +664,9 @@ class TestCase(unittest.TestCase): [0.09879836, 0.09893931, 0.09905007], [0.09856329, 0.09870419, 0.09881489]] - assert np.all(np.isclose(rof, testrof)) - assert np.all(np.isclose(hof, testhof)) - assert np.all(np.isclose(vof, testvof)) - """ - def test_findconv(self): - ypr=[1.57079633,0.1,0.1] - vec=[0, -0.09900333, 0.00993347] - tmpvec = vec - - - M1 = rotation_matrix(-ypr[2], [1, 0, 0])[:3, :3] - vec = np.transpose(np.dot(M1, np.transpose(vec))) - roty = np.transpose(np.dot(M1, np.transpose([0, 1, 0]))) - M2 = rotation_matrix(-ypr[1], roty)[:3, :3] - vec = np.transpose(np.dot(M2, np.transpose(vec))) - rotz = np.transpose(np.dot(M1, np.transpose([0, 0, 1]))) - #rotz = np.transpose(np.dot(M2, np.transpose(rotz))) - M4 = rotation_matrix(-ypr[1], [0, 1, 0])[:3, :3] - rotatedax = np.transpose(np.dot(M4, np.transpose(rotz))) - - M3 = rotation_matrix(-ypr[0], rotatedax) - scale, shear, angles, translate, perspective = - decompose_matrix(M3,'xyz') - print("angels", angles) - vec = np.transpose(np.dot(M3[:3,:3], np.transpose(vec))) - print("old vec", vec) - oldvec=vec - - angles=[ypr[0], ypr[1],ypr[2], -ypr[0], -ypr[1], -ypr[2]] - - #for c in ['xyz','xyx','xzy','xzx','yzx','yzy','yxz','yxy', - # 'zxy','zxz','zyx','zyz','zyx','xyx','yzx','xzx', - # 'xzy','yzy','zxy','yxy','yxz','zxz','xyz','zyz']: - for al in angles: - for bl in angles: - for cl in angles: - M = compose_matrix(scale=None, shear=None, - angles=[al, bl,cl], - translate=None, - perspective=None, - axes='zyx')[:3,:3] - #M= np.transpose(M) - vec = np.dot(np.transpose(M), vec) - if (np.isclose(vec[0], oldvec[0]) or - np.isclose(vec[0], oldvec[1]) or - np.isclose(vec[0], oldvec[2]) or\ - np.isclose(vec[0], -oldvec[0]) or - np.isclose(vec[0], -oldvec[1]) or - np.isclose(vec[0], -oldvec[2])) and\ - (np.isclose(vec[1], oldvec[0]) or - np.isclose(vec[1], oldvec[1]) or - np.isclose(vec[1], oldvec[2]) or\ - np.isclose(vec[1], -oldvec[0]) or - np.isclose(vec[1], -oldvec[1]) or - np.isclose(vec[1], -oldvec[2])) and\ - (np.isclose(vec[2], oldvec[0]) or - np.isclose(vec[2], oldvec[1]) or - np.isclose(vec[2], oldvec[2]) or\ - np.isclose(vec[2], -oldvec[0]) or - np.isclose(vec[2], -oldvec[1]) or - np.isclose(vec[2], -oldvec[2])): - print("found") - print("conve", al, bl, cl) - print("new vec", vec) - #scale, shear, angles, translate, - perspective =decompose_matrix(M3,c) - #print("angels", angles) - #print("old vec", oldvec) - print("new vec", vec) - vec=tmpvec - """ + np.testing.assert_allclose(rof, testrof) + np.testing.assert_allclose(hof, testhof) + np.testing.assert_allclose(vof, testvof) if __name__ == '__main__': -- GitLab