From 900ddedb28d83b91fca2db23bc5ca68ce6dbc18d Mon Sep 17 00:00:00 2001
From: Ramin Yaghoubzadeh <ryaghoub@techfak.uni-bielefeld.de>
Date: Mon, 23 Jan 2012 14:47:17 +0100
Subject: [PATCH] The old test cases work again -> refactoring of Payload seems
 to be complete. We added links, tests pending.

---
 python/src/informer.py |  30 ++++
 python/src/ipaaca.py   | 313 ++++++++++++++++++++++++++++++++---------
 python/src/listener.py |  39 +++++
 python/src/rsb.cfg     |   5 +
 4 files changed, 320 insertions(+), 67 deletions(-)
 create mode 100755 python/src/informer.py
 create mode 100755 python/src/listener.py
 create mode 100644 python/src/rsb.cfg

diff --git a/python/src/informer.py b/python/src/informer.py
new file mode 100755
index 0000000..36955ee
--- /dev/null
+++ b/python/src/informer.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+
+import time
+import ipaaca
+
+def remote_change_dumper(iu, event_type, local):
+	if local:
+		print 'remote side '+event_type+': '+str(iu)
+
+
+ob = ipaaca.OutputBuffer('CoolComponent')
+ob.register_handler(remote_change_dumper)
+
+
+iu = ipaaca.IU()
+iu.payload = {'a':'a1'}
+ob.add(iu)
+
+iu.payload = {'a':'a2', 'b':'b1'} #OK
+del(iu.payload['b'])
+iu.payload['c'] = 'c1'
+iu.payload['a'] = 'a3'
+
+time.sleep(1)
+iu.commit()
+
+while True:
+	time.sleep(1)
+
+
diff --git a/python/src/ipaaca.py b/python/src/ipaaca.py
index 95e0d1b..ff6bed0 100755
--- a/python/src/ipaaca.py
+++ b/python/src/ipaaca.py
@@ -7,9 +7,10 @@ import logging
 import sys
 import threading
 import uuid
+import collections
 
 import rsb
-import rsb.transport.converter
+import rsb.converter
 
 import ipaaca_pb2
 
@@ -41,7 +42,7 @@ def enum(*sequential, **named):
 def pack_typed_payload_item(protobuf_object, key, value):
 	protobuf_object.key = str(key)
 	protobuf_object.value = str(value)
-	protobuf_object.payload_type = 'str' # TODO: more types
+	protobuf_object.type = 'str' # TODO: more types
 
 
 def unpack_typed_payload_item(protobuf_object):
@@ -68,7 +69,8 @@ IUEventType = enum(
 	COMMITTED = 'COMMITTED',
 	DELETED = 'DELETED',
 	RETRACTED = 'RETRACTED',
-	UPDATED = 'UPDATED'
+	UPDATED = 'UPDATED',
+	LINKSUPDATED = 'LINKSUPDATED'
 )
 
 
@@ -109,22 +111,53 @@ class IUReadOnlyError(Exception):
 ## --- Generation Architecture -----------------------------------------------
 
 
+class Links(object):
+	''' This is essentially a dict STR -> set([STR, ...]) '''
+	def __init__(self, iu, writer_name=None, new_links=None):
+		nl = {} if new_links is None else new_links
+		self.iu = iu
+		self.iu._set_links(links=self, is_delta=False, new_links=pl, links_to_remove=[], writer_name=writer_name)
+		for k, v in pl.items():
+			dict.__setitem__(self, k, v)
+	def add_links(self, type, targets, writer_name=None):
+		if not hasattr(targets, '__iter__'): targets=[targets]
+		self.iu._set_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
+	def remove_links(self, type, targets, writer_name=None):
+		if not hasattr(targets, '__iter__'): targets=[targets]
+		self.iu._set_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
+	def modify_links(self, add, remove, writer_name=None):
+		self.iu._set_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name)
+	def set_links(self, links, writer_name=None):
+		self.iu._set_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name)
+	def get_links(self, type):
+		return set(self.iu._get_links())
+
 class Payload(dict):
-	def __init__(self, iu, writer_name=None, new_payload=None):
+	def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False):
 		pl = {} if new_payload is None else new_payload
 		self.iu = iu
-		self.iu._set_payload(payload=self, is_delta=False, new_items=pl, keys_to_remove=[], writer_name=writer_name)
+		# NOTE omit_init_update_message is necessary to prevent checking for
+		#   exceptions and sending updates in the case where we just receive
+		#   a whole new payload from the remote side and overwrite it locally.
+		if (not omit_init_update_message) and (self.iu.buffer is not None):
+			self.iu._modify_payload(payload=self, is_delta=False, new_items=pl, keys_to_remove=[], writer_name=writer_name)
 		for k, v in pl.items():
 			dict.__setitem__(self, k, v)
 	def __setitem__(self, k, v, writer_name=None):
-		self.iu._set_payload(payload=self, is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name)
+		self.iu._modify_payload(payload=self, is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name)
 		result = dict.__setitem__(self, k, v)
 	def __delitem__(self, k, writer_name=None):
-		self.iu._set_payload(payload=self, is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name)
+		self.iu._modify_payload(payload=self, is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name)
 		result = dict.__delitem__(self, k)
+	def _remotely_enforced_setitem(self, k, v):
+		"""Sets an item when requested remotely."""
+		return dict.__setitem__(self, k, v)
+	def _remotely_enforced_delitem(self, k):
+		"""Deletes an item when requested remotely."""
+		return dict.__delitem__(self, k)
 
 
-class IUInterface(object):
+class IUInterface(object): #{{{
 	
 	"""Base class of all specialised IU classes."""
 	
@@ -146,6 +179,29 @@ class IUInterface(object):
 		self._read_only = read_only
 		self._buffer = None
 		# payload is not present here
+		self._links = collections.defaultdict(set)
+	
+	def _add_and_remove_links(self, add, remove):
+		for type in remove.keys(): self._links[type] -= remove[type]
+		for type in add.keys(): self._links[type] |= add[type]
+		
+	def add_links(self, type, targets, writer_name=None):
+		if not hasattr(targets, '__iter__'): targets=[targets]
+		self._modify_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
+		self._add_and_remove_links( add={type:targets}, remove={} )
+	def remove_links(self, type, targets, writer_name=None):
+		if not hasattr(targets, '__iter__'): targets=[targets]
+		self._modify_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
+		self._add_and_remove_links( add={}, remove={type:targets} )
+	def modify_links(self, add, remove, writer_name=None):
+		self._modify_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name)
+		self._add_and_remove_links( add=add, remove=remove )
+	def set_links(self, links, writer_name=None):
+		self._modify_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name)
+		self._links = {}
+		self._add_and_remove_links( add=new_links, remove={} )
+	def get_links(self, type):
+		return set(self._links[type])
 	
 	def _get_revision(self):
 		return self._revision
@@ -211,11 +267,27 @@ class IU(IUInterface):#{{{
 		self._revision = 1
 		self._category = category
 		self._payload_type = _payload_type
+		self.revision_lock = threading.RLock()
 		self._payload = Payload(iu=self)
-		self.revision_lock = threading.Lock()
 	
-	def _set_payload(payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
-		"""Set an item from this payload locally and send update."""
+	def _modify_links(self, links, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
+		if self.committed:
+			raise IUCommittedError(self)
+		with self.revision_lock:
+			# modify links locally
+			self._increase_revision_number()
+			if self.is_published:
+				# send update to remote holders
+				self.buffer._send_iu_link_update(
+						self,
+						revision=self.revision,
+						is_delta=is_delta,
+						new_links=new_links,
+						links_to_remove=links_to_remove,
+						writer_name=self.owner_name if writer_name is None else writer_name)
+	
+	def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
+		"""Modify the payload: add or remove items from this payload locally and send update."""
 		if self.committed:
 			raise IUCommittedError(self)
 		with self.revision_lock:
@@ -226,11 +298,10 @@ class IU(IUInterface):#{{{
 				self.buffer._send_iu_payload_update(
 						self,
 						revision=self.revision,
-						is_delta=True,
-						new_items={k:v},
-						keys_to_remove=[],
+						is_delta=is_delta,
+						new_items=new_items,
+						keys_to_remove=keys_to_remove,
 						writer_name=self.owner_name if writer_name is None else writer_name)
-			return result
 	
 	def _increase_revision_number(self):
 		self._revision += 1
@@ -315,9 +386,13 @@ class RemotePushIU(IUInterface):#{{{
 		self.owner_name = owner_name
 		self._payload_type = payload_type
 		self._committed = committed
-		self._payload = Payload(iu=self, new_payload=payload)
+		# NOTE Since the payload is an already-existant Payload which we didn't modify ourselves,
+		#  don't try to invoke any modification checks or network updates ourselves either.
+		#  We are just receiving it here and applying the new data.
+		self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True)
 	
-	def _set_payload(payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
+	def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
+		"""Modify the payload: add or remove item from this payload remotely and send update."""
 		if self.committed:
 			raise IUCommittedError(self)
 		if self.read_only:
@@ -388,7 +463,8 @@ class RemotePushIU(IUInterface):#{{{
 			raise IUUpdateFailedError(self)
 		else:
 			self._revision = new_revision
-			self._payload = Payload(iu=self, new_payload=new_pl)
+			# NOTE Please read the comment in the constructor
+			self._payload = Payload(iu=self, new_payload=new_pl, omit_init_update_message=True)
 	payload = property(
 			fget=_get_payload,
 			fset=_set_payload,
@@ -401,8 +477,8 @@ class RemotePushIU(IUInterface):#{{{
 			for k in update.keys_to_remove: self.payload._remotely_enforced_delitem(k)
 			for k, v in update.new_items.items(): self.payload._remotely_enforced_setitem(k, v)
 		else:
-			# using '_payload' to circumvent the local writing methods
-			self._payload = Payload(iu=self, new_payload=update.new_items)
+			# NOTE Please read the comment in the constructor
+			self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True)
 	
 	def _apply_commission(self):
 		"""Apply commission to the IU"""
@@ -410,7 +486,7 @@ class RemotePushIU(IUInterface):#{{{
 #}}}
 
 
-class IntConverter(rsb.transport.converter.Converter):#{{{
+class IntConverter(rsb.converter.Converter):#{{{
 	"""Convert Python int objects to Protobuf ints and vice versa."""
 	def __init__(self, wireSchema="int", dataType=int):
 		super(IntConverter, self).__init__(bytearray, dataType, wireSchema)
@@ -422,12 +498,12 @@ class IntConverter(rsb.transport.converter.Converter):#{{{
 	
 	def deserialize(self, byte_stream, ws):
 		pbo = ipaaca_pb2.IntMessage()
-		pbo.ParseFromString( byte_stream )
+		pbo.ParseFromString( str(byte_stream) )
 		return pbo.value
 #}}}
 
 
-class IUConverter(rsb.transport.converter.Converter):#{{{
+class IUConverter(rsb.converter.Converter):#{{{
 	'''
 	Converter class for Full IU representations
 	wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
@@ -454,7 +530,7 @@ class IUConverter(rsb.transport.converter.Converter):#{{{
 		type = self.getDataType()
 		if type == IU:
 			pbo = ipaaca_pb2.IU()
-			pbo.ParseFromString( byte_stream )
+			pbo.ParseFromString( str(byte_stream) )
 			if pbo.access_mode ==  ipaaca_pb2.IU.PUSH:
 				_payload = {}
 				for entry in pbo.payload:
@@ -478,6 +554,27 @@ class IUConverter(rsb.transport.converter.Converter):#{{{
 #}}}
 
 
+class IULinkUpdate(object):#{{{
+	
+	def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None):
+		super(IULinkUpdate, self).__init__()
+		self.uid = uid
+		self.revision = revision
+		self.writer_name = writer_name
+		self.is_delta = is_delta
+		self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links)
+		self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)
+	
+	def __str__(self):
+		s =  'PayloadUpdate(' + 'uid=' + self.uid + ', '
+		s += 'revision='+str(self.revision)+', '
+		s += 'writer_name='+str(self.writer_name)+', '
+		s += 'is_delta='+str(self.is_delta)+', '
+		s += 'new_links = '+str(self.new_links)+', '
+		s += 'links_to_remove = '+str(self.links_to_remove)+')'
+		return s
+#}}}
+
 class IUPayloadUpdate(object):#{{{
 	
 	def __init__(self, uid, revision, is_delta, writer_name="undef", new_items=None, keys_to_remove=None):
@@ -488,13 +585,6 @@ class IUPayloadUpdate(object):#{{{
 		self.is_delta = is_delta
 		self.new_items = {} if new_items is None else new_items
 		self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove
-	# @RAMIN: Does this still need to be fixed? I guess not.
-	# FIXME encode is required to use this class as an
-	# argument for a remote server call
-	
-	#def encode(self, encoding):
-	#	conv = IUPayloadUpdateConverter()
-	#	return conv.serialize(self)
 	
 	def __str__(self):
 		s =  'PayloadUpdate(' + 'uid=' + self.uid + ', '
@@ -506,8 +596,43 @@ class IUPayloadUpdate(object):#{{{
 		return s
 #}}}
 
+class IULinkUpdateConverter(rsb.converter.Converter):#{{{
+	def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate):
+		super(IULinkUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
+	
+	def serialize(self, iu_link_update):
+		pbo = ipaaca_pb2.IULinkUpdate()
+		pbo.uid = iu_link_update.uid
+		pbo.writer_name = iu_link_update.writer_name
+		pbo.revision = iu_link_update.revision
+		for type_ in iu_link_update.new_links.keys():
+			linkset = pbo.new_links.add()
+			linkset.type = type_
+			linkset.targets.extend(iu_link_update.new_links[type_])
+		for type_ in iu_link_update.links_to_remove.keys():
+			linkset = pbo.links_to_remove.add()
+			linkset.type = type_
+			linkset.targets.extend(iu_link_update.links_to_remove[type_])
+		pbo.is_delta = iu_link_update.is_delta
+		return bytearray(pbo.SerializeToString()), self.wireSchema
+	
+	def deserialize(self, byte_stream, ws):
+		type = self.getDataType()
+		if type == IULinkUpdate:
+			pbo = ipaaca_pb2.IULinkUpdate()
+			pbo.ParseFromString( str(byte_stream) )
+			logger.debug('received an IULinkUpdate for revision '+str(pbo.revision))
+			iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
+			for entry in pbo.new_links:
+				iu_link_up.new_links[str(entry.type)] = set(entry.targets)
+			for entry in pbo.links_to_remove:
+				iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets)
+			return iu_link_up
+		else:
+			raise ValueError("Inacceptable dataType %s" % type)
+#}}}
 
-class IUPayloadUpdateConverter(rsb.transport.converter.Converter):#{{{
+class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{
 	def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate):
 		super(IUPayloadUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
 	
@@ -527,7 +652,7 @@ class IUPayloadUpdateConverter(rsb.transport.converter.Converter):#{{{
 		type = self.getDataType()
 		if type == IUPayloadUpdate:
 			pbo = ipaaca_pb2.IUPayloadUpdate()
-			pbo.ParseFromString( byte_stream )
+			pbo.ParseFromString( str(byte_stream) )
 			logger.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
 			iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
 			for entry in pbo.new_items:
@@ -696,7 +821,8 @@ class InputBuffer(Buffer):
 		Keyword arguments:
 		event -- a converted RSB event
 		'''
-		if type(event.data) is RemotePushIU:
+		type_ = type(event.data)
+		if type_ is RemotePushIU:
 			# a new IU
 			if event.data.uid in self._iu_store:
 				# already in our store
@@ -714,17 +840,24 @@ class InputBuffer(Buffer):
 				# TODO: we should request the IU's owner to send us the IU
 				logger.warning("Update message for IU which we did not fully receive before.")
 				return
-			if type(event.data) is ipaaca_pb2.IUCommission:
+			if type_ is ipaaca_pb2.IUCommission:
 				# IU commit
 				iu = self._iu_store[event.data.uid]
 				iu._apply_commission()
 				iu._revision = event.data.revision
 				self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
-			elif type(event.data) is IUPayloadUpdate:
+			elif type_ is IUPayloadUpdate:
 				# IU payload update
 				iu = self._iu_store[event.data.uid]
 				iu._apply_update(event.data)
 				self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
+			elif type_ is IULinkUpdate:
+				# IU link update
+				iu = self._iu_store[event.data.uid]
+				iu._apply_link_update(event.data)
+				self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category)
+			else:
+				logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
 
 
 class OutputBuffer(Buffer):
@@ -748,6 +881,17 @@ class OutputBuffer(Buffer):
 		self.__iu_id_counter_lock = threading.Lock()
 		self.__iu_id_counter = 0
 	
+	def _create_own_name_listener(self, iu_category):
+		# FIXME replace this
+		'''Create an own name listener.'''
+		#if iu_category in self._listener_store: return self._informer_store[iu_category]
+		#cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
+		#cat_listener.addHandler(self._handle_iu_events)
+		#self._listener_store[iu_category] = cat_listener
+		#self._category_interests.append(iu_category)
+		#logger.info("Added category listener for "+iu_category)
+		#return cat_listener
+	
 	def _generate_iu_uid(self):
 		'''Generate a unique IU id of the form'''
 		with self.__iu_id_counter_lock:
@@ -761,19 +905,21 @@ class OutputBuffer(Buffer):
 			logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
 			return 0
 		iu = self._iu_store[update.uid]
-		if (update.revision != 0) and (update.revision != iu.revision):
-			# (0 means "do not pay attention to the revision number" -> "force update")
-			logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
-			return 0
-		if update.is_delta:
-			for k in update.keys_to_remove:
-				iu.payload.__delitem__(k, writer_name=update.writer_name)
-			for k,v in update.new_items.items():
-				iu.payload.__setitem__(k, v, writer_name=update.writer_name)
-		else:
-			iu._set_payload(update.new_items, writer_name=update.writer_name)
-		self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category)
-		return iu.revision
+		with iu.revision_lock:
+			if (update.revision != 0) and (update.revision != iu.revision):
+				# (0 means "do not pay attention to the revision number" -> "force update")
+				logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
+				return 0
+			if update.is_delta:
+				for k in update.keys_to_remove:
+					iu.payload.__delitem__(k, writer_name=update.writer_name)
+				for k,v in update.new_items.items():
+					iu.payload.__setitem__(k, v, writer_name=update.writer_name)
+			else:
+				iu._set_payload(update.new_items, writer_name=update.writer_name)
+				# _set_payload etc. have also incremented the revision number
+			self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category)
+			return iu.revision
 	
 	def _remote_commit(self, iu_commission):
 		'''Apply a remotely requested commit to one of the stored IUs.'''
