diff --git a/navipy/database/__init__.py b/navipy/database/__init__.py index faf8eff5fcfc3d4f503bd67d90b2e01b9340d24f..7d4f71e311b8266441e6ca54fd94ed574f9e68bd 100644 --- a/navipy/database/__init__.py +++ b/navipy/database/__init__.py @@ -10,6 +10,7 @@ import io from navipy.scene import is_numeric_array, check_scene from navipy.maths import constants as mconst from navipy.trajectories import Trajectory +from navipy.trajectories import posorient_columns import logging import numbers @@ -195,6 +196,7 @@ class DataBase(): WHERE (rowid=?) """.format(tablename), (rowid,)) self.__viewing_dir = self.db_cursor.fetchone()[0] + self.__viewing_dir = np.round(self.__viewing_dir, decimals=3) return self.__viewing_dir.copy() @viewing_directions.setter @@ -316,7 +318,7 @@ class DataBase(): [convention]['q_3'] **where convention can be: quaternion - :type rowid: pd.Series + :type posorient: pd.Series :returns: id :rtype: int """ @@ -551,7 +553,7 @@ class DataBase(): if self.rotation_convention is not None: posorients = Trajectory() posorients.from_dataframe(posorient, rotconv=self.__convention) - return posorients + return posorients.astype(float) @property def normalisations(self): @@ -605,43 +607,33 @@ class DataBase(): WHERE (rowid={}) """.format(tablename, rowid), self.db) toreturn = toreturn.loc[0, :] - toreturn.name = toreturn.id - toreturn.drop('id') + # if np.isnan(toreturn.frame) + toreturn.name = toreturn.frame_i + # toreturn.drop('id') # toreturn = toreturn.astype(float) posorient = None convention = toreturn.rotconv_id + + tuples = posorient_columns(convention) + index = pd.MultiIndex.from_tuples(tuples) + posorient = pd.Series(index=index) + posorient['location']['x'] = toreturn.loc['x'] + posorient['location']['y'] = toreturn.loc['y'] + posorient['location']['z'] = toreturn.loc['z'] + if convention != 'quaternion': - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', - 'orientation']) - posorient = pd.Series(index=index) - posorient['location']['x'] = toreturn.loc['x'] - posorient['location']['y'] = toreturn.loc['y'] - posorient['location']['z'] = toreturn.loc['z'] posorient[convention]['alpha_0'] = toreturn.loc['q_0'] posorient[convention]['alpha_1'] = toreturn.loc['q_1'] posorient[convention]['alpha_2'] = toreturn.loc['q_2'] + posorient.name = toreturn.name else: - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), (convention, 'q_0'), - (convention, 'q_1'), (convention, 'q_2'), - (convention, 'q_3')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', - 'orientation']) - posorient = pd.Series(index=index) - posorient['location']['x'] = toreturn.loc['x'] - posorient['location']['y'] = toreturn.loc['y'] - posorient['location']['z'] = toreturn.loc['z'] posorient[convention]['q_0'] = toreturn.loc['q_0'] posorient[convention]['q_1'] = toreturn.loc['q_1'] posorient[convention]['q_2'] = toreturn.loc['q_2'] posorient[convention]['q_3'] = toreturn.loc['q_3'] + posorient.name = toreturn.name - return posorient + return posorient.astype(float) def scene(self, posorient=None, rowid=None): """Read an image at a given position-orientation or given id of row in the \ diff --git a/navipy/database/test.py b/navipy/database/test.py index 1dd96048e3a7a835cab7a38f4880299299ad827f..416765f0094d484ddfce21bfbcd39fe1e31e9c18 100644 --- a/navipy/database/test.py +++ b/navipy/database/test.py @@ -87,112 +87,6 @@ class TestCase(unittest.TestCase): self.mydb.check_data_validity(n) assert self.mydb.check_data_validity(1) - def get_posid_test(self): - """ - this test checks the function get_posid works - correctly. - it checks if correct errors are raised for: - - posorient is missing an entry (no 'x' column) - - posorient contains nan or none values - - posorient is of wrong type (dict instead of pd.series) - """ - conn = sqlite3.connect(self.mydb_filename) - c = conn.cursor() - c.execute(""" SELECT * FROM position_orientation WHERE (rowid=1) """) - rows = c.fetchall()[0] - # convention = rows[1] - - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - - posorient = pd.Series(index=index) - posorient['location']['x'] = rows[6] - posorient['location']['y'] = rows[7] - posorient['location']['z'] = rows[8] - posorient['xyz']['alpha_0'] = rows[3] - posorient['xyz']['alpha_1'] = rows[5] - posorient['xyz']['alpha_2'] = rows[4] - - posid = self.mydb.get_posid(posorient) - assert posid == 1 - - # incorrect case missing column - tuples = [('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - - posorient2 = pd.Series(index=index) - posorient2['location']['y'] = rows[7] - posorient2['location']['z'] = rows[8] - posorient2['xyz']['alpha_0'] = rows[3] - posorient2['xyz']['alpha_1'] = rows[5] - posorient2['xyz']['alpha_2'] = rows[4] - - with self.assertRaises(Exception): - posid = self.mydb.get_posid(posorient2) - - # incorrect case None - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(self.tuples, - names=['position', 'orientation']) - - posorient3 = pd.Series(index=index) - posorient3['location']['x'] = None - posorient3['location']['y'] = rows[7] - posorient3['location']['z'] = rows[8] - posorient3['xyz']['alpha_0'] = rows[3] - posorient3['xyz']['alpha_1'] = rows[5] - posorient3['xyz']['alpha_2'] = rows[4] - with self.assertRaises(ValueError): - posid = self.mydb.get_posid(posorient2) - - # incorrect case nan - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(self.tuples, - names=['position', 'orientation']) - - posorient2 = pd.Series(index=index) - posorient2['location']['x'] = np.nan - posorient2['location']['y'] = rows[7] - posorient2['location']['z'] = rows[8] - posorient2['xyz']['alpha_0'] = rows[3] - posorient2['xyz']['alpha_1'] = rows[5] - posorient2['xyz']['alpha_2'] = rows[4] - with self.assertRaises(ValueError): - posid = self.mydb.get_posid(posorient2) - - # incorrect case no pandas series but dict - posorient2 = {} - posorient2['location']['x'] = rows[6] - posorient2['location']['y'] = rows[7] - posorient2['location']['z'] = rows[8] - posorient2['xyz']['alpha_0'] = rows[3] - posorient2['xyz']['alpha_1'] = rows[5] - posorient2['xyz']['alpha_2'] = rows[4] - with self.assertRaises(TypeError): - self.mydb.get_posid(posorient2) - - # not working case empty - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(self.tuples, - names=['position', 'orientation']) - - posorient2 = pd.Series(index=index) - - with self.assertRaises(Exception): - self.mydb.get_posid(posorient2) - def test_read_posorient(self): """ this test checks the function read_posorient works @@ -202,88 +96,32 @@ class TestCase(unittest.TestCase): - posorient contains nan or none values - posorient is of wrong type (dict instead of pd.series) """ - conn = sqlite3.connect(self.mydb_filename) - c = conn.cursor() - c.execute(""" SELECT * FROM position_orientation WHERE (rowid=1) """) - rows = c.fetchall()[0] - # working case - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - - posorient = pd.Series(index=index) - posorient['location']['x'] = rows[6] - posorient['location']['y'] = rows[7] - posorient['location']['z'] = rows[8] - posorient['xyz']['alpha_0'] = rows[3] - posorient['xyz']['alpha_1'] = rows[5] - posorient['xyz']['alpha_2'] = rows[4] + posorient = self.mydb.read_posorient(rowid=1) posid = self.mydb.read_posorient(posorient=posorient) # print(posid) assert posid['location']['x'] == posorient['location']['x'] # incorrect case missing column - tuples = [('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - - posorient2 = pd.Series(index=index) - posorient2['location']['y'] = rows[7] - posorient2['location']['z'] = rows[8] - posorient2['xyz']['alpha_0'] = rows[3] - posorient2['xyz']['alpha_1'] = rows[5] - posorient2['xyz']['alpha_2'] = rows[4] + posorient2 = posorient.copy() + posorient2 = posorient2.drop('x', level=1) + posorient2['location']['x'] = np.nan with self.assertRaises(ValueError): self.mydb.read_posorient(posorient=posorient2) # incorrect case None - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - - posorient2 = pd.Series(index=index) + posorient2 = posorient.copy() posorient2['location']['x'] = None - posorient2['location']['y'] = rows[7] - posorient2['location']['z'] = rows[8] - posorient2['xyz']['alpha_0'] = rows[3] - posorient2['xyz']['alpha_1'] = rows[5] - posorient2['xyz']['alpha_2'] = rows[4] with self.assertRaises(ValueError): self.mydb.read_posorient(posorient=posorient2) # incorrect case nan - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - - posorient2 = pd.Series(index=index) + posorient2 = posorient.copy() posorient2['location']['x'] = np.nan - posorient2['location']['y'] = rows[7] - posorient2['location']['z'] = rows[8] - posorient2['xyz']['alpha_0'] = rows[3] - posorient2['xyz']['alpha_1'] = rows[5] - posorient2['xyz']['alpha_2'] = rows[4] with self.assertRaises(ValueError): self.mydb.read_posorient(posorient=posorient2) # incorrect case no pandas series but dict - posorient2 = {} - posorient2['location'] = {} - posorient2['xyz'] = {} - posorient2['location']['x'] = rows[6] - posorient2['location']['y'] = rows[7] - posorient2['location']['z'] = rows[8] - posorient2['xyz']['alpha_0'] = rows[3] - posorient2['xyz']['alpha_1'] = rows[5] - posorient2['xyz']['alpha_2'] = rows[4] + posorient2 = posorient.to_dict() with self.assertRaises(TypeError): self.mydb.read_posorient(posorient=posorient2) @@ -295,7 +133,6 @@ class TestCase(unittest.TestCase): names=['position', 'orientation']) posorient2 = pd.Series(index=index) - with self.assertRaises(Exception): self.mydb.read_posorient(posorient=posorient2) @@ -309,27 +146,9 @@ class TestCase(unittest.TestCase): and checks if the returned entry for rowid 1 is correct - that it all columns and correct values """ - conn = sqlite3.connect(self.mydb_filename) - c = conn.cursor() - c.execute(""" SELECT * FROM position_orientation WHERE (rowid=1) """) - rows = c.fetchall()[0] - # working case - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - - posorient = pd.Series(index=index) - posorient['location']['x'] = rows[6] - posorient['location']['y'] = rows[7] - posorient['location']['z'] = rows[8] - posorient['xyz']['alpha_0'] = rows[3] - posorient['xyz']['alpha_1'] = rows[5] - posorient['xyz']['alpha_2'] = rows[4] + posorient = self.mydb.posorients.iloc[0, :] for rowid in [0, -2]: with self.assertRaises(ValueError): - # print("rowid",rowid) self.mydb.read_posorient(rowid=rowid) with self.assertRaises(TypeError): self.mydb.read_posorient(rowid='T') @@ -342,15 +161,7 @@ class TestCase(unittest.TestCase): for rowid in [1]: posoriend2 = self.mydb.read_posorient(rowid=rowid) - assert posoriend2['location']['x'] == posorient['location']['x'] - assert posoriend2['location']['y'] == posorient['location']['y'] - assert posoriend2['location']['z'] == posorient['location']['z'] - assert (posoriend2['xyz']['alpha_0'] == - posorient['xyz']['alpha_0']) - assert (posoriend2['xyz']['alpha_1'] == - posorient['xyz']['alpha_1']) - assert (posoriend2['xyz']['alpha_2'] == - posorient['xyz']['alpha_2']) + pd.testing.assert_series_equal(posoriend2, posorient) def test_scene_id(self): """ @@ -399,23 +210,8 @@ class TestCase(unittest.TestCase): - posorient contains nan or none values - posorient is of wrong type (dict instead of pd.series) """ - conn = sqlite3.connect(self.mydb_filename) - c = conn.cursor() - c.execute(""" SELECT * FROM position_orientation WHERE (rowid=1) """) - rows = c.fetchall()[0] - # working case - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - posorient = pd.Series(index=index) - posorient['location']['x'] = rows[6] - posorient['location']['y'] = rows[7] - posorient['location']['z'] = rows[8] - posorient['xyz']['alpha_0'] = rows[3] - posorient['xyz']['alpha_1'] = rows[5] - posorient['xyz']['alpha_2'] = rows[4] + posorient = self.mydb.posorients.iloc[0, :] + posorient.name = 1 image = self.mydb.scene(posorient=posorient) self.assertIsNotNone(image) self.assertFalse(sum(image.shape) == 0) @@ -424,74 +220,30 @@ class TestCase(unittest.TestCase): self.assertTrue(image.shape[3] == 1) # incorrect case missing column - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - - posorient2 = pd.Series(index=index) - posorient2['location']['y'] = rows[7] - posorient2['location']['z'] = rows[8] - posorient2['xyz']['alpha_0'] = rows[3] - posorient2['xyz']['alpha_1'] = rows[5] - posorient2['xyz']['alpha_2'] = rows[4] + posorient2 = posorient.copy() + posorient2 = posorient2.drop(('location', 'x')) with self.assertRaises(Exception): image = self.mydb.scene(posorient=posorient2) # incorrect case None - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - - posorient2 = pd.Series(index=index) + posorient2 = posorient.copy() posorient2['location']['x'] = None - posorient2['location']['y'] = rows[7] - posorient2['location']['z'] = rows[8] - posorient2['xyz']['alpha_0'] = rows[3] - posorient2['xyz']['alpha_1'] = rows[5] - posorient2['xyz']['alpha_2'] = rows[4] with self.assertRaises(ValueError): image = self.mydb.scene(posorient=posorient2) # incorrect case nan - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('xyz', 'alpha_0'), - ('xyz', 'alpha_1'), ('xyz', 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - - posorient2 = pd.Series(index=index) + posorient2 = posorient.copy() posorient2['location']['x'] = np.nan - posorient2['location']['y'] = rows[7] - posorient2['location']['z'] = rows[8] - posorient2['xyz']['alpha_0'] = rows[3] - posorient2['xyz']['alpha_1'] = rows[5] - posorient2['xyz']['alpha_2'] = rows[4] with self.assertRaises(ValueError): image = self.mydb.scene(posorient=posorient2) # incorrect case no pandas series but dict - posorient2 = {} - posorient2['location'] = {} - posorient2['xyz'] = {} - posorient2['location']['x'] = rows[6] - posorient2['location']['y'] = rows[7] - posorient2['location']['z'] = rows[8] - posorient2['xyz']['alpha_0'] = rows[3] - posorient2['xyz']['alpha_1'] = rows[5] - posorient2['xyz']['alpha_2'] = rows[4] + posorient2 = posorient.to_dict() with self.assertRaises(TypeError): image = self.mydb.scene(posorient=posorient2) # not working case empty - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - - posorient2 = pd.Series(index=index) - + posorient2 = pd.Series(index=posorient.index) with self.assertRaises(Exception): image = self.mydb.scene(posorient=posorient2) diff --git a/navipy/moving/__init__.py b/navipy/moving/__init__.py deleted file mode 100644 index 094792254beb54413d3fbe4b21609ad6c63b3f51..0000000000000000000000000000000000000000 --- a/navipy/moving/__init__.py +++ /dev/null @@ -1,76 +0,0 @@ -""" -Close-loop agent -~~~~~~~~~~~~~~~~ -A standard method to move an agent is to update: - -1. update the sensory information at the current agent location :math:`x` -2. deduce the agent motion :math:`vdt` from this information -3. displace the agent by motion ( :math:`x\\rightarrow x + vdt`) - - -The use of a close loop model including visual rendering is \ -sometimes too slow to efficiently test several models or tune the \ -parameters of a given models. The GridAgent solves this problem by \ -restricting the agent motion on locations with rendered scene. The \ -agent moves thus on a grid, and its next position is always \ -snapped to the closest grid location. The finer the grid is, the \ -larger the database storing all sceneries and grid location is; \ -but also the more accurate the agent motion is. The grid size \ -depend on the storage space, the time you can wait for the \ -database creation, and how sensitive to exact location your model is. - -This iterative method can be used with a wide range of models. \ -In :navipy: differents classes of agents exist. \ -They differ by the method use to update the sensory information: - -+----------------+----------------------------------------------------+ -|Agent class |Sensory update | -+================+====================================================+ -|:CyberBeeAgent: |:Cyberbee: update within blender. | -+----------------+----------------------------------------------------+ -|:GridAgent: |:DataBase: update from a pre-rendered database. | -+----------------+----------------------------------------------------+ - -To deduce the agent motion from the current state of the agent \ -(e.g. position, orientation, sensory information, memories, ...) all \ -classes of agents use callback function, which is a custom function \ -defined by the user. This function takes as input argument, the \ -agent position-orientation and its velocities, and the currently \ -seen scene. The function should return a the agent motion. \ - -Once the agent sensory method and motion method have been configured, \ -the agent can: - -1. move (perform a one step motion), or -2. fly (move until its velocity is null, or until n-steps). - -Agent on a graph -~~~~~~~~~~~~~~~~ -As mentioned above, in every model of navigation the agent motion \ -is derived from its current external state, its position \ -orientation as well as the derivatives, and its internal state. \ -However, when the agent motion is only derived from its current \ -position orientation, and what is seen from this location, the \ -simulation of an agent can be drastically simplified. Indeed, \ -not only the scene at relevant location can be pre-rendered, but \ -the motion of the agent from those locations as well. - -The agent being restricted to move from relevant locations to \ -relevant locations, a graph of interconnected locations can be built.\ -The nodes of the graph are the relevant locations, and the directed \ -edges the motion of the agent from one location to the next. \ -:GraphAgent: can build such graph by simply using a database of \ -pre-rendered scenery at relevant locations, and a function \ -giving the motion of the agent from a scene and the agent \ -position orientation. Once the graph has been generated, \ -attractors can be found, the number of locations converging to \ -those (i.e. the catchment area or volume), if two locations are \ -connected, etc. - -To speed up certain calculations, additional values can stored \ -at each graph node and access from the callback function. It is \ -worth mentioning a warning here. The size of the graph can be \ -incredibly large. Thus, not too much information can be stored \ -at each node. To assess the memory size of the graph before \ -creating it, one can use the tool agent.tools.assess_graphmemsize. -""" diff --git a/navipy/moving/agent.py b/navipy/moving/agent.py deleted file mode 100644 index d8aa92018d06a25d72f07f5d21d4bcbc2f9016d1..0000000000000000000000000000000000000000 --- a/navipy/moving/agent.py +++ /dev/null @@ -1,567 +0,0 @@ -""" -+-------------------------------------------+\ ---------------+-------------+ -|Agent class |\ -Type of agent | Rendering | -+===========================================+\ -==============+=============+ -|:class:`navipy.moving.agent.CyberBeeAgent` |\ -Close loop |Online | -+-------------------------------------------+\ - +-------------+ -|:class:`navipy.moving.agent.GraphAgent` |\ - |Pre-rendered | -+-------------------------------------------+\ ---------------+ + -|:class:`navipy.moving.agent.GridAgent` |\ -Open loop | | -+-------------------------------------------+\ ---------------+-------------+ - - -""" -import numpy as np -import pandas as pd -import copy -import networkx as nx -import multiprocessing -from multiprocessing import Queue, JoinableQueue, Process -import inspect -from navipy.moving import maths as navimomath -from navipy.database import DataBase -import time -import os - -version = float(nx.__version__) - - -def defaultcallback(*args, **kwargs): - """default call back""" - raise NameError('No Callback') - - -class DefaultBrain(): - def __init__(self): - pass - - def update(self, posorient): - raise NameError('No Callback') - - def velocity(self): - raise NameError('No Callback') - - -def posorient_columns(convention): - return [('location', 'x'), - ('location', 'y'), - ('location', 'z'), - (convention, 'alpha_0'), - (convention, 'alpha_1'), - (convention, 'alpha_2')] - - -def velocities_columns(convention): - return [('location', 'dx'), - ('location', 'dy'), - ('location', 'dz'), - (convention, 'dalpha_0'), - (convention, 'dalpha_1'), - (convention, 'dalpha_2')] - - -class AbstractAgent(): - def __init__(self, convention='zyx'): - self._brain = DefaultBrain() - self._alter_posorientvel = defaultcallback - tuples = posorient_columns(convention) - index = pd.MultiIndex.from_tuples(tuples, - names=['position', - 'orientation']) - self._posorient_col = index - - tuples_vel = velocities_columns(convention) - index_vel = pd.MultiIndex.from_tuples(tuples_vel, - names=['position', - 'orientation']) - tuples_posvel = tuples - tuples_posvel.extend(tuples_vel) - index_posvel = pd.MultiIndex.from_tuples(tuples_posvel, - names=['position', - 'orientation']) - self._velocity_col = index_vel - self._posorient_vel_col = index_posvel - self._posorient_vel = pd.Series( - index=self._posorient_vel_col, - data=np.nan) - - @property - def posorient(self): - return self._posorient_vel.loc[self._posorient_col].copy() - - @posorient.setter - def posorient(self, posorient): - if isinstance(posorient, pd.Series) is False: - raise TypeError('posorient should be a pandas Series') - for col in self._posorient_col: - if col not in posorient.index: - raise KeyError( - 'posorient should have {} as index'.format(col)) - self._posorient_vel.loc[self._posorient_col] = \ - posorient.loc[self._posorient_col] - - @property - def velocity(self): - return self._posorient_vel.loc[self._velocity_col].copy() - - @velocity.setter - def velocity(self, velocity): - if isinstance(velocity, pd.Series) is False: - raise TypeError('velocity should be a pandas Series') - for col in self._velocity_col: - if col not in velocity.index: - raise KeyError( - 'velocity should have {} as index'.format(col)) - self._posorient_vel.loc[self._velocity_col] = \ - velocity.loc[self._velocity_col] - - @property - def posorient_vel(self): - return self._posorient_vel.copy() - - @posorient_vel.setter - def posorient_vel(self, posorient_vel): - self.posorient = posorient_vel - self.velocity = posorient_vel - - @property - def brain(self): - return inspect.getsourcelines(self._brain) - - @brain.setter - def brain(self, brain): - self._brain = brain - - @property - def alter_posorientvel(self): - return inspect.getsourcelines(self._alter_posorientvel) - - def move(self): - self._brain.update(self.posorient) - self.velocity = self._brain.velocity() - alteredpos = self._alter_posorientvel(self._posorient_vel) - self.posorient = alteredpos - self.velocity = alteredpos - - def fly(self, max_nstep, return_tra=False): - """move cyberbee until max step has been performed - """ - if return_tra: - trajectory = pd.DataFrame(index=range(0, max_nstep), - columns=self._posorient_vel_col) - trajectory.loc[0, :] = self._posorient_vel.copy() - for stepi in range(1, max_nstep): - self.move() - if return_tra: - trajectory.loc[stepi, :] = self._posorient_vel.copy() - if return_tra: - return trajectory - else: - return None - - -class CyberBeeAgent(AbstractAgent, Process): - """ - A CyberBeeAgent uses the rendering method of cyberbee. \ -CyberBeeAgent is a close loop agent and need to be run within blender \ -(see :doc:`rendering`). - - Single process - Here come example of how to use it - - Multi process - CyberBeeAgent inherit from the Process \ - class of the multiprocessing module of the standard python \ - library. Thus, several GridAgents can safely be run in parallel. - - """ - - def __init__(self, brain, convention, - posorients_queue=None, - results_queue=None): - if convention is None: - raise Exception("a convention must be specified") - if (posorients_queue is not None) and (results_queue is not None): - multiprocessing.Process.__init__(self) - AbstractAgent.__init__(self, convention) - AbstractAgent._alter_posorientvel = \ - lambda motion_vec: navimomath.next_pos(motion_vec, - move_mode='free_run') - self._alter_posorientvel = \ - lambda motion_vec: navimomath.next_pos(motion_vec, - move_mode='free_run') - self.brain = brain - self._posorients_queue = posorients_queue - self._results_queue = results_queue - - def run(self): - """ Only supported when multiprocess""" - if self._posorients_queue is None or self._results_queue is None: - raise NameError('Single agent class has not be inititialised ' - + 'with multiprocessing suppport') - proc_name = self.name - print('Process {} started'.format(proc_name)) - while True: - start_posorient = self._posorients_queue.get(timeout=1) - if start_posorient is None: - # Poison pill means shutdown) - break - common_id = list(set(start_posorient.index).intersection( - self._posorient_vel.index)) - self._posorient_vel.loc[common_id] = start_posorient.loc[common_id] - self._posorient_vel.name = start_posorient.name - self.move() - posorient_vel = self._posorient_vel - self._posorients_queue.task_done() - self._results_queue.put(posorient_vel) - self._posorients_queue.task_done() - print('Process {} done'.format(proc_name)) - - -class GridAgent(AbstractAgent, Process): - """ - A GridAgent fetches the scene from a pre-rendered database. \ -(see :doc:`database`) -GridAgent is a close loop agent here its position is snap to a grid. - - Single process - Here come example of how to use it - - Multi process - GridAgent inherit from the Process \ - class of the multiprocessing module of the standard python \ - library. Thus, several GridAgents can safely be run in parallel. - - - """ - - def __init__(self, brain, - posorients_queue=None, - results_queue=None): - if not isinstance(brain.renderer, DataBase): - msg = 'GridAgent only works with a brain having ' - msg += 'a renderer of type DataBase' - raise TypeError(msg) - convention = brain.renderer.rotation_convention - if (posorients_queue is not None) and (results_queue is not None): - multiprocessing.Process.__init__(self) - AbstractAgent.__init__(self, convention) - self._alter_posorientvel = self.snap_to_grid - self.brain = brain - self._posorients_queue = posorients_queue - self._results_queue = results_queue - - @property - def mode_of_motion(self): - """ - """ - toreturn = self._mode_move - toreturn['describe'] = \ - navimomath.mode_moves_supported()[ - self._mode_move['mode']]['describe'] - return toreturn - - @mode_of_motion.setter - def mode_of_motion(self, mode): - """ - - """ - if not isinstance(mode, dict): - raise TypeError('Mode is not a dictionary') - if 'mode' not in mode: - raise KeyError("'mode' is not a key of mode") - if 'param' not in mode: - raise KeyError("'param' is not a key of mode") - if mode['mode'] in navimomath.mode_moves_supported().keys(): - for param in navimomath.mode_moves_supported()[ - mode['mode']]['param']: - if param not in mode['param']: - raise KeyError( - "'{}' is not in mode['param']".format(param)) - self._mode_move = mode - else: - raise ValueError('mode is not supported') - - def snap_to_grid(self, posorient_vel): - posorient_vel = navimomath.next_pos( - posorient_vel, - move_mode=self._mode_move['mode'], - move_param=self._mode_move['param']) - tmppos = self._brain.posorients - tmp = navimomath.closest_pos( - posorient_vel, tmppos) # self._brain.posorients) - posorient_vel.loc[self._posorient_col] = \ - tmp.loc[self._posorient_col] - posorient_vel.name = tmp.name - return posorient_vel - - def move(self): - if hasattr(self, '_mode_move'): - AbstractAgent.move(self) - else: - raise AttributeError( - 'GridAgent object has no attribute _mode_move\n' + - 'Please set the mode of motion') - - def fly(self, max_nstep, return_tra=False): - if hasattr(self, '_mode_move'): - return AbstractAgent.fly(self, max_nstep, return_tra) - else: - raise AttributeError( - 'GridAgent object has no attribute _mode_move\n' + - 'Please set the mode of motion') - - def run(self): - """ Only supported when multiprocess""" - if self._posorients_queue is None or self._results_queue is None: - raise NameError('Single agent class has not be inititialised ' - + 'with multiprocessing suppport') - proc_name = self.name - print('Process {} started'.format(proc_name)) - while True: - start_posorient = self._posorients_queue.get(timeout=1) - if start_posorient is None: - # Poison pill means shutdown) - break - common_id = list(set(start_posorient.index).intersection( - self._posorient_vel.index)) - self._posorient_vel.loc[common_id] = start_posorient.loc[common_id] - self.move() - next_posorient = self._posorient_vel - self._posorients_queue.task_done() - self._results_queue.put((start_posorient, next_posorient)) - self._posorients_queue.task_done() - print('Process {} done'.format(proc_name)) - - -class GraphAgent(): - """ - A GraphAgent uses, to build a graph, - -1. pre-rendered scene from a database to derive \ -the agent motion, or -2. pre-computed agent-motion - - - """ - - def __init__(self, brain, mode_of_motion): - self._brain = copy.copy(brain) - # Init the graph - self._graph = nx.DiGraph() - if not isinstance(self._brain.renderer, DataBase): - msg = 'GraphAgent only works with a brain having ' - msg += 'a renderer of type DataBase' - raise TypeError(msg) - for row_id, posor in self._brain.posorients.iterrows(): - posor.name = row_id - self._graph.add_node(row_id, - posorient=posor) - self.mode_of_motion = mode_of_motion - # Create a dataframe to store the velocities - convention = self._brain.renderer.rotation_convention - tuples_posvel = posorient_columns(convention) - tuples_posvel.extend(velocities_columns(convention)) - index_posvel = pd.MultiIndex.from_tuples(tuples_posvel, - names=['position', - 'orientation']) - self.velocities = pd.DataFrame(columns=index_posvel, - index=list(self._graph.nodes())) - - @property - def graph(self): - return self._graph - - @graph.setter - def graph(self, graph): - if isinstance(graph, nx.DiGraph) is False: - raise TypeError('graph is not a nx.DiGraph') - self._graph = graph.copy() - self.check_graph() - - def compute_velocities(self, - ncpu=5, - timeout=1, - filename=None, - blocksize=100): - if os.path.exists(filename): - self.velocities = pd.read_hdf(filename) - nodes_tocompute = self.velocities.isna().any(axis=1) - nodes_tocompute = nodes_tocompute[nodes_tocompute].index - # Build a list of nodes - posorients_queue = JoinableQueue() - results_queue = Queue() - for node in nodes_tocompute: - posorients_queue.put(self._graph.nodes[node]['posorient']) - - # Start ndatabase loader - convention = self._brain.renderer.rotation_convention - num_agents = ncpu - agents = [CyberBeeAgent(copy.copy(self._brain), - convention=convention, - posorients_queue=posorients_queue, - results_queue=results_queue) - for _ in range(num_agents)] - for w in agents: - w.start() - - # Add a poison pill for each agent - for _ in range(num_agents): - posorients_queue.put(None) - - # Wait for all of the tasks to finish - # posorients_queue.join() - nline = 0 - prev_nline = nline - t_start = time.time() - nbnodes = nodes_tocompute.shape[0] - for _ in range(nbnodes): - res = results_queue.get(timeout=timeout) - self.velocities.loc[res.name, res.index] = res - if (nline - prev_nline) > blocksize: - t_elapse = time.time() - t_start - t_peritem = t_elapse / nline - remain = nbnodes - nline - print('Computed {} in {}'.format(nline, t_elapse)) - print('Remain {}, done in {}'.format( - remain, remain * t_peritem)) - if filename is not None: - self.velocities.to_hdf(filename, key='velocities') - prev_nline = nline - nline += 1 - return self.velocities.copy() - - def build_graph(self, movemode, moveparam): - """ - Connect edges with a given velocity - """ - if self.velocities.dropna().shape[0] == 0: - raise NameError('compute_velocities should be called first') - edges = pd.Series(data=np.nan, index=self.velocities.index) - # Make sure that the velocity start at the correct location - posorients = self._brain.posorients - myvelocities = self.velocities.copy() - myvelocities = myvelocities.swaplevel(axis=1) - myvelocities.x = posorients.x - myvelocities.y = posorients.y - myvelocities.z = posorients.z - myvelocities.alpha_0 = posorients.alpha_0 - myvelocities.alpha_1 = posorients.alpha_1 - myvelocities.alpha_2 = posorients.alpha_2 - myvelocities = myvelocities.swaplevel(axis=1) - for ii, row in myvelocities.iterrows(): - if np.any(np.isnan(row)): - continue - # Move according to user mode of motion - nposorient = navimomath.next_pos(row, movemode, moveparam) - # Snap to the closest point - nextpos_index = navimomath.closest_pos( - nposorient, myvelocities) - edges[ii] = nextpos_index.name - # Format for graph - validedges = edges.dropna() - results_edges = np.vstack( - [validedges.index, - validedges.values]).transpose() - # Add to graph - self._graph.add_edges_from(results_edges) - self.check_graph() - - def check_graph(self): - self.check_single_target() - - def check_single_target(self): - if version < 2: - graph_nodes = list(self._graph.nodes()) - else: - graph_nodes = list(self._graph.nodes) - for node in graph_nodes: - # not connected -> loop not ran - for count, _ in enumerate(self._graph.neighbors(node)): - # count == 0 -> connected to one node - # count == 1 -> connected to two nodes - if count > 0: - raise ValueError( - 'Node {} leads to several locations'.format(node)) - - def find_attractors(self): - """Return a list of node going to each attractor in a graph - """ - attractors = list() - for attractor in nx.attracting_components(self._graph): - att = dict() - att['attractor'] = attractor - attractors.append(att) - return attractors - - def find_attractors_sources(self, attractors=None): - """Find all sources going to each attractors - """ - if attractors is None: - attractors = self.find_attractors() - - if isinstance(attractors, list) is False: - raise TypeError('Attractors should be a list of dict') - elif len(attractors) == 0: - raise ValueError('No attractors found') - - # Check attractor - for att in attractors: - keyatt = att.keys() - if 'attractor' not in keyatt: - raise ValueError( - 'Each attractors should contain the key attractor') - - # Calculate connection - for att_i, att in enumerate(attractors): - - # [0] because one node of the attractor is enough - # all other node of the attractor are connected to this one - target = list(att['attractor'])[0] - attractors[att_i]['paths'] = nx.shortest_path( - self.graph, target=target) - attractors[att_i]['sources'] = list( - attractors[att_i]['paths'].keys()) - return attractors - - def catchment_area(self, attractors=None): - """Return the catchment area for attractors - """ - if attractors is None: - attractors = self.find_attractors_sources() - - if isinstance(attractors, list) is False: - raise TypeError('Attractors should be a list of dict') - elif len(attractors) == 0: - raise ValueError('No attractors found') - - # Check attractor - for att in attractors: - keyatt = att.keys() - if 'sources' not in keyatt: - raise ValueError( - 'Each attractors should contains a list of sources') - - return [len(att['sources']) for att in attractors] - - def reach_goals(self, goals): - """ Return all paths to the goals """ - return nx.shortest_path(self._graph, target=goals) - - def neighboring_nodes(self, target): - """ Return the nodes going to the target """ - # Reverse graph because nx.neighbors give the end node - # and we want to find the start node going to target - # not where target goes. - tmpgraph = self._graph.reverse(copy=True) - neighbors = tmpgraph.neighbors(target) - return neighbors diff --git a/navipy/moving/maths.py b/navipy/moving/maths.py deleted file mode 100644 index 3a3c3a9c324f44072a12fd2f7d08bc6ee650f7af..0000000000000000000000000000000000000000 --- a/navipy/moving/maths.py +++ /dev/null @@ -1,120 +0,0 @@ -""" -Mathematical computation are done in this module. Those involve mostly -geometry, and predefined grids shapes -""" -import numpy as np -import pandas as pd - - -def mode_moves_supported(): - return { - 'on_cubic_grid': { - 'param': - ['grid_spacing'], - 'describe': - "Agent restricted to move on a grid"}, - 'free_run': { - 'param': [], - 'describe': - "Freely moving agent, pos(t+dt)=pos+speed (dt=1)"}} - - -def next_pos(motion_vec, move_mode, move_param=None): - """return the future position knowing speed and current position - - :param motion_vec: the position and speed of the agent -(pandas Series with columns ['x','y','z','dx','dy','dz']) - :param grid_spacing: the spacing between two grid points -(only relevant for regular grids) - :param grid_mode: the type of grid. - - ..todo: add literal include for supported_grid_mode - """ - if isinstance(motion_vec, pd.Series) is False: - raise TypeError('motion vector must be a pandas Series') - if move_mode not in mode_moves_supported().keys(): - raise KeyError( - 'move mode must is not supported {}'.format(move_mode)) - tuples = [('location', 'dx'), ('location', 'dy'), - ('location', 'dz')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', - 'orientation']) - speed = pd.Series(index=index) - speed.loc[('location', 'dx')] = motion_vec[('location', 'dx')] - speed.loc[('location', 'dy')] = motion_vec[('location', 'dy')] - speed.loc[('location', 'dz')] = motion_vec[('location', 'dz')] - # speed = motion_vec.loc[['dx', 'dy', 'dz']] - if move_mode == 'on_cubic_grid': - # speed in spherical coord - epsilon = np.arctan2(speed['location']['dz'], - np.sqrt(speed['location']['dx']**2 + - speed['location']['dy']**2)) - phi = np.arctan2(speed['location']['dy'], speed['location']['dx']) - radius = np.sqrt(np.sum(speed**2)) - if np.isclose(radius, 0): - # scaling = 0 - speed = 0 * speed - else: - tuples = [('location', 'dx'), ('location', 'dy'), - ('location', 'dz')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', - 'orientation']) - deltas = pd.Series(index=index) - deltas['location']['dz'] = float(epsilon > (np.pi / 8) - - epsilon < (np.pi / 8)) - edgecases = np.linspace(-np.pi, np.pi, 9) - case_i = np.argmin(np.abs(phi - edgecases)) - if case_i == 8 or case_i == 0 or case_i == 1 or case_i == 7: - deltas['location']['dx'] = -1 - elif case_i == 3 or case_i == 4 or case_i == 5: - deltas['location']['dx'] = 1 - else: - deltas['location']['dx'] = 0 - - if case_i == 1 or case_i == 2 or case_i == 3: - deltas['location']['dy'] = -1 - elif case_i == 5 or case_i == 6 or case_i == 7: - deltas['location']['dy'] = 1 - - else: - deltas['location']['dy'] = 0 - # scaling = 1 - speed = move_param['grid_spacing'] * deltas - elif move_mode is 'free_run': - pass - # scaling = 1 # <=> dt = 1, user need to scale speed in dt units - else: - raise ValueError('grid_mode is not supported') - toreturn = motion_vec.copy() - toreturn.loc[('location', 'x')] += speed['location']['dx'] - toreturn.loc[('location', 'y')] += speed['location']['dy'] - toreturn.loc[('location', 'z')] += speed['location']['dz'] - return toreturn - - -def closest_pos(pos, positions): - """Return the closest position from a list of positions - - :param pos: the position to find (a pandas Series with ['x','y','z'] - :param positions: the possible closest positions - (a pandas dataframe with - [['location','x'],['location','y'],['location','z']]) - """ - - euclidian_dist = np.sqrt( - (pos['location']['x'] - positions['location']['x'])**2 - + (pos['location']['y'] - positions['location']['y'])**2 - + (pos['location']['z'] - positions['location']['z'])**2) - return positions.loc[euclidian_dist.idxmin()] - - -def closest_pos_memory_friendly(pos, database): - """Return the closest position from a list of positions - - :param pos: the position to find (a pandas Series with ['x','y','z'] - :param database: the possible closest positions - (a pandas dataframe with ['x','y','z']) - """ - raise NameError('Not implemated') diff --git a/navipy/moving/test_agent.py b/navipy/moving/test_agent.py deleted file mode 100644 index e8ab767cb2e90877693ea9f534212e29d50ac64d..0000000000000000000000000000000000000000 --- a/navipy/moving/test_agent.py +++ /dev/null @@ -1,331 +0,0 @@ -""" -Test of agent -""" -import numpy as np -import pandas as pd -import networkx as nx -from navipy.moving import agent as naviagent -from navipy import database as navidb -from navipy import Brain -import pkg_resources -import warnings - -import unittest - -version = float(nx.__version__) - - -class BrainTest(Brain): - def __init__(self, renderer=None): - Brain.__init__(self, renderer=renderer) - convention = 'zyx' - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), (convention, 'alpha_0'), - (convention, 'alpha_1'), (convention, 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', - 'orientation']) - self.__posorient_col = index - tuples_vel = [('location', 'x'), ('location', 'y'), - ('location', 'z'), (convention, 'alpha_0'), - (convention, 'alpha_1'), (convention, 'alpha_2'), - ('location', 'dx'), ('location', 'dy'), - ('location', 'dz'), (convention, 'dalpha_0'), - (convention, 'dalpha_1'), (convention, 'dalpha_2')] - index_vel = pd.MultiIndex.from_tuples(tuples_vel, - names=['position', - 'orientation']) - self.__velocity_col = index_vel - self.__posorient_vel_col = self.__velocity_col.copy() - # self.__posorient_vel_col.extend(self.__velocity_col) - - def velocity(self): - return pd.Series(data=0, index=self.__posorient_vel_col) - - -class TestNavipyMovingAgent(unittest.TestCase): - def setUp(self): - self.mydb_filename = pkg_resources.resource_filename( - 'navipy', 'resources/database.db') - self.mydb = navidb.DataBase(self.mydb_filename, mode='r') - self.convention = 'zyx' - self.brain = BrainTest(self.mydb) - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), (self.convention, 'alpha_0'), - (self.convention, 'alpha_1'), (self.convention, 'alpha_2')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', - 'orientation']) - self.__posorient_col = index - tuples_vel = [('location', 'x'), ('location', 'y'), - ('location', 'z'), (self.convention, 'alpha_0'), - (self.convention, 'alpha_1'), - (self.convention, 'alpha_2'), - ('location', 'dx'), ('location', 'dy'), - ('location', 'dz'), (self.convention, 'dalpha_0'), - (self.convention, 'dalpha_1'), - (self.convention, 'dalpha_2')] - index_vel = pd.MultiIndex.from_tuples(tuples_vel, - names=['position', - 'orientation']) - self.__velocity_col = index_vel - self.__posorient_vel_col = self.__posorient_col - # self.__posorient_vel_col.extend(self.__velocity_col) - # - # AbstractAgent - # - - def test_move_abstractagent(self): - agent = naviagent.AbstractAgent() - with self.assertRaises(NameError): - agent.move() - - def test_fly_abstractagent(self): - agent = naviagent.AbstractAgent() - with self.assertRaises(NameError): - agent.fly(max_nstep=10) - - # - # GridAgent - # - def test_move_gridagent(self): - agent = naviagent.GridAgent(self.brain) - initposorient = None - with warnings.catch_warnings(record=True): - initposorient = self.brain.posorients.loc[13, :] - initposovel = pd.Series(data=0, - index=self.__posorient_vel_col) - initposovel.loc[initposorient.index] = initposorient - agent.posorient = initposovel - with self.assertRaises(AttributeError): - agent.move() - tuples = [('location', 'dx'), ('location', 'dy'), - ('location', 'dz')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', - 'orientation']) - mode_move = {'mode': 'on_cubic_grid', - 'param': {'grid_spacing': - pd.Series(data=1, - index=index)}} - agent.mode_of_motion = mode_move - with warnings.catch_warnings(record=True): - agent.move() - obtained = agent.posorient - self.assertTrue(np.allclose( - obtained, initposorient.loc[obtained.index])) - - def test_fly_gridagent(self): - agent = naviagent.GridAgent(self.brain, self.convention) - initposorient = None - with warnings.catch_warnings(record=True): - initposorient = self.brain.posorients.loc[13, :] - initposovel = pd.Series(data=0, - index=self.__posorient_vel_col) - initposovel.loc[initposorient.index] = initposorient - agent.posorient = initposovel - with self.assertRaises(AttributeError): - agent.fly(max_nstep=10) - tuples = [('location', 'dx'), ('location', 'dy'), - ('location', 'dz')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', - 'orientation']) - mode_move = {'mode': 'on_cubic_grid', - 'param': {'grid_spacing': - pd.Series(data=1, - index=index)}} - agent.mode_of_motion = mode_move - agent.fly(max_nstep=10) - obtained = agent.posorient - self.assertTrue(np.allclose(obtained, - initposorient.loc[obtained.index])) - - # - # GraphAgent - # - - def test_init_graphagent(self): - mode_of_motion = dict() - mode_of_motion['mode'] = 'on_cubic_grid' - mode_of_motion['param'] = dict() - mode_of_motion['param']['grid_spacing'] = 0.5 - agent = None - with warnings.catch_warnings(record=True): - agent = naviagent.GraphAgent(self.brain, mode_of_motion) - if version < 2: - graph_nodes = list(agent.graph.nodes()) - else: - graph_nodes = list(agent.graph.nodes) - self.assertEqual(sorted(graph_nodes), - sorted(list(self.mydb.posorients.index)), - 'Init of graph failed. Node missmatch') - - def test_graph_setter(self): - mode_of_motion = dict() - mode_of_motion['mode'] = 'on_cubic_grid' - mode_of_motion['param'] = dict() - mode_of_motion['param']['grid_spacing'] = 0.5 - agent = None - with warnings.catch_warnings(record=True): - agent = naviagent.GraphAgent(self.brain, mode_of_motion) - if version < 2: - graph_nodes = list(agent.graph.nodes()) - else: - graph_nodes = list(agent.graph.nodes) - graph_edges = list() - for gnode in graph_nodes[1:]: - graph_edges.append((gnode, graph_nodes[0])) - - graph = nx.DiGraph() - graph.add_nodes_from(graph_nodes) - graph.add_edges_from(graph_edges) - agent.graph = graph - - graph_edges.append((graph_nodes[2], graph_nodes[1])) - graph = nx.DiGraph() - graph.add_nodes_from(graph_nodes) - graph.add_edges_from(graph_edges) - with self.assertRaises(ValueError): - agent.graph = graph - - def test_catchment_area(self): - """ - 1 Test all node to first - 2 Test 11 nodes to first, 14 to 12th - 3 Two loops attractors - """ - # Test all node to first - mode_of_motion = dict() - mode_of_motion['mode'] = 'on_cubic_grid' - mode_of_motion['param'] = dict() - mode_of_motion['param']['grid_spacing'] = 0.5 - agent = None - with warnings.catch_warnings(record=True): - agent = naviagent.GraphAgent(self.brain, mode_of_motion) - - if version < 2: - graph_nodes = list(agent.graph.nodes()) - else: - graph_nodes = list(agent.graph.nodes) - graph_edges = list() - for gnode in graph_nodes[1:]: - graph_edges.append((gnode, graph_nodes[0])) - - graph = nx.DiGraph() - graph.add_nodes_from(graph_nodes) - graph.add_edges_from(graph_edges) - agent.graph = graph - attractors = agent.find_attractors() - self.assertEqual(len(attractors), 1, 'Too many or too few attractors') - attractors = agent.find_attractors_sources(attractors) - catchment_area = agent.catchment_area(attractors) - self.assertEqual(catchment_area, [len(graph_nodes)], - 'Too big or too short catchment area') - - # Test 11 nodes to first, 14 to 12th - graph_edges = list() - for gnode in graph_nodes[1:11]: - graph_edges.append((gnode, graph_nodes[0])) - for gnode in graph_nodes[11:]: - graph_edges.append((gnode, graph_nodes[11])) - - graph = nx.DiGraph() - graph.add_nodes_from(graph_nodes) - graph.add_edges_from(graph_edges) - agent.graph = graph - attractors = agent.find_attractors() - self.assertEqual(len(attractors), 2, 'Too many or too few attractors') - attractors = agent.find_attractors_sources(attractors) - catchment_area = agent.catchment_area(attractors) - self.assertEqual(sorted(catchment_area), [11, 14], - 'Too big or too short catchment area') - - # Two loops attractors - graph_edges = list() - for snode, enode in zip(graph_nodes[:11], - np.roll(graph_nodes[:11], 1)): - graph_edges.append((snode, enode)) - for snode, enode in zip(graph_nodes[11:], - np.roll(graph_nodes[11:], 1)): - graph_edges.append((snode, enode)) - - graph = nx.DiGraph() - graph.add_nodes_from(graph_nodes) - graph.add_edges_from(graph_edges) - agent.graph = graph - attractors = agent.find_attractors() - self.assertEqual(len(attractors), 2, 'Too many or too few attractors') - attractors = agent.find_attractors_sources(attractors) - catchment_area = agent.catchment_area(attractors) - self.assertEqual(sorted(catchment_area), [11, 14], - 'Too big or too short catchment area') - - def test_neighboring_nodes(self): - """ Counting neighnoring nodes for 3 situations - 1. Local maxima - 2. Saddle points - 3. Local minima - """ - # Init the agent - mode_of_motion = dict() - mode_of_motion['mode'] = 'on_cubic_grid' - mode_of_motion['param'] = dict() - mode_of_motion['param']['grid_spacing'] = 0.5 - agent = None - with warnings.catch_warnings(record=True): - agent = naviagent.GraphAgent(self.brain, mode_of_motion) - - # Local maxima - if version < 2: - graph_nodes = list(agent.graph.nodes()) - else: - graph_nodes = list(agent.graph.nodes) - graph_edges = list() - graph_edges.append((graph_nodes[0], - graph_nodes[1])) - - graph = nx.DiGraph() - graph.add_nodes_from(graph_nodes) - graph.add_edges_from(graph_edges) - agent.graph = graph - neighbors = agent.neighboring_nodes(graph_nodes[0]) - expected_nbh = [] - obtained_nbh = [a for a in neighbors] - self.assertEqual(sorted(expected_nbh), - sorted(obtained_nbh), - 'Problem neighbors maxima') - - # Saddle points - graph_edges.append((graph_nodes[1], - graph_nodes[2])) - - graph = nx.DiGraph() - graph.add_nodes_from(graph_nodes) - graph.add_edges_from(graph_edges) - agent.graph = graph - neighbors = agent.neighboring_nodes(graph_nodes[1]) - expected_nbh = [graph_nodes[0]] - obtained_nbh = [a for a in neighbors] - self.assertEqual(sorted(expected_nbh), - sorted(obtained_nbh), - 'Problem neighbors saddle') - - # Local maxima points - graph_edges.append((graph_nodes[3], - graph_nodes[2])) - - graph = nx.DiGraph() - graph.add_nodes_from(graph_nodes) - graph.add_edges_from(graph_edges) - agent.graph = graph - neighbors = agent.neighboring_nodes(graph_nodes[2]) - expected_nbh = [graph_nodes[3], graph_nodes[1]] - obtained_nbh = [a for a in neighbors] - self.assertEqual(sorted(expected_nbh), - sorted(obtained_nbh), - 'Problem neighbors minima') - - -if __name__ == '__main__': - unittest.main() diff --git a/navipy/moving/test_maths.py b/navipy/moving/test_maths.py deleted file mode 100644 index cae640b593f05ea3b1867d4f9133a35bd54cbec4..0000000000000000000000000000000000000000 --- a/navipy/moving/test_maths.py +++ /dev/null @@ -1,140 +0,0 @@ -""" -Test of maths -""" -import numpy as np -import pandas as pd -from navipy.moving import maths as navimaths -import unittest - - -class TestNavipyMovingMaths(unittest.TestCase): - """ A class to test some mathy function of the toolbox - """ - - def test_motion_vec_pandas(self): - """ Test that a TypeError is correctly raised - """ - motion_vec = 'NotPandas' - move_mode = 'on_cubic_grid' - mode_param = dict() - mode_param['grid_spacing'] = 1 - with self.assertRaises(TypeError): - navimaths.next_pos(motion_vec, - move_mode, - move_mode) - - def test_notsupported_mofm(self): - """ Test that a TypeError is correctly raised - """ - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('location', 'dx'), - ('location', 'dy'), ('location', 'dz')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', - 'orientation']) - motion_vec = pd.Series(data=0, - index=index) - move_mode = 'NotSupportedMode' - mode_param = dict() - mode_param['grid_spacing'] = 1 - with self.assertRaises(KeyError): - navimaths.next_pos(motion_vec, - move_mode, - move_mode) - - def test_null_velocity(self): - """ Test null velocity - - When the agent has a null velocity, the next postion - should be equal to the current position. Here, we - test this by series equality. - """ - # Test if stay at same position. - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('location', 'dx'), - ('location', 'dy'), ('location', 'dz')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', - 'orientation']) - motion_vec = pd.Series(data=0, - index=index) - move_mode = 'on_cubic_grid' - mode_param = dict() - mode_param['grid_spacing'] = 1 - new_pos = navimaths.next_pos(motion_vec, move_mode, mode_param) - self.assertTrue(new_pos.equals(motion_vec), - 'At null velocity the agent should not move') - - def test_closest_cubic(self): - """ Test if the snaping to cubic is correct - - When the agent move on the grid, its position has to be snapped - to the grid position. On a cubic grid, the instability is at - 22.5 degrees modulo 45 degrees. We therefore test the functions - close to each instability. - """ - tuples = [('location', 'x'), ('location', 'y'), - ('location', 'z')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', - 'orientation']) - positions = pd.DataFrame(data=[[0, 0, 0], - [0, 1, 0], - [0, 2, 0], - [1, 0, 0], - [1, 1, 0], - [1, 2, 0], - [2, 0, 0], - [2, 1, 0], - [2, 2, 0]], - columns=index, - dtype=np.float) - move_mode = 'on_cubic_grid' - move_param = dict() - tuples = [('location', 'dx'), ('location', 'dy'), - ('location', 'dz')] - index = pd.MultiIndex.from_tuples(tuples, - names=['position', 'orientation']) - move_param['grid_spacing'] = pd.Series(data=1, - index=index) - expected_dict = dict() - expected_dict[-22] = 7 # [2,1] - expected_dict[22] = 7 - expected_dict[24] = 8 # [2,2] - expected_dict[67] = 8 - expected_dict[68] = 5 # [1,2] - expected_dict[112] = 5 - expected_dict[113] = 2 # [0,2] - expected_dict[157] = 2 - expected_dict[158] = 1 # [0, 1] - expected_dict[202] = 1 - expected_dict[204] = 0 # [0, 0] - expected_dict[247] = 0 - expected_dict[248] = 3 # [1, 0] - expected_dict[292] = 3 - expected_dict[293] = 6 # [2, 0] - expected_dict[337] = 6 - expected_dict[338] = 7 # equivalent to -22 - tuples2 = [('location', 'x'), ('location', 'y'), - ('location', 'z'), ('location', 'dx'), - ('location', 'dy'), ('location', 'dz')] - index2 = pd.MultiIndex.from_tuples(tuples2, - names=['position', 'orientation']) - - for angle, exp_i in expected_dict.items(): - alpha = np.deg2rad(angle) - motion_vec = pd.Series( - data=[1, 1, 0, - np.cos(alpha), np.sin(alpha), 0], - index=index2, - dtype=np.float) - newpos = navimaths.next_pos(motion_vec, - move_mode, - move_param) - snappos = navimaths.closest_pos(newpos, positions) - self.assertEqual(snappos.name, exp_i, - 'closest pos is not correctly snapped') - - -if __name__ == '__main__': - unittest.main() diff --git a/navipy/processing/test_opticflow.py b/navipy/processing/test_opticflow.py index feb9630dadf8e77074d2b3fc10e300d7057c9cfa..989253d6c0a2df1b544b89452a88c437bca4693e 100644 --- a/navipy/processing/test_opticflow.py +++ b/navipy/processing/test_opticflow.py @@ -2,8 +2,8 @@ import unittest from navipy.processing import mcode import pandas as pd import numpy as np -from navipy.moving.agent import posorient_columns -from navipy.moving.agent import velocities_columns +from navipy.trajectories import posorient_columns +from navipy.trajectories import velocities_columns from navipy.scene import __spherical_indeces__ diff --git a/navipy/trajectories/__init__.py b/navipy/trajectories/__init__.py index 1f2856db9bb50e336d39739df671a43d2623f6c6..8051f6583fa29dc93bf16c66517a939eb173798d 100644 --- a/navipy/trajectories/__init__.py +++ b/navipy/trajectories/__init__.py @@ -20,9 +20,30 @@ from scipy.interpolate import CubicSpline import warnings +def posorient_columns(convention): + toreturn = [('location', 'x'), + ('location', 'y'), + ('location', 'z')] + if convention == 'quaternion': + for a in range(4): + toreturn.append((convention, 'q_{}'.format(a))) + else: + for a in range(3): + toreturn.append((convention, 'alpha_{}'.format(a))) + return toreturn + + +def velocities_columns(convention): + toreturn = [] + # Prepend d on dimention for derivative + for it1, it2 in posorient_columns(convention): + toreturn.append((it1, 'd'+it2)) + return toreturn + + def _markers2position(x, kwargs): - mark0 = pd.Series(x[:3], index=['x', 'y', 'z']) - mark1 = pd.Series(x[3:6], index=['x', 'y', 'z']) + mark0 = pd.Series(x[: 3], index=['x', 'y', 'z']) + mark1 = pd.Series(x[3: 6], index=['x', 'y', 'z']) mark2 = pd.Series(x[6:], index=['x', 'y', 'z']) triangle_mode = kwargs['triangle_mode'] euler_axes = kwargs['euler_axes']