Skip to content
Snippets Groups Projects
Commit 3b8a0ac3 authored by Olivier Bertrand's avatar Olivier Bertrand
Browse files

Remove moving part, it will be replace by simulation on velocity vector field

parent 1b1ad877
No related branches found
No related tags found
No related merge requests found
"""
Close-loop agent
~~~~~~~~~~~~~~~~
A standard method to move an agent is to update:
1. update the sensory information at the current agent location :math:`x`
2. deduce the agent motion :math:`vdt` from this information
3. displace the agent by motion ( :math:`x\\rightarrow x + vdt`)
The use of a close loop model including visual rendering is \
sometimes too slow to efficiently test several models or tune the \
parameters of a given models. The GridAgent solves this problem by \
restricting the agent motion on locations with rendered scene. The \
agent moves thus on a grid, and its next position is always \
snapped to the closest grid location. The finer the grid is, the \
larger the database storing all sceneries and grid location is; \
but also the more accurate the agent motion is. The grid size \
depend on the storage space, the time you can wait for the \
database creation, and how sensitive to exact location your model is.
This iterative method can be used with a wide range of models. \
In :navipy: differents classes of agents exist. \
They differ by the method use to update the sensory information:
+----------------+----------------------------------------------------+
|Agent class |Sensory update |
+================+====================================================+
|:CyberBeeAgent: |:Cyberbee: update within blender. |
+----------------+----------------------------------------------------+
|:GridAgent: |:DataBase: update from a pre-rendered database. |
+----------------+----------------------------------------------------+
To deduce the agent motion from the current state of the agent \
(e.g. position, orientation, sensory information, memories, ...) all \
classes of agents use callback function, which is a custom function \
defined by the user. This function takes as input argument, the \
agent position-orientation and its velocities, and the currently \
seen scene. The function should return a the agent motion. \
Once the agent sensory method and motion method have been configured, \
the agent can:
1. move (perform a one step motion), or
2. fly (move until its velocity is null, or until n-steps).
Agent on a graph
~~~~~~~~~~~~~~~~
As mentioned above, in every model of navigation the agent motion \
is derived from its current external state, its position \
orientation as well as the derivatives, and its internal state. \
However, when the agent motion is only derived from its current \
position orientation, and what is seen from this location, the \
simulation of an agent can be drastically simplified. Indeed, \
not only the scene at relevant location can be pre-rendered, but \
the motion of the agent from those locations as well.
The agent being restricted to move from relevant locations to \
relevant locations, a graph of interconnected locations can be built.\
The nodes of the graph are the relevant locations, and the directed \
edges the motion of the agent from one location to the next. \
:GraphAgent: can build such graph by simply using a database of \
pre-rendered scenery at relevant locations, and a function \
giving the motion of the agent from a scene and the agent \
position orientation. Once the graph has been generated, \
attractors can be found, the number of locations converging to \
those (i.e. the catchment area or volume), if two locations are \
connected, etc.
To speed up certain calculations, additional values can stored \
at each graph node and access from the callback function. It is \
worth mentioning a warning here. The size of the graph can be \
incredibly large. Thus, not too much information can be stored \
at each node. To assess the memory size of the graph before \
creating it, one can use the tool agent.tools.assess_graphmemsize.
"""
This diff is collapsed.
"""
Mathematical computation are done in this module. Those involve mostly
geometry, and predefined grids shapes
"""
import numpy as np
import pandas as pd
def mode_moves_supported():
return {
'on_cubic_grid': {
'param':
['grid_spacing'],
'describe':
"Agent restricted to move on a grid"},
'free_run': {
'param': [],
'describe':
"Freely moving agent, pos(t+dt)=pos+speed (dt=1)"}}
def next_pos(motion_vec, move_mode, move_param=None):
"""return the future position knowing speed and current position
:param motion_vec: the position and speed of the agent
(pandas Series with columns ['x','y','z','dx','dy','dz'])
:param grid_spacing: the spacing between two grid points
(only relevant for regular grids)
:param grid_mode: the type of grid.
..todo: add literal include for supported_grid_mode
"""
if isinstance(motion_vec, pd.Series) is False:
raise TypeError('motion vector must be a pandas Series')
if move_mode not in mode_moves_supported().keys():
raise KeyError(
'move mode must is not supported {}'.format(move_mode))
tuples = [('location', 'dx'), ('location', 'dy'),
('location', 'dz')]
index = pd.MultiIndex.from_tuples(tuples,
names=['position',
'orientation'])
speed = pd.Series(index=index)
speed.loc[('location', 'dx')] = motion_vec[('location', 'dx')]
speed.loc[('location', 'dy')] = motion_vec[('location', 'dy')]
speed.loc[('location', 'dz')] = motion_vec[('location', 'dz')]
# speed = motion_vec.loc[['dx', 'dy', 'dz']]
if move_mode == 'on_cubic_grid':
# speed in spherical coord
epsilon = np.arctan2(speed['location']['dz'],
np.sqrt(speed['location']['dx']**2 +
speed['location']['dy']**2))
phi = np.arctan2(speed['location']['dy'], speed['location']['dx'])
radius = np.sqrt(np.sum(speed**2))
if np.isclose(radius, 0):
# scaling = 0
speed = 0 * speed
else:
tuples = [('location', 'dx'), ('location', 'dy'),
('location', 'dz')]
index = pd.MultiIndex.from_tuples(tuples,
names=['position',
'orientation'])
deltas = pd.Series(index=index)
deltas['location']['dz'] = float(epsilon > (np.pi / 8) -
epsilon < (np.pi / 8))
edgecases = np.linspace(-np.pi, np.pi, 9)
case_i = np.argmin(np.abs(phi - edgecases))
if case_i == 8 or case_i == 0 or case_i == 1 or case_i == 7:
deltas['location']['dx'] = -1
elif case_i == 3 or case_i == 4 or case_i == 5:
deltas['location']['dx'] = 1
else:
deltas['location']['dx'] = 0
if case_i == 1 or case_i == 2 or case_i == 3:
deltas['location']['dy'] = -1
elif case_i == 5 or case_i == 6 or case_i == 7:
deltas['location']['dy'] = 1
else:
deltas['location']['dy'] = 0
# scaling = 1
speed = move_param['grid_spacing'] * deltas
elif move_mode is 'free_run':
pass
# scaling = 1 # <=> dt = 1, user need to scale speed in dt units
else:
raise ValueError('grid_mode is not supported')
toreturn = motion_vec.copy()
toreturn.loc[('location', 'x')] += speed['location']['dx']
toreturn.loc[('location', 'y')] += speed['location']['dy']
toreturn.loc[('location', 'z')] += speed['location']['dz']
return toreturn
def closest_pos(pos, positions):
"""Return the closest position from a list of positions
:param pos: the position to find (a pandas Series with ['x','y','z']
:param positions: the possible closest positions
(a pandas dataframe with
[['location','x'],['location','y'],['location','z']])
"""
euclidian_dist = np.sqrt(
(pos['location']['x'] - positions['location']['x'])**2
+ (pos['location']['y'] - positions['location']['y'])**2
+ (pos['location']['z'] - positions['location']['z'])**2)
return positions.loc[euclidian_dist.idxmin()]
def closest_pos_memory_friendly(pos, database):
"""Return the closest position from a list of positions
:param pos: the position to find (a pandas Series with ['x','y','z']
:param database: the possible closest positions
(a pandas dataframe with ['x','y','z'])
"""
raise NameError('Not implemated')
"""
Test of agent
"""
import numpy as np
import pandas as pd
import networkx as nx
from navipy.moving import agent as naviagent
from navipy import database as navidb
from navipy import Brain
import pkg_resources
import warnings
import unittest
version = float(nx.__version__)
class BrainTest(Brain):
def __init__(self, renderer=None):
Brain.__init__(self, renderer=renderer)
convention = 'zyx'
tuples = [('location', 'x'), ('location', 'y'),
('location', 'z'), (convention, 'alpha_0'),
(convention, 'alpha_1'), (convention, 'alpha_2')]
index = pd.MultiIndex.from_tuples(tuples,
names=['position',
'orientation'])
self.__posorient_col = index
tuples_vel = [('location', 'x'), ('location', 'y'),
('location', 'z'), (convention, 'alpha_0'),
(convention, 'alpha_1'), (convention, 'alpha_2'),
('location', 'dx'), ('location', 'dy'),
('location', 'dz'), (convention, 'dalpha_0'),
(convention, 'dalpha_1'), (convention, 'dalpha_2')]
index_vel = pd.MultiIndex.from_tuples(tuples_vel,
names=['position',
'orientation'])
self.__velocity_col = index_vel
self.__posorient_vel_col = self.__velocity_col.copy()
# self.__posorient_vel_col.extend(self.__velocity_col)
def velocity(self):
return pd.Series(data=0, index=self.__posorient_vel_col)
class TestNavipyMovingAgent(unittest.TestCase):
def setUp(self):
self.mydb_filename = pkg_resources.resource_filename(
'navipy', 'resources/database.db')
self.mydb = navidb.DataBase(self.mydb_filename, mode='r')
self.convention = 'zyx'
self.brain = BrainTest(self.mydb)
tuples = [('location', 'x'), ('location', 'y'),
('location', 'z'), (self.convention, 'alpha_0'),
(self.convention, 'alpha_1'), (self.convention, 'alpha_2')]
index = pd.MultiIndex.from_tuples(tuples,
names=['position',
'orientation'])
self.__posorient_col = index
tuples_vel = [('location', 'x'), ('location', 'y'),
('location', 'z'), (self.convention, 'alpha_0'),
(self.convention, 'alpha_1'),
(self.convention, 'alpha_2'),
('location', 'dx'), ('location', 'dy'),
('location', 'dz'), (self.convention, 'dalpha_0'),
(self.convention, 'dalpha_1'),
(self.convention, 'dalpha_2')]
index_vel = pd.MultiIndex.from_tuples(tuples_vel,
names=['position',
'orientation'])
self.__velocity_col = index_vel
self.__posorient_vel_col = self.__posorient_col
# self.__posorient_vel_col.extend(self.__velocity_col)
#
# AbstractAgent
#
def test_move_abstractagent(self):
agent = naviagent.AbstractAgent()
with self.assertRaises(NameError):
agent.move()
def test_fly_abstractagent(self):
agent = naviagent.AbstractAgent()
with self.assertRaises(NameError):
agent.fly(max_nstep=10)
#
# GridAgent
#
def test_move_gridagent(self):
agent = naviagent.GridAgent(self.brain)
initposorient = None
with warnings.catch_warnings(record=True):
initposorient = self.brain.posorients.loc[13, :]
initposovel = pd.Series(data=0,
index=self.__posorient_vel_col)
initposovel.loc[initposorient.index] = initposorient
agent.posorient = initposovel
with self.assertRaises(AttributeError):
agent.move()
tuples = [('location', 'dx'), ('location', 'dy'),
('location', 'dz')]
index = pd.MultiIndex.from_tuples(tuples,
names=['position',
'orientation'])
mode_move = {'mode': 'on_cubic_grid',
'param': {'grid_spacing':
pd.Series(data=1,
index=index)}}
agent.mode_of_motion = mode_move
with warnings.catch_warnings(record=True):
agent.move()
obtained = agent.posorient
self.assertTrue(np.allclose(
obtained, initposorient.loc[obtained.index]))
def test_fly_gridagent(self):
agent = naviagent.GridAgent(self.brain, self.convention)
initposorient = None
with warnings.catch_warnings(record=True):
initposorient = self.brain.posorients.loc[13, :]
initposovel = pd.Series(data=0,
index=self.__posorient_vel_col)
initposovel.loc[initposorient.index] = initposorient
agent.posorient = initposovel
with self.assertRaises(AttributeError):
agent.fly(max_nstep=10)
tuples = [('location', 'dx'), ('location', 'dy'),
('location', 'dz')]
index = pd.MultiIndex.from_tuples(tuples,
names=['position',
'orientation'])
mode_move = {'mode': 'on_cubic_grid',
'param': {'grid_spacing':
pd.Series(data=1,
index=index)}}
agent.mode_of_motion = mode_move
agent.fly(max_nstep=10)
obtained = agent.posorient
self.assertTrue(np.allclose(obtained,
initposorient.loc[obtained.index]))
#
# GraphAgent
#
def test_init_graphagent(self):
mode_of_motion = dict()
mode_of_motion['mode'] = 'on_cubic_grid'
mode_of_motion['param'] = dict()
mode_of_motion['param']['grid_spacing'] = 0.5
agent = None
with warnings.catch_warnings(record=True):
agent = naviagent.GraphAgent(self.brain, mode_of_motion)
if version < 2:
graph_nodes = list(agent.graph.nodes())
else:
graph_nodes = list(agent.graph.nodes)
self.assertEqual(sorted(graph_nodes),
sorted(list(self.mydb.posorients.index)),
'Init of graph failed. Node missmatch')
def test_graph_setter(self):
mode_of_motion = dict()
mode_of_motion['mode'] = 'on_cubic_grid'
mode_of_motion['param'] = dict()
mode_of_motion['param']['grid_spacing'] = 0.5
agent = None
with warnings.catch_warnings(record=True):
agent = naviagent.GraphAgent(self.brain, mode_of_motion)
if version < 2:
graph_nodes = list(agent.graph.nodes())
else:
graph_nodes = list(agent.graph.nodes)
graph_edges = list()
for gnode in graph_nodes[1:]:
graph_edges.append((gnode, graph_nodes[0]))
graph = nx.DiGraph()
graph.add_nodes_from(graph_nodes)
graph.add_edges_from(graph_edges)
agent.graph = graph
graph_edges.append((graph_nodes[2], graph_nodes[1]))
graph = nx.DiGraph()
graph.add_nodes_from(graph_nodes)
graph.add_edges_from(graph_edges)
with self.assertRaises(ValueError):
agent.graph = graph
def test_catchment_area(self):
"""
1 Test all node to first
2 Test 11 nodes to first, 14 to 12th
3 Two loops attractors
"""
# Test all node to first
mode_of_motion = dict()
mode_of_motion['mode'] = 'on_cubic_grid'
mode_of_motion['param'] = dict()
mode_of_motion['param']['grid_spacing'] = 0.5
agent = None
with warnings.catch_warnings(record=True):
agent = naviagent.GraphAgent(self.brain, mode_of_motion)
if version < 2:
graph_nodes = list(agent.graph.nodes())
else:
graph_nodes = list(agent.graph.nodes)
graph_edges = list()
for gnode in graph_nodes[1:]:
graph_edges.append((gnode, graph_nodes[0]))
graph = nx.DiGraph()
graph.add_nodes_from(graph_nodes)
graph.add_edges_from(graph_edges)
agent.graph = graph
attractors = agent.find_attractors()
self.assertEqual(len(attractors), 1, 'Too many or too few attractors')
attractors = agent.find_attractors_sources(attractors)
catchment_area = agent.catchment_area(attractors)
self.assertEqual(catchment_area, [len(graph_nodes)],
'Too big or too short catchment area')
# Test 11 nodes to first, 14 to 12th
graph_edges = list()
for gnode in graph_nodes[1:11]:
graph_edges.append((gnode, graph_nodes[0]))
for gnode in graph_nodes[11:]:
graph_edges.append((gnode, graph_nodes[11]))
graph = nx.DiGraph()
graph.add_nodes_from(graph_nodes)
graph.add_edges_from(graph_edges)
agent.graph = graph
attractors = agent.find_attractors()
self.assertEqual(len(attractors), 2, 'Too many or too few attractors')
attractors = agent.find_attractors_sources(attractors)
catchment_area = agent.catchment_area(attractors)
self.assertEqual(sorted(catchment_area), [11, 14],
'Too big or too short catchment area')
# Two loops attractors
graph_edges = list()
for snode, enode in zip(graph_nodes[:11],
np.roll(graph_nodes[:11], 1)):
graph_edges.append((snode, enode))
for snode, enode in zip(graph_nodes[11:],
np.roll(graph_nodes[11:], 1)):
graph_edges.append((snode, enode))
graph = nx.DiGraph()
graph.add_nodes_from(graph_nodes)
graph.add_edges_from(graph_edges)
agent.graph = graph
attractors = agent.find_attractors()
self.assertEqual(len(attractors), 2, 'Too many or too few attractors')
attractors = agent.find_attractors_sources(attractors)
catchment_area = agent.catchment_area(attractors)
self.assertEqual(sorted(catchment_area), [11, 14],
'Too big or too short catchment area')
def test_neighboring_nodes(self):
""" Counting neighnoring nodes for 3 situations
1. Local maxima
2. Saddle points
3. Local minima
"""
# Init the agent
mode_of_motion = dict()
mode_of_motion['mode'] = 'on_cubic_grid'
mode_of_motion['param'] = dict()
mode_of_motion['param']['grid_spacing'] = 0.5
agent = None
with warnings.catch_warnings(record=True):
agent = naviagent.GraphAgent(self.brain, mode_of_motion)
# Local maxima
if version < 2:
graph_nodes = list(agent.graph.nodes())
else:
graph_nodes = list(agent.graph.nodes)
graph_edges = list()
graph_edges.append((graph_nodes[0],
graph_nodes[1]))
graph = nx.DiGraph()
graph.add_nodes_from(graph_nodes)
graph.add_edges_from(graph_edges)
agent.graph = graph
neighbors = agent.neighboring_nodes(graph_nodes[0])
expected_nbh = []
obtained_nbh = [a for a in neighbors]
self.assertEqual(sorted(expected_nbh),
sorted(obtained_nbh),
'Problem neighbors maxima')
# Saddle points
graph_edges.append((graph_nodes[1],
graph_nodes[2]))
graph = nx.DiGraph()
graph.add_nodes_from(graph_nodes)
graph.add_edges_from(graph_edges)
agent.graph = graph
neighbors = agent.neighboring_nodes(graph_nodes[1])
expected_nbh = [graph_nodes[0]]
obtained_nbh = [a for a in neighbors]
self.assertEqual(sorted(expected_nbh),
sorted(obtained_nbh),
'Problem neighbors saddle')
# Local maxima points
graph_edges.append((graph_nodes[3],
graph_nodes[2]))
graph = nx.DiGraph()
graph.add_nodes_from(graph_nodes)
graph.add_edges_from(graph_edges)
agent.graph = graph
neighbors = agent.neighboring_nodes(graph_nodes[2])
expected_nbh = [graph_nodes[3], graph_nodes[1]]
obtained_nbh = [a for a in neighbors]
self.assertEqual(sorted(expected_nbh),
sorted(obtained_nbh),
'Problem neighbors minima')
if __name__ == '__main__':
unittest.main()
"""
Test of maths
"""
import numpy as np
import pandas as pd
from navipy.moving import maths as navimaths
import unittest
class TestNavipyMovingMaths(unittest.TestCase):
""" A class to test some mathy function of the toolbox
"""
def test_motion_vec_pandas(self):
""" Test that a TypeError is correctly raised
"""
motion_vec = 'NotPandas'
move_mode = 'on_cubic_grid'
mode_param = dict()
mode_param['grid_spacing'] = 1
with self.assertRaises(TypeError):
navimaths.next_pos(motion_vec,
move_mode,
move_mode)
def test_notsupported_mofm(self):
""" Test that a TypeError is correctly raised
"""
tuples = [('location', 'x'), ('location', 'y'),
('location', 'z'), ('location', 'dx'),
('location', 'dy'), ('location', 'dz')]
index = pd.MultiIndex.from_tuples(tuples,
names=['position',
'orientation'])
motion_vec = pd.Series(data=0,
index=index)
move_mode = 'NotSupportedMode'
mode_param = dict()
mode_param['grid_spacing'] = 1
with self.assertRaises(KeyError):
navimaths.next_pos(motion_vec,
move_mode,
move_mode)
def test_null_velocity(self):
""" Test null velocity
When the agent has a null velocity, the next postion
should be equal to the current position. Here, we
test this by series equality.
"""
# Test if stay at same position.
tuples = [('location', 'x'), ('location', 'y'),
('location', 'z'), ('location', 'dx'),
('location', 'dy'), ('location', 'dz')]
index = pd.MultiIndex.from_tuples(tuples,
names=['position',
'orientation'])
motion_vec = pd.Series(data=0,
index=index)
move_mode = 'on_cubic_grid'
mode_param = dict()
mode_param['grid_spacing'] = 1
new_pos = navimaths.next_pos(motion_vec, move_mode, mode_param)
self.assertTrue(new_pos.equals(motion_vec),
'At null velocity the agent should not move')
def test_closest_cubic(self):
""" Test if the snaping to cubic is correct
When the agent move on the grid, its position has to be snapped
to the grid position. On a cubic grid, the instability is at
22.5 degrees modulo 45 degrees. We therefore test the functions
close to each instability.
"""
tuples = [('location', 'x'), ('location', 'y'),
('location', 'z')]
index = pd.MultiIndex.from_tuples(tuples,
names=['position',
'orientation'])
positions = pd.DataFrame(data=[[0, 0, 0],
[0, 1, 0],
[0, 2, 0],
[1, 0, 0],
[1, 1, 0],
[1, 2, 0],
[2, 0, 0],
[2, 1, 0],
[2, 2, 0]],
columns=index,
dtype=np.float)
move_mode = 'on_cubic_grid'
move_param = dict()
tuples = [('location', 'dx'), ('location', 'dy'),
('location', 'dz')]
index = pd.MultiIndex.from_tuples(tuples,
names=['position', 'orientation'])
move_param['grid_spacing'] = pd.Series(data=1,
index=index)
expected_dict = dict()
expected_dict[-22] = 7 # [2,1]
expected_dict[22] = 7
expected_dict[24] = 8 # [2,2]
expected_dict[67] = 8
expected_dict[68] = 5 # [1,2]
expected_dict[112] = 5
expected_dict[113] = 2 # [0,2]
expected_dict[157] = 2
expected_dict[158] = 1 # [0, 1]
expected_dict[202] = 1
expected_dict[204] = 0 # [0, 0]
expected_dict[247] = 0
expected_dict[248] = 3 # [1, 0]
expected_dict[292] = 3
expected_dict[293] = 6 # [2, 0]
expected_dict[337] = 6
expected_dict[338] = 7 # equivalent to -22
tuples2 = [('location', 'x'), ('location', 'y'),
('location', 'z'), ('location', 'dx'),
('location', 'dy'), ('location', 'dz')]
index2 = pd.MultiIndex.from_tuples(tuples2,
names=['position', 'orientation'])
for angle, exp_i in expected_dict.items():
alpha = np.deg2rad(angle)
motion_vec = pd.Series(
data=[1, 1, 0,
np.cos(alpha), np.sin(alpha), 0],
index=index2,
dtype=np.float)
newpos = navimaths.next_pos(motion_vec,
move_mode,
move_param)
snappos = navimaths.closest_pos(newpos, positions)
self.assertEqual(snappos.name, exp_i,
'closest pos is not correctly snapped')
if __name__ == '__main__':
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment