Coverage for navipy/moving/agent.py : 49%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
""" +-------------------------------------------+\ --------------+-------------+ |Agent class |\ Type of agent | Rendering | +===========================================+\ ==============+=============+ |:class:`navipy.moving.agent.CyberBeeAgent` |\ Close loop |Online | +-------------------------------------------+\ +-------------+ |:class:`navipy.moving.agent.GraphAgent` |\ |Pre-rendered | +-------------------------------------------+\ --------------+ + |:class:`navipy.moving.agent.GridAgent` |\ Open loop | | +-------------------------------------------+\ --------------+-------------+
"""
"""default call back""" raise NameError('No Callback')
raise NameError('No Callback')
('location', 'y'), ('location', 'z'), (convention, 'alpha_0'), (convention, 'alpha_1'), (convention, 'alpha_2')]
('location', 'dy'), ('location', 'dz'), (convention, 'dalpha_0'), (convention, 'dalpha_1'), (convention, 'dalpha_2')]
names=['position', 'orientation'])
names=['position', 'orientation']) names=['position', 'orientation']) index=self._posorient_vel_col, data=np.nan)
def posorient(self):
def posorient(self, posorient): raise TypeError('posorient should be a pandas Series') raise KeyError( 'posorient should have {} as index'.format(col)) posorient.loc[self._posorient_col]
def velocity(self): return self._posorient_vel.loc[self._velocity_col].copy()
def velocity(self, velocity): if isinstance(velocity, pd.Series) is False: raise TypeError('velocity should be a pandas Series') for col in self._velocity_col: if col not in velocity.index: raise KeyError( 'velocity should have {} as index'.format(col)) self._posorient_vel.loc[self._velocity_col] = \ velocity.loc[self._velocity_col]
def posorient_vel(self): return self._posorient_vel.copy()
def posorient_vel(self, posorient_vel): self.posorient = posorient_vel self.velocity = posorient_vel
def brain(self): return inspect.getsourcelines(self._brain)
def brain(self, brain):
def alter_posorientvel(self): return inspect.getsourcelines(self._alter_posorientvel)
self.velocity = self._brain.velocity() alteredpos = self._alter_posorientvel(self._posorient_vel) self.posorient = alteredpos self.velocity = alteredpos
"""move cyberbee until max step has been performed """ trajectory = pd.DataFrame(index=range(0, max_nstep), columns=self._posorient_vel_col) trajectory.loc[0, :] = self._posorient_vel.copy() if return_tra: trajectory.loc[stepi, :] = self._posorient_vel.copy() if return_tra: return trajectory else: return None
""" A CyberBeeAgent uses the rendering method of cyberbee. \ CyberBeeAgent is a close loop agent and need to be run within blender \ (see :doc:`rendering`).
Single process Here come example of how to use it
Multi process CyberBeeAgent inherit from the Process \ class of the multiprocessing module of the standard python \ library. Thus, several GridAgents can safely be run in parallel.
"""
posorients_queue=None, results_queue=None): if convention is None: raise Exception("a convention must be specified") if (posorients_queue is not None) and (results_queue is not None): multiprocessing.Process.__init__(self) AbstractAgent.__init__(self, convention) AbstractAgent._alter_posorientvel = \ lambda motion_vec: navimomath.next_pos(motion_vec, move_mode='free_run') self._alter_posorientvel = \ lambda motion_vec: navimomath.next_pos(motion_vec, move_mode='free_run') self.brain = brain self._posorients_queue = posorients_queue self._results_queue = results_queue
""" Only supported when multiprocess""" if self._posorients_queue is None or self._results_queue is None: raise NameError('Single agent class has not be inititialised ' + 'with multiprocessing suppport') proc_name = self.name print('Process {} started'.format(proc_name)) while True: start_posorient = self._posorients_queue.get(timeout=1) if start_posorient is None: # Poison pill means shutdown) break common_id = list(set(start_posorient.index).intersection( self._posorient_vel.index)) self._posorient_vel.loc[common_id] = start_posorient.loc[common_id] self._posorient_vel.name = start_posorient.name self.move() posorient_vel = self._posorient_vel self._posorients_queue.task_done() self._results_queue.put(posorient_vel) self._posorients_queue.task_done() print('Process {} done'.format(proc_name))
""" A GridAgent fetches the scene from a pre-rendered database. \ (see :doc:`database`) GridAgent is a close loop agent here its position is snap to a grid.
Single process Here come example of how to use it
Multi process GridAgent inherit from the Process \ class of the multiprocessing module of the standard python \ library. Thus, several GridAgents can safely be run in parallel.
"""
posorients_queue=None, results_queue=None): msg = 'GridAgent only works with a brain having ' msg += 'a renderer of type DataBase' raise TypeError(msg) multiprocessing.Process.__init__(self)
def mode_of_motion(self): """ """ toreturn = self._mode_move toreturn['describe'] = \ navimomath.mode_moves_supported()[ self._mode_move['mode']]['describe'] return toreturn
def mode_of_motion(self, mode): """
""" raise TypeError('Mode is not a dictionary') raise KeyError("'mode' is not a key of mode") raise KeyError("'param' is not a key of mode") mode['mode']]['param']: raise KeyError( "'{}' is not in mode['param']".format(param)) else: raise ValueError('mode is not supported')
posorient_vel = navimomath.next_pos( posorient_vel, move_mode=self._mode_move['mode'], move_param=self._mode_move['param']) tmppos = self._brain.posorients tmp = navimomath.closest_pos( posorient_vel, tmppos) # self._brain.posorients) posorient_vel.loc[self._posorient_col] = \ tmp.loc[self._posorient_col] posorient_vel.name = tmp.name return posorient_vel
else: 'GridAgent object has no attribute _mode_move\n' + 'Please set the mode of motion')
else: 'GridAgent object has no attribute _mode_move\n' + 'Please set the mode of motion')
""" Only supported when multiprocess""" if self._posorients_queue is None or self._results_queue is None: raise NameError('Single agent class has not be inititialised ' + 'with multiprocessing suppport') proc_name = self.name print('Process {} started'.format(proc_name)) while True: start_posorient = self._posorients_queue.get(timeout=1) if start_posorient is None: # Poison pill means shutdown) break common_id = list(set(start_posorient.index).intersection( self._posorient_vel.index)) self._posorient_vel.loc[common_id] = start_posorient.loc[common_id] self.move() next_posorient = self._posorient_vel self._posorients_queue.task_done() self._results_queue.put((start_posorient, next_posorient)) self._posorients_queue.task_done() print('Process {} done'.format(proc_name))
""" A GraphAgent uses, to build a graph,
1. pre-rendered scene from a database to derive \ the agent motion, or 2. pre-computed agent-motion
"""
# Init the graph msg = 'GraphAgent only works with a brain having ' msg += 'a renderer of type DataBase' raise TypeError(msg) posorient=posor) # Create a dataframe to store the velocities names=['position', 'orientation']) index=list(self._graph.nodes()))
def graph(self):
def graph(self, graph): raise TypeError('graph is not a nx.DiGraph')
ncpu=5, timeout=1, filename=None, blocksize=100): if os.path.exists(filename): self.velocities = pd.read_hdf(filename) nodes_tocompute = self.velocities.isna().any(axis=1) nodes_tocompute = nodes_tocompute[nodes_tocompute].index # Build a list of nodes posorients_queue = JoinableQueue() results_queue = Queue() for node in nodes_tocompute: posorients_queue.put(self._graph.nodes[node]['posorient'])
# Start ndatabase loader convention = self._brain.renderer.rotation_convention num_agents = ncpu agents = [CyberBeeAgent(copy.copy(self._brain), convention=convention, posorients_queue=posorients_queue, results_queue=results_queue) for _ in range(num_agents)] for w in agents: w.start()
# Add a poison pill for each agent for _ in range(num_agents): posorients_queue.put(None)
# Wait for all of the tasks to finish # posorients_queue.join() nline = 0 prev_nline = nline t_start = time.time() nbnodes = nodes_tocompute.shape[0] for _ in range(nbnodes): res = results_queue.get(timeout=timeout) self.velocities.loc[res.name, res.index] = res if (nline - prev_nline) > blocksize: t_elapse = time.time() - t_start t_peritem = t_elapse / nline remain = nbnodes - nline print('Computed {} in {}'.format(nline, t_elapse)) print('Remain {}, done in {}'.format( remain, remain * t_peritem)) if filename is not None: self.velocities.to_hdf(filename, key='velocities') prev_nline = nline nline += 1 return self.velocities.copy()
""" Connect edges with a given velocity """ if self.velocities.dropna().shape[0] == 0: raise NameError('compute_velocities should be called first') edges = pd.Series(data=np.nan, index=self.velocities.index) # Make sure that the velocity start at the correct location posorients = self._brain.posorients myvelocities = self.velocities.copy() myvelocities = myvelocities.swaplevel(axis=1) myvelocities.x = posorients.x myvelocities.y = posorients.y myvelocities.z = posorients.z myvelocities.alpha_0 = posorients.alpha_0 myvelocities.alpha_1 = posorients.alpha_1 myvelocities.alpha_2 = posorients.alpha_2 myvelocities = myvelocities.swaplevel(axis=1) for ii, row in myvelocities.iterrows(): if np.any(np.isnan(row)): continue # Move according to user mode of motion nposorient = navimomath.next_pos(row, movemode, moveparam) # Snap to the closest point nextpos_index = navimomath.closest_pos( nposorient, myvelocities) edges[ii] = nextpos_index.name # Format for graph validedges = edges.dropna() results_edges = np.vstack( [validedges.index, validedges.values]).transpose() # Add to graph self._graph.add_edges_from(results_edges) self.check_graph()
graph_nodes = list(self._graph.nodes()) else: # not connected -> loop not ran # count == 0 -> connected to one node # count == 1 -> connected to two nodes 'Node {} leads to several locations'.format(node))
"""Return a list of node going to each attractor in a graph """
"""Find all sources going to each attractors """ attractors = self.find_attractors()
raise TypeError('Attractors should be a list of dict') raise ValueError('No attractors found')
# Check attractor raise ValueError( 'Each attractors should contain the key attractor')
# Calculate connection
# [0] because one node of the attractor is enough # all other node of the attractor are connected to this one self.graph, target=target) attractors[att_i]['paths'].keys())
"""Return the catchment area for attractors """ attractors = self.find_attractors_sources()
raise TypeError('Attractors should be a list of dict') raise ValueError('No attractors found')
# Check attractor raise ValueError( 'Each attractors should contains a list of sources')
""" Return all paths to the goals """ return nx.shortest_path(self._graph, target=goals)
""" Return the nodes going to the target """ # Reverse graph because nx.neighbors give the end node # and we want to find the start node going to target # not where target goes. |