@@ -781,16 +927,17 @@ class OutputBuffer(Buffer):
 			logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
 			return 0
 		iu = self._iu_store[iu_commission.uid]
-		if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
-			# (0 means "do not pay attention to the revision number" -> "force update")
-			logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
-			return 0
-		if iu.committed:
-			return 0
-		else:
-			iu._internal_commit(writer_name=iu_commission.writer_name)
-			self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category)
-			return iu.revision
+		with iu.revision_lock:
+			if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
+				# (0 means "do not pay attention to the revision number" -> "force update")
+				logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
+				return 0
+			if iu.committed:
+				return 0
+			else:
+				iu._internal_commit(writer_name=iu_commission.writer_name)
+				self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category)
+				return iu.revision
 	
 	def _get_informer(self, iu_category):
 		'''Return (or create, store and return) an informer object for IUs of the specified category.'''
@@ -837,13 +984,39 @@ class OutputBuffer(Buffer):
 		informer = self._get_informer(iu._category)
 		informer.publishData(iu_commission)
 	
+	def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"):
+		'''Send an IU link update.
+		
+		Keyword arguments:
+		iu -- the IU being updated
+		is_delta -- whether this is an incremental update or a replacement
+			the whole link dictionary
+		revision -- the new revision number
+		new_links -- a dictionary of new link sets
+		links_to_remove -- a dict of the link sets that shall be removed
+		writer_name -- name of the Buffer that initiated this update, necessary
+			to enable remote components to filter out updates that originate d
+			from their own operations
+		'''
+		if new_links is None:
+			new_links = {}
+		if links_to_remove is None:
+			links_to_remove = {}
+		link_update = IULinkUpdate(iu._uid, is_delta=is_delta, revision=revision)
+		link_update.new_links = new_links
+		if is_delta:
+			link_update.links_to_remove = links_to_remove
+		link_update.writer_name = writer_name
+		informer = self._get_informer(iu._category)
+		informer.publishData(link_update)
+		# FIXME send the notification to the target, if the target is not the writer_name
+	
 	def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
 		'''Send an IU payload update.
 		
 		Keyword arguments:
 		iu -- the IU being updated
-		is_delta -- whether the update concerns only a single payload item or
-			the whole payload dictionary
+		is_delta -- whether this is an incremental update or a replacement
 		revision -- the new revision number
 		new_items -- a dictionary of new payload items
 		keys_to_remove -- a list of the keys that shall be removed from the
@@ -869,18 +1042,24 @@ class OutputBuffer(Buffer):
 
 
 def initialize_ipaaca_rsb():#{{{
-	rsb.transport.converter.registerGlobalConverter(
+	rsb.converter.registerGlobalConverter(
 		IntConverter(wireSchema="int32", dataType=int))
-	rsb.transport.converter.registerGlobalConverter(
+	rsb.converter.registerGlobalConverter(
 		IUConverter(wireSchema="ipaaca-iu", dataType=IU))
-	rsb.transport.converter.registerGlobalConverter(
+	rsb.converter.registerGlobalConverter(
+		IULinkUpdateConverter(
+			wireSchema="ipaaca-iu-link-update",
+			dataType=IULinkUpdate))
+	rsb.converter.registerGlobalConverter(
 		IUPayloadUpdateConverter(
 			wireSchema="ipaaca-iu-payload-update",
 			dataType=IUPayloadUpdate))
-	rsb.transport.converter.registerGlobalConverter(
-		rsb.transport.converter.ProtocolBufferConverter(
+	rsb.converter.registerGlobalConverter(
+		rsb.converter.ProtocolBufferConverter(
 			messageClass=ipaaca_pb2.IUCommission))
-	rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
+	#rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
+	#t = rsb.ParticipantConfig.Transport('spread', {'enabled':'true'})
+	rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromFile('rsb.cfg')
 #}}}
 
 
diff --git a/python/src/listener.py b/python/src/listener.py
new file mode 100755
index 0000000..dde63b4
--- /dev/null
+++ b/python/src/listener.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+
+import time
+import logging
+import ipaaca
+
+iu_to_write = None
+
+def my_update_handler(iu, event_type, local):
+	global iu_to_write
+	print(event_type+': '+str(iu))
+	iu_to_write = iu
+
+
+
+ib = ipaaca.InputBuffer('CoolReceiver', ['undef'])
+ib.register_handler(my_update_handler)
+
+counter = 0
+#time.sleep(5)
+while True:
+	if iu_to_write is not None:
+		try:
+			counter += 1
+			iu = iu_to_write
+			#if counter == 1:
+			#	iu.payload['a'] = 'remote'
+			if counter % 3 == 1:
+				iu.payload['a'] = 'REMOTELY SET '+str(counter)
+			elif counter % 3 == 2:
+				del iu.payload['a']
+			else:
+				iu.payload = {'a': 'reset'}
+			
+		except ipaaca.IUUpdateFailedError, e:
+			ipaaca.logger.warning("Payload update failed (IU changed in the mean time)")
+	time.sleep(0.1)
+
+exit(0)
diff --git a/python/src/rsb.cfg b/python/src/rsb.cfg
new file mode 100644
index 0000000..c7d8582
--- /dev/null
+++ b/python/src/rsb.cfg
@@ -0,0 +1,5 @@
+[transport.spread]
+host = localhost # default type is string
+port = 4803 # types can be specified in angle brackets
+enabled = true
+
-- 
GitLab