Skip to content
Snippets Groups Projects
Commit 02f5d668 authored by Ramin Yaghoubzadeh's avatar Ramin Yaghoubzadeh
Browse files

Fixed a ***load of bugs, thanks to testing.

parent 4e0ac1d3
No related branches found
No related tags found
No related merge requests found
......@@ -25,12 +25,15 @@ import ipaaca_pb2
# for internal links etc. The IU may be published later through
# the same buffer that allocated the UID.
# WARNINGS
# category is now the FIRST argument for IU constructors
__all__ = [
'IUEventType',
'IUAccessMode',
'InputBuffer', 'OutputBuffer',
'IU',
'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError',
'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError', 'IUNotFoundError',
'logger'
]
......@@ -117,30 +120,13 @@ class IUReadOnlyError(Exception):
def __init__(self, iu):
super(IUReadOnlyError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it is read-only.')
class IUNotFoundError(Exception):
"""Error indicating that an IU UID was unexpectedly not found in an internal store."""
def __init__(self, iu_uid):
super(IUNotFoundError, self).__init__('Lookup of IU ' + str(iu_uid) + ' failed.')
## --- Generation Architecture -----------------------------------------------
#class Links(object):
# ''' This is essentially a dict STR -> set([STR, ...]) '''
# def __init__(self, iu, writer_name=None, new_links=None):
# nl = {} if new_links is None else new_links
# self.iu = iu
# self.iu._set_links(links=self, is_delta=False, new_links=pl, links_to_remove=[], writer_name=writer_name)
# for k, v in pl.items():
# dict.__setitem__(self, k, v)
# def add_links(self, type, targets, writer_name=None):
# if not hasattr(targets, '__iter__'): targets=[targets]
# self.iu._set_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
# def remove_links(self, type, targets, writer_name=None):
# if not hasattr(targets, '__iter__'): targets=[targets]
# self.iu._set_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
# def modify_links(self, add, remove, writer_name=None):
# self.iu._set_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name)
# def set_links(self, links, writer_name=None):
# self.iu._set_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name)
# #def get_links(self, type):
# # return set(self.iu._get_links())
class Payload(dict):
def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False):
pl = {} if new_payload is None else new_payload
......@@ -304,7 +290,7 @@ class IU(IUInterface):#{{{
"""A local IU."""
def __init__(self, access_mode=IUAccessMode.PUSH, read_only=False, category='undef', _payload_type='MAP'):
def __init__(self, category='undef', access_mode=IUAccessMode.PUSH, read_only=False, _payload_type='MAP'):
super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only)
self._revision = 1
self._category = category
......@@ -409,7 +395,7 @@ class RemotePushIU(IUInterface):#{{{
"""A remote IU with access mode 'PUSH'."""
def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload):
def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links):
super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
self._revision = revision
self._category = category
......@@ -420,6 +406,7 @@ class RemotePushIU(IUInterface):#{{{
# don't try to invoke any modification checks or network updates ourselves either.
# We are just receiving it here and applying the new data.
self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True)
self._links = links
def _modify_links(self, links, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
"""Modify the links: add or remove item from this payload remotely and send update."""
......@@ -570,6 +557,10 @@ class IUConverter(rsb.converter.Converter):#{{{
for k,v in iu._payload.items():
entry = pbo.payload.add()
pack_typed_payload_item(entry, k, v)
for type_ in iu._links.keys():
linkset = pbo.links.add()
linkset.type = type_
linkset.targets.extend(iu._links[type_])
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
......@@ -582,6 +573,10 @@ class IUConverter(rsb.converter.Converter):#{{{
for entry in pbo.payload:
k, v = unpack_typed_payload_item(entry)
_payload[k] = v
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
remote_push_iu = RemotePushIU(
uid=pbo.uid,
revision=pbo.revision,
......@@ -590,7 +585,8 @@ class IUConverter(rsb.converter.Converter):#{{{
category = pbo.category,
payload_type = pbo.payload_type,
committed = pbo.committed,
payload=_payload
payload=_payload,
links=_links
)
return remote_push_iu
else:
......@@ -716,6 +712,15 @@ class IUStore(dict):
def __init__(self):
super(IUStore, self).__init__()
class FrozenIUStore(IUStore):
"""A read-only version of a dictionary storing IUs. (TODO: might be slow)"""
def __init__(self, original_iu_store):
super(FrozenIUStore, self).__init__()
map(lambda p: super(FrozenIUStore, self).__setitem__(p[0], p[1]), original_iu_store.items())
def __delitem__(self, k):
raise AttributeError()
def __setitem__(self, k, v):
raise AttributeError()
class IUEventHandler(object):
......@@ -787,6 +792,10 @@ class Buffer(object):
self._iu_store = IUStore()
self._iu_event_handlers = []
def _get_frozen_iu_store(self):
return FrozenIUStore(original_iu_store = self._iu_store)
iu_store = property(fget=_get_frozen_iu_store, doc='Copy-on-read version of the internal IU store')
def register_handler(self, handler_function, for_event_types=None, for_categories=None):
"""Register a new IU event handler function.
......@@ -804,7 +813,6 @@ class Buffer(object):
def call_iu_event_handlers(self, uid, local, event_type, category):
"""Call registered IU event handler functions registered for this event_type and category."""
for h in self._iu_event_handlers:
# print('calling an update handler for '+event_type+' -> '+str(h))
h.call(self, uid, local=local, event_type=event_type, category=category)
def _get_owning_component_name(self):
......@@ -816,6 +824,7 @@ class Buffer(object):
"""Return the Buffer's unique name."""
return self._unique_name
unique_name = property(_get_unique_name)
class InputBuffer(Buffer):
......@@ -855,7 +864,7 @@ class InputBuffer(Buffer):
cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category)
logger.info("Added category listener for "+iu_category)
logger.info("Added listener in scope "+"/ipaaca/category/"+iu_category)
return cat_listener
def _handle_iu_events(self, event):
......@@ -1007,13 +1016,14 @@ class OutputBuffer(Buffer):
def _get_informer(self, iu_category):
'''Return (or create, store and return) an informer object for IUs of the specified category.'''
if iu_category in self._informer_store:
logger.info("Returning informer on scope "+"/ipaaca/category/"+str(iu_category))
return self._informer_store[iu_category]
informer_iu = rsb.createInformer(
rsb.Scope("/ipaaca/category/"+str(iu_category)),
config=self._participant_config,
dataType=object)
self._informer_store[iu_category] = informer_iu #new_tuple
logger.info("Added informer on "+iu_category)
logger.info("Returning NEW informer on scope "+"/ipaaca/category/"+str(iu_category))
return informer_iu #return new_tuple
def add(self, iu):
......@@ -1025,11 +1035,33 @@ class OutputBuffer(Buffer):
iu.buffer = self
self._publish_iu(iu)
def remove(self, iu=None, iu_uid=None):
'''Remove the iu or an IU corresponding to iu_uid from the OutputBuffer, retracting it from the system.'''
if iu is None:
if iu_uid is None:
return None
else:
if iu_uid not in self. _iu_store:
raise IUNotFoundError(iu_uid)
iu = self._iu_store[iu_uid]
# unpublish the IU
self._retract_iu(iu)
del self._iu_store[iu.uid]
return iu
def _publish_iu(self, iu):
'''Publish an IU.'''
informer = self._get_informer(iu._category)
informer.publishData(iu)
def _retract_iu(self, iu):
'''Retract (unpublish) an IU.'''
iu_retraction = ipaaca_pb2.IURetraction()
iu_retraction.uid = iu.uid
iu_retraction.revision = iu.revision
informer = self._get_informer(iu._category)
informer.publishData(iu_retraction)
def _send_iu_commission(self, iu, writer_name):
'''Send IU commission.
......@@ -1045,7 +1077,6 @@ class OutputBuffer(Buffer):
iu_commission.uid = iu.uid
iu_commission.revision = iu.revision
iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name
# print('sending IU commission event')
informer = self._get_informer(iu._category)
informer.publishData(iu_commission)
......@@ -1135,4 +1166,4 @@ initialize_ipaaca_rsb()
# Create a global logger for this module
logger = logging.getLogger('ipaaca')
logger.addHandler(IpaacaLoggingHandler())
logger.addHandler(IpaacaLoggingHandler(level=logging.INFO))
......@@ -5,26 +5,65 @@ import ipaaca
import sys
import unittest
def handle_iu_event(iu, event_type, local):
print('(IU event '+event_type+' '+str(iu.uid)+')')
class IpaacaLinksTestCase(unittest.TestCase):
class IpaacaIUStoreTestCase(unittest.TestCase):
def setUp(self):
self.ib = ipaaca.InputBuffer('TestIn', ['sensorcategory'])
self.ib.register_handler(handle_iu_event)
self.ob = ipaaca.OutputBuffer('TestOut')
self.sensor_iu = ipaaca.IU('sensorcategory')
self.sensor_iu.payload = {'data': 'sensordata'}
time.sleep(0.1)
self.ob.add(self.sensor_iu)
time.sleep(0.1)
def tearDown(self):
pass
def testSetLink(self):
def testInputBufferContents(self):
self.assertIn(self.sensor_iu.uid, self.ib.iu_store)
self.assertEqual(len(self.ib.iu_store), 1)
def testOutputBufferContents(self):
self.assertIn(self.sensor_iu.uid, self.ob.iu_store)
self.assertEqual(len(self.ob.iu_store), 1)
class IpaacaLinksTestCase(unittest.TestCase):
def setUp(self):
self.ib = ipaaca.InputBuffer('TestIn', ['sensorcategory', 'decisioncategory'])
self.ob = ipaaca.OutputBuffer('TestOut')
self.sensor_iu = ipaaca.IU('sensorcategory')
self.sensor_iu.payload = {'data': 'sensordata'}
self.ob.add(self.sensor_iu)
def tearDown(self):
pass
def testSetSingleLink(self):
time.sleep(0.1)
self.decision_iu = ipaaca.IU('decisioncategory')
self.decision_iu.payload = {'data':'decision'}
self.decision_iu.set_links( { 'grin': [self.sensor_iu.uid] } )
self.ob.add(self.decision_iu)
time.sleep(0.1)
grinlinks = self.decision_iu.get_links('grin')
# test received version
self.assertIn(self.decision_iu.uid, self.ib.iu_store)
received_iu = self.ib.iu_store[self.decision_iu.uid]
grinlinks = received_iu.get_links('grin')
self.assertIn(self.sensor_iu.uid, grinlinks)
self.assertEqual(len(grinlinks), 1)
def testSetAndRemoveSingleLink(self):
time.sleep(0.1)
self.decision_iu = ipaaca.IU('decisioncategory')
self.decision_iu.payload = {'data':'decision'}
self.decision_iu.set_links( { 'grin': [self.sensor_iu.uid] } )
self.ob.add(self.decision_iu)
time.sleep(0.1)
self.decision_iu.remove_links('grin', [self.sensor_iu.uid])
time.sleep(0.1)
# test received version
self.assertIn(self.decision_iu.uid, self.ib.iu_store)
received_iu = self.ib.iu_store[self.decision_iu.uid]
grinlinks = received_iu.get_links('grin')
self.assertEqual(len(grinlinks), 0)
if __name__ == '__main__':
unittest.main()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment