From 02f5d668d0c2213086c30a777ad6b7ea22434b2a Mon Sep 17 00:00:00 2001
From: Ramin Yaghoubzadeh <ryaghoub@techfak.uni-bielefeld.de>
Date: Mon, 13 Feb 2012 16:22:46 +0100
Subject: [PATCH] Fixed a ***load of bugs, thanks to testing.

---
 python/src/ipaaca.py      | 91 ++++++++++++++++++++++++++-------------
 python/src/testipaaca.py  | 31 -------------
 python/test/testipaaca.py | 70 ++++++++++++++++++++++++++++++
 3 files changed, 131 insertions(+), 61 deletions(-)
 delete mode 100755 python/src/testipaaca.py
 create mode 100755 python/test/testipaaca.py

diff --git a/python/src/ipaaca.py b/python/src/ipaaca.py
index 38f0d5a..9f639cc 100755
--- a/python/src/ipaaca.py
+++ b/python/src/ipaaca.py
@@ -25,12 +25,15 @@ import ipaaca_pb2
 #  for internal links etc. The IU may be published later through
 #  the same buffer that allocated the UID.
 
+# WARNINGS
+#  category is now the FIRST argument for IU constructors
+
 __all__ = [
 	'IUEventType',
 	'IUAccessMode',
 	'InputBuffer', 'OutputBuffer',
 	'IU',
-	'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError',
+	'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError', 'IUNotFoundError',
 	'logger'
 ]
 
@@ -117,30 +120,13 @@ class IUReadOnlyError(Exception):
 	def __init__(self, iu):
 		super(IUReadOnlyError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it is read-only.')
 
+class IUNotFoundError(Exception):
+	"""Error indicating that an IU UID was unexpectedly not found in an internal store."""
+	def __init__(self, iu_uid):
+		super(IUNotFoundError, self).__init__('Lookup of IU ' + str(iu_uid) + ' failed.')
 
 ## --- Generation Architecture -----------------------------------------------
 
-#class Links(object):
-#	''' This is essentially a dict STR -> set([STR, ...]) '''
-#	def __init__(self, iu, writer_name=None, new_links=None):
-#		nl = {} if new_links is None else new_links
-#		self.iu = iu
-#		self.iu._set_links(links=self, is_delta=False, new_links=pl, links_to_remove=[], writer_name=writer_name)
-#		for k, v in pl.items():
-#			dict.__setitem__(self, k, v)
-#	def add_links(self, type, targets, writer_name=None):
-#		if not hasattr(targets, '__iter__'): targets=[targets]
-#		self.iu._set_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
-#	def remove_links(self, type, targets, writer_name=None):
-#		if not hasattr(targets, '__iter__'): targets=[targets]
-#		self.iu._set_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
-#	def modify_links(self, add, remove, writer_name=None):
-#		self.iu._set_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name)
-#	def set_links(self, links, writer_name=None):
-#		self.iu._set_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name)
-#	#def get_links(self, type):
-#	#	return set(self.iu._get_links())
-
 class Payload(dict):
 	def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False):
 		pl = {} if new_payload is None else new_payload
@@ -304,7 +290,7 @@ class IU(IUInterface):#{{{
 
 	"""A local IU."""
 
-	def __init__(self, access_mode=IUAccessMode.PUSH, read_only=False, category='undef', _payload_type='MAP'):
+	def __init__(self, category='undef', access_mode=IUAccessMode.PUSH, read_only=False, _payload_type='MAP'):
 		super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only)
 		self._revision = 1
 		self._category = category
@@ -409,7 +395,7 @@ class RemotePushIU(IUInterface):#{{{
 	
 	"""A remote IU with access mode 'PUSH'."""
 	
-	def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload):
+	def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links):
 		super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
 		self._revision = revision
 		self._category = category
@@ -420,6 +406,7 @@ class RemotePushIU(IUInterface):#{{{
 		#  don't try to invoke any modification checks or network updates ourselves either.
 		#  We are just receiving it here and applying the new data.
 		self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True)
+		self._links = links
 	
 	def _modify_links(self, links, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
 		"""Modify the links: add or remove item from this payload remotely and send update."""
@@ -570,6 +557,10 @@ class IUConverter(rsb.converter.Converter):#{{{
 		for k,v in iu._payload.items():
 			entry = pbo.payload.add()
 			pack_typed_payload_item(entry, k, v)
+		for type_ in iu._links.keys():
+			linkset = pbo.links.add()
+			linkset.type = type_
+			linkset.targets.extend(iu._links[type_])
 		return bytearray(pbo.SerializeToString()), self.wireSchema
 	
 	def deserialize(self, byte_stream, ws):
@@ -582,6 +573,10 @@ class IUConverter(rsb.converter.Converter):#{{{
 				for entry in pbo.payload:
 					k, v = unpack_typed_payload_item(entry)
 					_payload[k] = v
+				_links = collections.defaultdict(set)
+				for linkset in pbo.links:
+					for target_uid in linkset.targets:
+						_links[linkset.type].add(target_uid)
 				remote_push_iu = RemotePushIU(
 						uid=pbo.uid,
 						revision=pbo.revision,
@@ -590,7 +585,8 @@ class IUConverter(rsb.converter.Converter):#{{{
 						category = pbo.category,
 						payload_type = pbo.payload_type,
 						committed = pbo.committed,
-						payload=_payload
+						payload=_payload,
+						links=_links
 					)
 				return remote_push_iu
 			else:
@@ -716,6 +712,15 @@ class IUStore(dict):
 	def __init__(self):
 		super(IUStore, self).__init__()
 
+class FrozenIUStore(IUStore):
+	"""A read-only version of a dictionary storing IUs. (TODO: might be slow)"""
+	def __init__(self, original_iu_store):
+		super(FrozenIUStore, self).__init__()
+		map(lambda p: super(FrozenIUStore, self).__setitem__(p[0], p[1]), original_iu_store.items())
+	def __delitem__(self, k):
+		raise AttributeError()
+	def __setitem__(self, k, v):
+		raise AttributeError()
 
 class IUEventHandler(object):
 	
@@ -787,6 +792,10 @@ class Buffer(object):
 		self._iu_store = IUStore()
 		self._iu_event_handlers = []
 	
+	def _get_frozen_iu_store(self):
+		return FrozenIUStore(original_iu_store = self._iu_store)
+	iu_store = property(fget=_get_frozen_iu_store, doc='Copy-on-read version of the internal IU store')
+	
 	def register_handler(self, handler_function, for_event_types=None, for_categories=None):
 		"""Register a new IU event handler function.
 		
@@ -804,7 +813,6 @@ class Buffer(object):
 	def call_iu_event_handlers(self, uid, local, event_type, category):
 		"""Call registered IU event handler functions registered for this event_type and category."""
 		for h in self._iu_event_handlers:
-			# print('calling an update handler for '+event_type+' -> '+str(h))
 			h.call(self, uid, local=local, event_type=event_type, category=category)
 	
 	def _get_owning_component_name(self):
@@ -816,6 +824,7 @@ class Buffer(object):
 		"""Return the Buffer's unique name."""
 		return self._unique_name
 	unique_name = property(_get_unique_name)
+	
 
 
 class InputBuffer(Buffer):
@@ -855,7 +864,7 @@ class InputBuffer(Buffer):
 		cat_listener.addHandler(self._handle_iu_events)
 		self._listener_store[iu_category] = cat_listener
 		self._category_interests.append(iu_category)
-		logger.info("Added category listener for "+iu_category)
+		logger.info("Added listener in scope "+"/ipaaca/category/"+iu_category)
 		return cat_listener
 	
 	def _handle_iu_events(self, event):
@@ -1007,13 +1016,14 @@ class OutputBuffer(Buffer):
 	def _get_informer(self, iu_category):
 		'''Return (or create, store and return) an informer object for IUs of the specified category.'''
 		if iu_category in self._informer_store:
+			logger.info("Returning informer on scope "+"/ipaaca/category/"+str(iu_category))
 			return self._informer_store[iu_category]
 		informer_iu = rsb.createInformer(
 				rsb.Scope("/ipaaca/category/"+str(iu_category)),
 				config=self._participant_config,
 				dataType=object)
 		self._informer_store[iu_category] = informer_iu #new_tuple
-		logger.info("Added informer on "+iu_category)
+		logger.info("Returning NEW informer on scope "+"/ipaaca/category/"+str(iu_category))
 		return informer_iu #return new_tuple
 	
 	def add(self, iu):
@@ -1025,11 +1035,33 @@ class OutputBuffer(Buffer):
 		iu.buffer = self
 		self._publish_iu(iu)
 	
+	def remove(self, iu=None, iu_uid=None):
+		'''Remove the iu or an IU corresponding to iu_uid from the OutputBuffer, retracting it from the system.'''
+		if iu is None:
+			if iu_uid is None:
+				return None
+			else:
+				if iu_uid not in self. _iu_store:
+					raise IUNotFoundError(iu_uid)
+				iu = self._iu_store[iu_uid]
+		# unpublish the IU
+		self._retract_iu(iu)
+		del self._iu_store[iu.uid]
+		return iu
+	
 	def _publish_iu(self, iu):
 		'''Publish an IU.'''
 		informer = self._get_informer(iu._category)
 		informer.publishData(iu)
 	
+	def _retract_iu(self, iu):
+		'''Retract (unpublish) an IU.'''
+		iu_retraction = ipaaca_pb2.IURetraction()
+		iu_retraction.uid = iu.uid
+		iu_retraction.revision = iu.revision
+		informer = self._get_informer(iu._category)
+		informer.publishData(iu_retraction)
+	
 	def _send_iu_commission(self, iu, writer_name):
 		'''Send IU commission.
 		
@@ -1045,7 +1077,6 @@ class OutputBuffer(Buffer):
 		iu_commission.uid = iu.uid
 		iu_commission.revision = iu.revision
 		iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name
-		# print('sending IU commission event')
 		informer = self._get_informer(iu._category)
 		informer.publishData(iu_commission)
 	
@@ -1135,4 +1166,4 @@ initialize_ipaaca_rsb()
 
 # Create a global logger for this module
 logger = logging.getLogger('ipaaca')
-logger.addHandler(IpaacaLoggingHandler())
+logger.addHandler(IpaacaLoggingHandler(level=logging.INFO))
diff --git a/python/src/testipaaca.py b/python/src/testipaaca.py
deleted file mode 100755
index 4156281..0000000
--- a/python/src/testipaaca.py
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/usr/bin/env python
-
-import time
-import ipaaca
-import sys
-
-import unittest
-
-class IpaacaLinksTestCase(unittest.TestCase):
-	def setUp(self):
-		self.ib = ipaaca.InputBuffer('TestIn', ['sensorcategory'])
-		self.ob = ipaaca.OutputBuffer('TestOut')
-		self.sensor_iu = ipaaca.IU('sensorcategory')
-		self.sensor_iu.payload = {'data': 'sensordata'}
-		self.ob.add(self.sensor_iu)
-	def tearDown(self):
-		pass
-	def testSetLink(self):
-		time.sleep(0.1)
-		self.decision_iu = ipaaca.IU('decisioncategory')
-		self.decision_iu.payload = {'data':'decision'}
-		self.decision_iu.set_links( { 'grin': [self.sensor_iu.uid] } )
-		self.ob.add(self.decision_iu)
-		time.sleep(0.1)
-		grinlinks = self.decision_iu.get_links('grin')
-		self.assertIn(self.sensor_iu.uid, grinlinks)
-		self.assertEqual(len(grinlinks), 1)
-
-if __name__ == '__main__':
-	unittest.main()
-
diff --git a/python/test/testipaaca.py b/python/test/testipaaca.py
new file mode 100755
index 0000000..9aa9313
--- /dev/null
+++ b/python/test/testipaaca.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+
+import time
+import ipaaca
+import sys
+
+import unittest
+	
+def handle_iu_event(iu, event_type, local):
+	print('(IU event '+event_type+' '+str(iu.uid)+')')
+
+class IpaacaIUStoreTestCase(unittest.TestCase):
+	def setUp(self):
+		self.ib = ipaaca.InputBuffer('TestIn', ['sensorcategory'])
+		self.ib.register_handler(handle_iu_event)
+		self.ob = ipaaca.OutputBuffer('TestOut')
+		self.sensor_iu = ipaaca.IU('sensorcategory')
+		self.sensor_iu.payload = {'data': 'sensordata'}
+		time.sleep(0.1)
+		self.ob.add(self.sensor_iu)
+		time.sleep(0.1)
+	def tearDown(self):
+		pass
+	def testInputBufferContents(self):
+		self.assertIn(self.sensor_iu.uid, self.ib.iu_store)
+		self.assertEqual(len(self.ib.iu_store), 1)
+	def testOutputBufferContents(self):
+		self.assertIn(self.sensor_iu.uid, self.ob.iu_store)
+		self.assertEqual(len(self.ob.iu_store), 1)
+
+class IpaacaLinksTestCase(unittest.TestCase):
+	def setUp(self):
+		self.ib = ipaaca.InputBuffer('TestIn', ['sensorcategory', 'decisioncategory'])
+		self.ob = ipaaca.OutputBuffer('TestOut')
+		self.sensor_iu = ipaaca.IU('sensorcategory')
+		self.sensor_iu.payload = {'data': 'sensordata'}
+		self.ob.add(self.sensor_iu)
+	def tearDown(self):
+		pass
+	def testSetSingleLink(self):
+		time.sleep(0.1)
+		self.decision_iu = ipaaca.IU('decisioncategory')
+		self.decision_iu.payload = {'data':'decision'}
+		self.decision_iu.set_links( { 'grin': [self.sensor_iu.uid] } )
+		self.ob.add(self.decision_iu)
+		time.sleep(0.1)
+		# test received version
+		self.assertIn(self.decision_iu.uid, self.ib.iu_store)
+		received_iu = self.ib.iu_store[self.decision_iu.uid]
+		grinlinks = received_iu.get_links('grin')
+		self.assertIn(self.sensor_iu.uid, grinlinks)
+		self.assertEqual(len(grinlinks), 1)
+	def testSetAndRemoveSingleLink(self):
+		time.sleep(0.1)
+		self.decision_iu = ipaaca.IU('decisioncategory')
+		self.decision_iu.payload = {'data':'decision'}
+		self.decision_iu.set_links( { 'grin': [self.sensor_iu.uid] } )
+		self.ob.add(self.decision_iu)
+		time.sleep(0.1)
+		self.decision_iu.remove_links('grin', [self.sensor_iu.uid])
+		time.sleep(0.1)
+		# test received version
+		self.assertIn(self.decision_iu.uid, self.ib.iu_store)
+		received_iu = self.ib.iu_store[self.decision_iu.uid]
+		grinlinks = received_iu.get_links('grin')
+		self.assertEqual(len(grinlinks), 0)
+
+if __name__ == '__main__':
+	unittest.main()
+
-- 
GitLab