From 4ab312eeb1322f569b8fe42e2df61de5f6c210ce Mon Sep 17 00:00:00 2001
From: "Olivier J.N. Bertrand" <bolirev@hotmail.com>
Date: Sat, 9 Dec 2017 18:43:45 +0100
Subject: [PATCH] Add missing files for moving

---
 navipy/moving/__init__.py   |   0
 navipy/moving/test_agent.py | 258 ++++++++++++++++++++++++++++++++++++
 navipy/moving/test_maths.py |  92 +++++++++++++
 3 files changed, 350 insertions(+)
 create mode 100644 navipy/moving/__init__.py
 create mode 100644 navipy/moving/test_agent.py
 create mode 100644 navipy/moving/test_maths.py

diff --git a/navipy/moving/__init__.py b/navipy/moving/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/navipy/moving/test_agent.py b/navipy/moving/test_agent.py
new file mode 100644
index 0000000..d72bf34
--- /dev/null
+++ b/navipy/moving/test_agent.py
@@ -0,0 +1,258 @@
+"""
+Test of maths
+"""
+import numpy as np
+import pandas as pd
+import networkx as nx
+import navipy.moving.agent as naviagent
+import navipy.database as navidb
+import pkg_resources
+
+import unittest
+
+
+class TestNavipyMovingAgent(unittest.TestCase):
+
+    def setUp(self):
+        self.mydb_filename = pkg_resources.resource_filename(
+            'navipy', 'resources/database.db')
+        self.mydb = navidb.DataBaseLoad(self.mydb_filename)
+
+    def test_abstractagent_nodb(self):
+        with self.assertRaises(TypeError):
+            naviagent.AbstractAgent('NotDB')
+
+    def test_memfriendly(self):
+        """posorient is loaded if memory_friendly is False """
+        agent = naviagent.AbstractAgent(self.mydb, memory_friendly=False)
+        self.assertTrue(
+            isinstance(agent.posorients, pd.DataFrame),
+            'posorients should be a pd.DataFrame when memfriendly is false')
+        agent = naviagent.AbstractAgent(self.mydb, memory_friendly=True)
+        self.assertTrue(
+            agent.posorients is None,
+            'posorients should be None when memfriendly is true')
+
+    def test_abstractmove_inputs(self):
+        """abstractmove should TypeError if not pandas Series """
+        agent = naviagent.AbstractAgent(self.mydb, memory_friendly=False)
+        with self.assertRaises(TypeError):
+            agent.abstractmove('NotPandasSeries')
+
+        posorient_vel = pd.Series()
+        for col in ['x', 'y', 'z', 'yaw', 'pitch', 'roll',
+                    'dx', 'dy', 'dz', 'dyaw', 'dpitch', 'droll']:
+            with self.assertRaises(KeyError):
+                agent.abstractmove(posorient_vel)
+            posorient_vel[col] = 2
+
+    def test_abstractmove_null_vellocity(self):
+        """abstractmove should leads to same point with null vel"""
+        agent = naviagent.AbstractAgent(self.mydb, memory_friendly=False)
+
+        posorient_vel = pd.Series(data=0,
+                                  index=['x', 'y', 'z',
+                                         'yaw', 'pitch', 'roll',
+                                         'dx', 'dy', 'dz',
+                                         'dyaw', 'dpitch', 'droll'])
+        pos = self.mydb.read_posorient(rowid=1)
+        posorient_vel.loc[['x', 'y', 'z']] = pos.loc[['x', 'y', 'z']]
+
+        newpos = agent.abstractmove(posorient_vel)
+        self.assertTrue(newpos.equals(posorient_vel),
+                        'Agent moved although velocity is null')
+
+    #
+    # Single
+    #
+    def test_init_inputs(self):
+        initial_condition = 'A'
+        with self.assertRaises(TypeError):
+            naviagent.Single(self.mydb,
+                             initial_condition,
+                             memory_friendly=False)
+
+        initial_condition = pd.Series(index=['x'], data='a')
+        with self.assertRaises(TypeError):
+            naviagent.Single(self.mydb,
+                             initial_condition,
+                             memory_friendly=False)
+
+    def test_velocity_setter(self):
+        posorient_vel = pd.Series(data=np.random.rand(12),
+                                  index=['x', 'y', 'z',
+                                         'yaw', 'pitch', 'roll',
+                                         'dx', 'dy', 'dz',
+                                         'dyaw', 'dpitch', 'droll'])
+        agent = naviagent.Single(self.mydb,
+                                 posorient_vel,
+                                 memory_friendly=False)
+        agent.velocity = posorient_vel
+        self.assertTrue(
+            posorient_vel.loc[['dx', 'dy', 'dz']].equals(
+                agent.velocity),
+            'velocity setter failed')
+
+    def test_angular_velocity(self):
+        posorient_vel = pd.Series(data=np.random.rand(12),
+                                  index=['x', 'y', 'z',
+                                         'yaw', 'pitch', 'roll',
+                                         'dx', 'dy', 'dz',
+                                         'dyaw', 'dpitch', 'droll'])
+        agent = naviagent.Single(self.mydb,
+                                 posorient_vel,
+                                 memory_friendly=False)
+        agent.angular_velocity = posorient_vel
+        self.assertTrue(
+            posorient_vel.loc[['dyaw', 'dpitch', 'droll']].equals(
+                agent.angular_velocity),
+            'angular velocity setter failed')
+
+    #
+    # Multi
+    #
+    def test_init(self):
+        agent = naviagent.Multi(self.mydb)
+        self.assertEqual(sorted(agent.graph.nodes),
+                         sorted(list(self.mydb.get_posorients().index)),
+                         'Init of graph failed. Node missmatch')
+
+    def test_graph_setter(self):
+        agent = naviagent.Multi(self.mydb)
+        graph_nodes = list(agent.graph.nodes)
+        graph_edges = list()
+        for gnode in graph_nodes[1:]:
+            graph_edges.append((gnode, graph_nodes[0]))
+
+        graph = nx.DiGraph()
+        graph.add_nodes_from(graph_nodes)
+        graph.add_edges_from(graph_edges)
+        agent.graph = graph
+
+        graph_edges.append((graph_nodes[2], graph_nodes[1]))
+        graph = nx.DiGraph()
+        graph.add_nodes_from(graph_nodes)
+        graph.add_edges_from(graph_edges)
+        with self.assertRaises(ValueError):
+            agent.graph = graph
+
+    def test_catchment_area(self):
+        """
+        1 Test all node to first
+        2 Test 11 nodes to first, 14 to 12th
+        3 Two loops attractors
+        """
+        # Test all node to first
+        agent = naviagent.Multi(self.mydb)
+
+        graph_nodes = list(agent.graph.nodes)
+        graph_edges = list()
+        for gnode in graph_nodes[1:]:
+            graph_edges.append((gnode, graph_nodes[0]))
+
+        graph = nx.DiGraph()
+        graph.add_nodes_from(graph_nodes)
+        graph.add_edges_from(graph_edges)
+        agent.graph = graph
+        attractors = agent.find_attractors()
+        self.assertEqual(len(attractors), 1, 'Too many or too few attractors')
+        attractors = agent.find_attractors_sources(attractors)
+        catchment_area = agent.catchment_area(attractors)
+        self.assertEqual(catchment_area, [len(graph_nodes)],
+                         'Too big or too short catchment area')
+
+        # Test 11 nodes to first, 14 to 12th
+        graph_edges = list()
+        for gnode in graph_nodes[1:11]:
+            graph_edges.append((gnode, graph_nodes[0]))
+        for gnode in graph_nodes[11:]:
+            graph_edges.append((gnode, graph_nodes[11]))
+
+        graph = nx.DiGraph()
+        graph.add_nodes_from(graph_nodes)
+        graph.add_edges_from(graph_edges)
+        agent.graph = graph
+        attractors = agent.find_attractors()
+        self.assertEqual(len(attractors), 2, 'Too many or too few attractors')
+        attractors = agent.find_attractors_sources(attractors)
+        catchment_area = agent.catchment_area(attractors)
+        self.assertEqual(sorted(catchment_area), [11, 14],
+                         'Too big or too short catchment area')
+
+        # Two loops attractors
+        graph_edges = list()
+        for snode, enode in zip(graph_nodes[:11],
+                                np.roll(graph_nodes[:11], 1)):
+            graph_edges.append((snode, enode))
+        for snode, enode in zip(graph_nodes[11:],
+                                np.roll(graph_nodes[11:], 1)):
+            graph_edges.append((snode, enode))
+
+        graph = nx.DiGraph()
+        graph.add_nodes_from(graph_nodes)
+        graph.add_edges_from(graph_edges)
+        agent.graph = graph
+        attractors = agent.find_attractors()
+        self.assertEqual(len(attractors), 2, 'Too many or too few attractors')
+        attractors = agent.find_attractors_sources(attractors)
+        catchment_area = agent.catchment_area(attractors)
+        self.assertEqual(sorted(catchment_area), [11, 14],
+                         'Too big or too short catchment area')
+
+    def test_neighboring_nodes(self):
+        """ Counting neighnoring nodes for 3 situations
+        1. Local maxima
+        2. Saddle points
+        3. Local minima
+        """
+        agent = naviagent.Multi(self.mydb)
+        # Local maxima
+        graph_nodes = list(agent.graph.nodes)
+        graph_edges = list()
+        graph_edges.append((graph_nodes[0],
+                            graph_nodes[1]))
+
+        graph = nx.DiGraph()
+        graph.add_nodes_from(graph_nodes)
+        graph.add_edges_from(graph_edges)
+        agent.graph = graph
+        neighbors = agent.neighboring_nodes(graph_nodes[0])
+        expected_nbh = []
+        obtained_nbh = [a for a in neighbors]
+        self.assertEqual(sorted(expected_nbh),
+                         sorted(obtained_nbh),
+                         'Problem neighbors maxima')
+
+        # Saddle points
+        graph_edges.append((graph_nodes[1],
+                            graph_nodes[2]))
+
+        graph = nx.DiGraph()
+        graph.add_nodes_from(graph_nodes)
+        graph.add_edges_from(graph_edges)
+        agent.graph = graph
+        neighbors = agent.neighboring_nodes(graph_nodes[1])
+        expected_nbh = [graph_nodes[0]]
+        obtained_nbh = [a for a in neighbors]
+        self.assertEqual(sorted(expected_nbh),
+                         sorted(obtained_nbh),
+                         'Problem neighbors saddle')
+
+        # Local maxima points
+        graph_edges.append((graph_nodes[3],
+                            graph_nodes[2]))
+
+        graph = nx.DiGraph()
+        graph.add_nodes_from(graph_nodes)
+        graph.add_edges_from(graph_edges)
+        agent.graph = graph
+        neighbors = agent.neighboring_nodes(graph_nodes[2])
+        expected_nbh = [graph_nodes[3], graph_nodes[1]]
+        obtained_nbh = [a for a in neighbors]
+        self.assertEqual(sorted(expected_nbh),
+                         sorted(obtained_nbh),
+                         'Problem neighbors minima')
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/navipy/moving/test_maths.py b/navipy/moving/test_maths.py
new file mode 100644
index 0000000..9e57530
--- /dev/null
+++ b/navipy/moving/test_maths.py
@@ -0,0 +1,92 @@
+"""
+Test of maths
+"""
+import numpy as np
+import pandas as pd
+import navipy.moving.maths as navimaths
+
+
+import unittest
+
+
+class TestNavipyMovingMaths(unittest.TestCase):
+
+    def test_motion_vec_pandas(self):
+        motion_vec = 'NotPandas'
+        move_mode = 'on_cubic_grid'
+        mode_param = dict()
+        mode_param['grid_spacing'] = 1
+        with self.assertRaises(TypeError):
+            navimaths.next_pos(motion_vec,
+                               move_mode,
+                               move_mode)
+
+    def test_notsupported_mofm(self):
+        motion_vec = pd.Series(data=0,
+                               index=['x', 'y', 'z', 'dx', 'dy', 'dz'])
+        move_mode = 'NotSupportedMode'
+        mode_param = dict()
+        mode_param['grid_spacing'] = 1
+        with self.assertRaises(KeyError):
+            navimaths.next_pos(motion_vec,
+                               move_mode,
+                               move_mode)
+
+    def test_null_velocity(self):
+        # Test if stay at same position.
+        motion_vec = pd.Series(data=0,
+                               index=['x', 'y', 'z', 'dx', 'dy', 'dz'])
+        move_mode = 'on_cubic_grid'
+        mode_param = dict()
+        mode_param['grid_spacing'] = 1
+        new_pos = navimaths.next_pos(motion_vec, move_mode, mode_param)
+        self.assertTrue(new_pos.equals(motion_vec),
+                        'At null velocity the agent should not move')
+
+    def test_closest_cubic(self):
+        """ Test if the snaping to cubic is correct """
+        positions = pd.DataFrame({'x': [0, 0, 0, 1, 1, 1, 2, 2, 2],
+                                  'y': [0, 1, 2, 0, 1, 2, 0, 1, 2],
+                                  'z': [0, 0, 0, 0, 0, 0, 0, 0, 0]},
+                                 dtype=np.float)
+        move_mode = 'on_cubic_grid'
+        move_param = dict()
+        move_param['grid_spacing'] = 1
+        expected_dict = dict()
+        expected_dict[-22] = 7  # [2,1]
+        expected_dict[22] = 7
+        expected_dict[24] = 8  # [2,2]
+        expected_dict[67] = 8
+        expected_dict[68] = 5  # [1,2]
+        expected_dict[112] = 5
+        expected_dict[113] = 2  # [0,2]
+        expected_dict[157] = 2
+        expected_dict[158] = 1  # [0, 1]
+        expected_dict[202] = 1
+        expected_dict[204] = 0  # [0, 0]
+        expected_dict[247] = 0
+        expected_dict[248] = 3  # [1, 0]
+        expected_dict[292] = 3
+        expected_dict[293] = 6  # [2, 0]
+        expected_dict[337] = 6
+        expected_dict[338] = 7  # equivalent to -22
+
+        for angle, exp_i in expected_dict.items():
+
+            alpha = np.deg2rad(angle)
+            motion_vec = pd.Series(
+                data=[1, 1, 0,
+                      np.cos(alpha), np.sin(alpha), 0],
+                index=['x', 'y', 'z',
+                       'dx', 'dy', 'dz'],
+                dtype=np.float)
+            newpos = navimaths.next_pos(motion_vec,
+                                        move_mode,
+                                        move_param)
+            snappos = navimaths.closest_pos(newpos, positions)
+            self.assertEqual(snappos.name, exp_i,
+                             'closest pos is not correctly snapped')
+
+
+if __name__ == '__main__':
+    unittest.main()
-- 
GitLab