Skip to content
Snippets Groups Projects 41.4 KiB
Newer Older
  • Learn to ignore specific revisions
  • #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    from __future__ import print_function, division
    import logging
    import sys
    import threading
    import uuid
    # IDEAS
    #  We should think about relaying the update event (or at least the
    #  affected keys in the payload / links) to the event handlers!
    #  Output buffers could generate UIDs for IUs on request, without
    #  publishing them at that time. The UID could then be used
    #  for internal links etc. The IU may be published later through
    #  the same buffer that allocated the UID.
    __all__ = [
    	'InputBuffer', 'OutputBuffer',
    	'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError',
    ## --- Utilities -------------------------------------------------------------
    def enum(*sequential, **named):
    	"""Create an enum type.

    	Positional names are numbered 0, 1, 2, ... in the order given; keyword
    	arguments become members with the explicitly supplied values.

    	Based on a suggestion by Alec Thomas.

    	Returns a new class named 'Enum' whose class attributes are the members.
    	"""
    	enums = dict(zip(sequential, range(len(sequential))), **named)
    	return type('Enum', (), enums)
    def pack_typed_payload_item(protobuf_object, key, value):
    	"""Write one payload entry into a protobuf payload item.

    	Key and value are both converted with str(); the item's type tag is
    	always set to 'str'.
    	"""
    	for field, content in (('key', key), ('value', value)):
    		setattr(protobuf_object, field, str(content))
    	# TODO: more types
    	protobuf_object.type = 'str'
    def unpack_typed_payload_item(protobuf_object):
    	"""Return the (key, value) pair held by a protobuf payload item.

    	The key is returned unchanged, the value is converted with str().
    	"""
    	# TODO: more types
    	item_key = protobuf_object.key
    	item_value = str(protobuf_object.value)
    	return (item_key, item_value)
    class IpaacaLoggingHandler(logging.Handler):
    	"""Logging handler that prints records to stdout with an '[ipaaca]' prefix."""
    	def __init__(self, level=logging.DEBUG):
    		"""Create the handler; `level` is passed on to logging.Handler."""
    		logging.Handler.__init__(self, level)
    	def emit(self, record):
    		"""Print '[ipaaca] (LEVELNAME) message' for the given record.

    		The message is record.msg formatted str.format-style with
    		record.args.
    		"""
    		meta = '[ipaaca] (' + str(record.levelname) + ') '
    		template = str(record.msg)
    		args = record.args
    		if not args:
    			# Nothing to substitute: print verbatim so that messages with
    			# literal braces do not raise inside str.format().
    			msg = template
    		elif isinstance(args, dict):
    			msg = template.format(**args)
    		else:
    			# Unpack so that '{0}' refers to the first argument instead of
    			# the whole args tuple.
    			msg = template.format(*args)
    		print(meta + msg)
    ## --- Global Definitions ----------------------------------------------------
    IUEventType = enum(
    	ADDED = 'ADDED',
    IUAccessMode = enum(
    ## --- Errors and Exceptions -------------------------------------------------
    class IUPublishedError(Exception):
    	"""Raised when publishing fails because the IU is already in the buffer."""
    	def __init__(self, iu):
    		# The message identifies the offending IU by its uid.
    		message = ''.join(('IU ', str(iu.uid), ' is already present in the output buffer.'))
    		super(IUPublishedError, self).__init__(message)
    class IUUpdateFailedError(Exception):
    	"""Raised when a remote IU update did not succeed."""
    	def __init__(self, iu):
    		# The message identifies the affected IU by its uid.
    		message = ''.join(('Remote update failed for IU ', str(iu.uid), '.'))
    		super(IUUpdateFailedError, self).__init__(message)
    class IUCommittedError(Exception):
    	"""Raised on a write to an IU that is immutable because it was committed."""
    	def __init__(self, iu):
    		# The message identifies the affected IU by its uid.
    		message = ''.join(('Writing to IU ', str(iu.uid), ' failed -- it has been committed to.'))
    		super(IUCommittedError, self).__init__(message)
    class IUReadOnlyError(Exception):
    	"""Raised on a write to an IU that is immutable because it is 'read only'."""
    	def __init__(self, iu):
    		# The message identifies the affected IU by its uid.
    		message = ''.join(('Writing to IU ', str(iu.uid), ' failed -- it is read-only.'))
    		super(IUReadOnlyError, self).__init__(message)
    ## --- Generation Architecture -----------------------------------------------
    #class Links(object):
    #	''' This is essentially a dict STR -> set([STR, ...]) '''
    #	def __init__(self, iu, writer_name=None, new_links=None):
    #		nl = {} if new_links is None else new_links
    #		self.iu = iu
    #		self.iu._set_links(links=self, is_delta=False, new_links=pl, links_to_remove=[], writer_name=writer_name)
    #		for k, v in pl.items():
    #			dict.__setitem__(self, k, v)
    #	def add_links(self, type, targets, writer_name=None):
    #		if not hasattr(targets, '__iter__'): targets=[targets]
    #		self.iu._set_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
    #	def remove_links(self, type, targets, writer_name=None):
    #		if not hasattr(targets, '__iter__'): targets=[targets]
    #		self.iu._set_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
    #	def modify_links(self, add, remove, writer_name=None):
    #		self.iu._set_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name)
    #	def set_links(self, links, writer_name=None):
    #		self.iu._set_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name)
    #	#def get_links(self, type):
    #	#	return set(self.iu._get_links())
    	def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False):
    		pl = {} if new_payload is None else new_payload
    		self.iu = iu
    		# NOTE omit_init_update_message is necessary to prevent checking for
    		#   exceptions and sending updates in the case where we just receive
    		#   a whole new payload from the remote side and overwrite it locally.
    		if (not omit_init_update_message) and (self.iu.buffer is not None):
    			self.iu._modify_payload(payload=self, is_delta=False, new_items=pl, keys_to_remove=[], writer_name=writer_name)
    		for k, v in pl.items():
    			dict.__setitem__(self, k, v)
    	def __setitem__(self, k, v, writer_name=None):
    		self.iu._modify_payload(payload=self, is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name)
    		result = dict.__setitem__(self, k, v)
    	def __delitem__(self, k, writer_name=None):
    		self.iu._modify_payload(payload=self, is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name)
    		result = dict.__delitem__(self, k)
    	def _remotely_enforced_setitem(self, k, v):
    		"""Sets an item when requested remotely."""
    		return dict.__setitem__(self, k, v)
    	def _remotely_enforced_delitem(self, k):
    		"""Deletes an item when requested remotely."""
    		return dict.__delitem__(self, k)
    	"""Base class of all specialised IU classes."""
    	def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False):
    		"""Creates an IU.

    		Keyword arguments:
    		uid -- unique ID of this IU
    		access_mode -- access mode of this IU
    		read_only -- flag indicating whether this IU is read_only or not
    		"""
    		self._uid = uid
    		self._revision = None
    		self._category = None
    		self._owner_name = None
    		self._committed = False
    		self._access_mode = access_mode
    		self._read_only = read_only
    		self._buffer = None
    		# payload is not present here
    	def __str__(self):
    		s = str(self.__class__)+"{ "
    		s += "uid="+self._uid+" "
    		s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") "
    		s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " "
    		s += "payload={ "
    		for k,v in self.payload.items():
    			s += k+":'"+v+"', "
    		s += "} "
    		s += "links={ "
    		for t,ids in self.get_all_links().items():
    			s += t+":'"+str(ids)+"', "
    		s += "} "
    		s += "}"
    		return s
    	def _add_and_remove_links(self, add, remove):
    		'''Just add and remove the new links in our links set, do not send an update here'''
    		'''Note: Also used for remotely enforced links updates.'''
    		for type in remove.keys(): self._links[type] -= set(remove[type])
    		for type in add.keys(): self._links[type] |= set(add[type])
    	def _replace_links(self, links):
    		'''Just wipe and replace our links set, do not send an update here'''
    		'''Note: Also used for remotely enforced links updates.'''
    		self._links = {}
    		for type in links.keys(): self._links[type] |= set(links[type])
    	def add_links(self, type, targets, writer_name=None):
    		'''Attempt to add links if the conditions are met
    		and send an update message. Then call the local setter.

    		type -- link type
    		targets -- a single target id or an iterable of target ids
    		writer_name -- passed through to _modify_links()
    		'''
    		# A lone string is one target. On Python 3 str has __iter__, so the
    		# hasattr test alone would let it through and it would later be
    		# split into single characters by set().
    		if isinstance(targets, str) or not hasattr(targets, '__iter__'):
    			targets = [targets]
    		self._modify_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
    		self._add_and_remove_links( add={type:targets}, remove={} )
    	def remove_links(self, type, targets, writer_name=None):
    		'''Attempt to remove links if the conditions are met
    		and send an update message. Then call the local setter.

    		type -- link type
    		targets -- a single target id or an iterable of target ids
    		writer_name -- passed through to _modify_links()
    		'''
    		# A lone string is one target. On Python 3 str has __iter__, so the
    		# hasattr test alone would let it through and it would later be
    		# split into single characters by set().
    		if isinstance(targets, str) or not hasattr(targets, '__iter__'):
    			targets = [targets]
    		self._modify_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
    		self._add_and_remove_links( add={}, remove={type:targets} )
    	def modify_links(self, add, remove, writer_name=None):
    		'''Add and remove links in one step.

    		The change is first passed to _modify_links() as a delta update and
    		afterwards applied to the local link sets.
    		'''
    		update_arguments = dict(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name)
    		self._modify_links(**update_arguments)
    		self._add_and_remove_links(add=add, remove=remove)
    	def set_links(self, links, writer_name=None):
    		'''Attempt to set (replace) links if the conditions are met
    		and send an update message. Then call the local setter.

    		links -- mapping of link type to target ids that replaces all links
    		writer_name -- passed through to _modify_links()
    		'''
    		self._modify_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name)
    		# Pass the `links` argument; the name `new_links` does not exist in
    		# this scope and raised NameError.
    		self._replace_links( links=links )
    	def get_links(self, type):
    		"""Return a new set holding the targets linked under `type`."""
    		stored_targets = self._links[type]
    		return set(stored_targets)
    	def get_all_links(self):
    		"""Return a deep copy of the complete link mapping."""
    		snapshot = copy.deepcopy(self._links)
    		return snapshot
    	def _get_revision(self):
    		"""Return the stored revision number."""
    		return self._revision
    	revision = property(fget=_get_revision, doc='Revision number of the IU.')
    	def _get_category(self):
    		"""Return the stored category."""
    		return self._category
    	category = property(fget=_get_category, doc='Category of the IU.')
    	def _get_payload_type(self):
    		"""Return the stored payload type."""
    		return self._payload_type
    	payload_type = property(fget=_get_payload_type, doc='Type of the IU payload')
    	def _get_committed(self):
    		"""Return the committed flag."""
    		return self._committed
    	# Without fget the property cannot be read (AttributeError), although
    	# `self.committed` is read by other methods in this file.
    	committed = property(
    			fget=_get_committed,
    			doc='Flag indicating whether this IU has been committed to.')
    	def _get_uid(self):
    		"""Return the stored uid."""
    		return self._uid
    	uid = property(fget=_get_uid, doc='Unique ID of the IU.')
    	def _get_access_mode(self):
    		"""Return the stored access mode."""
    		return self._access_mode
    	access_mode = property(fget=_get_access_mode, doc='Access mode of the IU.')
    	def _get_read_only(self):
    		"""Return the read-only flag."""
    		return self._read_only
    	read_only = property(
    			fget=_get_read_only,
    			doc='Flag indicating whether this IU is read only.')
    	def _get_buffer(self):
    		"""Return the buffer holding this IU (None if there is none)."""
    		return self._buffer
    	def _set_buffer(self, buffer):
    		"""Assign the buffer once; raises Exception if one is already set."""
    		if self._buffer is not None:
    			raise Exception('The IU is already in a buffer, cannot move it.')
    		self._buffer = buffer
    	buffer = property(
    			fget=_get_buffer,
    			fset=_set_buffer,
    			doc='Buffer this IU is held in.')
    	def _get_owner_name(self):
    		"""Return the owner's name (None if not yet set)."""
    		return self._owner_name
    	def _set_owner_name(self, owner_name):
    		"""Assign the owner name once; raises Exception if already set."""
    		if self._owner_name is not None:
    			raise Exception('The IU already has an owner name, cannot change it.')
    		self._owner_name = owner_name
    	owner_name = property(
    			fget=_get_owner_name,
    			fset=_set_owner_name,
    			doc="The IU's owner's name.")
    class IU(IUInterface):#{{{
    	"""A local IU."""
    	def __init__(self, access_mode=IUAccessMode.PUSH, read_only=False, category='undef', _payload_type='MAP'):
    		"""Create a local IU.

    		The base class is initialised with uid=None; afterwards the payload
    		type and category are stored and the revision is set to 1.
    		"""
    		super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only)
    		self._payload_type = _payload_type
    		self._category = category
    		self._revision = 1
    	# Link-modification hook of the local IU: raises IUCommittedError when the
    	# IU is committed, otherwise works under revision_lock.
    	# NOTE(review): this copy of the method is incomplete -- the statements for
    	# "modify links locally" and the call that ends in the visible
    	# `writer_name=...` argument are missing. Restore them from the repository
    	# before changing this method.
    	# NOTE(review): new_links={} / links_to_remove={} are mutable defaults.
    	def _modify_links(self, links, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
    		if self.committed:
    			raise IUCommittedError(self)
    		with self.revision_lock:
    			# modify links locally
    			if self.is_published:
    				# send update to remote holders
    						writer_name=self.owner_name if writer_name is None else writer_name)
    	# NOTE(review): this copy of the method is incomplete -- the statements for
    	# "set item locally" and the call that ends in the visible `writer_name=...`
    	# argument are missing. Restore them from the repository before changing
    	# this method.
    	# NOTE(review): new_items={} / keys_to_remove=[] are mutable defaults.
    	def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
    		"""Modify the payload: add or remove items from this payload locally and send update.

    		Raises IUCommittedError if the IU has been committed to.
    		"""
    		if self.committed:
    			raise IUCommittedError(self)
    		with self.revision_lock:
    			# set item locally
    			if self.is_published:
    				# send update to remote holders
    						writer_name=self.owner_name if writer_name is None else writer_name)
    	def _increase_revision_number(self):
    		self._revision += 1
    	def _internal_commit(self, writer_name=None):
    		if self.committed:
    			raise IUCommittedError(self)
    		with self.revision_lock:
    			if not self._committed:
    				self._committed = True
    				self.buffer._send_iu_commission(self, writer_name=writer_name)
    	def commit(self):
    		"""Commit to this IU by delegating to _internal_commit()."""
    		outcome = self._internal_commit()
    		return outcome
    	def _get_payload(self):
    		return self._payload
    	# Setter for the whole payload: raises IUCommittedError when the IU is
    	# committed, otherwise works under revision_lock.
    	# NOTE(review): this copy of the method is incomplete -- the statement that
    	# the visible `writer_name=...` argument belongs to is missing. Restore it
    	# from the repository before changing this method.
    	def _set_payload(self, new_pl, writer_name=None):
    		if self.committed:
    			raise IUCommittedError(self)
    		with self.revision_lock:
    					writer_name=None if self.buffer is None else (self.buffer.unique_name if writer_name is None else writer_name),
    	# Wire in the accessors defined above; a property without fget/fset can
    	# neither be read nor assigned.
    	payload = property(
    			fget=_get_payload,
    			fset=_set_payload,
    			doc='Payload dictionary of this IU.')
    	def _get_is_published(self):
    		return self.buffer is not None
    	is_published = property(
    			doc='Flag indicating whether this IU has been published or not.')
    	def _set_buffer(self, buffer):
    		if self._buffer is not None:
    			raise Exception('The IU is already in a buffer, cannot move it.')
    		self._buffer = buffer
    		self.owner_name = buffer.unique_name
    		self._payload.owner_name = buffer.unique_name
    	buffer = property(
    			doc='Buffer this IU is held in.')
    	def _set_uid(self, uid):
    		if self._uid is not None:
    			raise AttributeError('The uid of IU ' + self.uid + ' has already been set, cannot change it.')
    		self._uid = uid
    	uid = property(
    			doc='Unique ID of theIU.')
    class RemotePushIU(IUInterface):#{{{
    	"""A remote IU with access mode 'PUSH'."""
    	def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload):
    		"""Create the local representation of a remotely owned PUSH IU.

    		All arguments describe the state received from the remote side;
    		`payload` is a mapping used to fill the new Payload object.
    		"""
    		super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
    		self._revision = revision
    		self._category = category
    		self.owner_name = owner_name
    		self._payload_type = payload_type
    		# Honour the received committed state; the argument was ignored before,
    		# leaving every remote IU uncommitted locally.
    		self._committed = committed
    		# NOTE Since the payload is an already-existant Payload which we didn't modify ourselves,
    		#  don't try to invoke any modification checks or network updates ourselves either.
    		#  We are just receiving it here and applying the new data.
    		self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True)
    	# NOTE(review): this copy of the method is incomplete -- the arguments of the
    	# IULinkUpdate(...) call are missing, and the revision assignment sits under
    	# the failure branch (an `else:` line appears to be missing). Restore the
    	# method from the repository before changing it.
    	# NOTE(review): new_links={} / links_to_remove={} are mutable defaults.
    	def _modify_links(self, links, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
    		"""Modify the links: request the change from the remote server.

    		Raises IUCommittedError / IUReadOnlyError if the IU may not be written,
    		and IUUpdateFailedError if the server answers with revision 0.
    		"""
    		if self.committed:
    			raise IUCommittedError(self)
    		if self.read_only:
    			raise IUReadOnlyError(self)
    		requested_update = IULinkUpdate(
    		remote_server = self.buffer._get_remote_server(self)
    		new_revision = remote_server.updateLinks(requested_update)
    		if new_revision == 0:
    			raise IUUpdateFailedError(self)
    			self._revision = new_revision
    	# NOTE(review): this copy of the method is incomplete -- the arguments of the
    	# IUPayloadUpdate(...) call are missing, and the revision assignment sits
    	# under the failure branch (an `else:` line appears to be missing). Restore
    	# the method from the repository before changing it.
    	# NOTE(review): new_items={} / keys_to_remove=[] are mutable defaults.
    	def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
    		"""Modify the payload: request the change from the remote server.

    		Raises IUCommittedError / IUReadOnlyError if the IU may not be written,
    		and IUUpdateFailedError if the server answers with revision 0.
    		"""
    		if self.committed:
    			raise IUCommittedError(self)
    		if self.read_only:
    			raise IUReadOnlyError(self)
    		requested_update = IUPayloadUpdate(
    		remote_server = self.buffer._get_remote_server(self)
    		new_revision = remote_server.updatePayload(requested_update)
    		if new_revision == 0:
    			raise IUUpdateFailedError(self)
    			self._revision = new_revision
    	def commit(self):
    		"""Commit to this IU.

    		Sends a commission request to the remote server that owns the IU and,
    		on success, stores the new revision and marks the IU as committed.
    		Commit requests on an already committed IU are ignored.

    		Raises IUReadOnlyError if the IU is read-only and IUUpdateFailedError
    		if the server answers with revision 0.
    		"""
    		if self.read_only:
    			raise IUReadOnlyError(self)
    		if self._committed:
    			# ignore commit requests when already committed
    			return
    		commission_request = ipaaca_pb2.IUCommission()
    		commission_request.uid = self.uid
    		commission_request.revision = self.revision
    		commission_request.writer_name = self.buffer.unique_name
    		remote_server = self.buffer._get_remote_server(self)
    		new_revision = remote_server.commit(commission_request)
    		if new_revision == 0:
    			raise IUUpdateFailedError(self)
    		# Success: take over the server's revision and remember the commit.
    		self._revision = new_revision
    		self._committed = True
    	def _get_payload(self):
    		return self._payload
    	# Setter for the whole payload of a remote IU: checks committed/read-only
    	# and asks the remote server to apply the update.
    	# NOTE(review): this copy of the method is incomplete -- the arguments of the
    	# IUPayloadUpdate(...) call are missing, and the success path sits under the
    	# failure branch (an `else:` line appears to be missing). Restore the method
    	# from the repository before changing it.
    	def _set_payload(self, new_pl):
    		if self.committed:
    			raise IUCommittedError(self)
    		if self.read_only:
    			raise IUReadOnlyError(self)
    		requested_update = IUPayloadUpdate(
    		remote_server = self.buffer._get_remote_server(self)
    		new_revision = remote_server.updatePayload(requested_update)
    		if new_revision == 0:
    			raise IUUpdateFailedError(self)
    			self._revision = new_revision
    			# NOTE Please read the comment in the constructor
    			self._payload = Payload(iu=self, new_payload=new_pl, omit_init_update_message=True)
    	# Wire in the accessors defined above; a property without fget/fset can
    	# neither be read nor assigned, yet `self.payload` is read by _apply_update().
    	payload = property(
    			fget=_get_payload,
    			fset=_set_payload,
    			doc='Payload dictionary of the IU.')
    	def _apply_link_update(self, update):
    		"""Apply a IULinkUpdate to the IU."""
    		self._revision = update.revision
    		if update.is_delta:
    			self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove)
    	def _apply_update(self, update):
    		"""Apply a IUPayloadUpdate to the IU."""
    		self._revision = update.revision
    		if update.is_delta:
    			for k in update.keys_to_remove: self.payload._remotely_enforced_delitem(k)
    			for k, v in update.new_items.items(): self.payload._remotely_enforced_setitem(k, v)
    			# NOTE Please read the comment in the constructor
    			self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True)
    	def _apply_commission(self):
    		"""Apply commission to the IU"""
    		self._committed = True
    class IntConverter(rsb.converter.Converter):#{{{
    	"""Convert Python int objects to Protobuf ints and vice versa."""
    	def __init__(self, wireSchema="int", dataType=int):
    		"""Register the converter for bytearray wire data."""
    		super(IntConverter, self).__init__(bytearray, dataType, wireSchema)
    	def serialize(self, value):
    		"""Return (bytearray, wire schema) for the int `value`."""
    		pbo = ipaaca_pb2.IntMessage()
    		pbo.value = value
    		return bytearray(pbo.SerializeToString()), self.wireSchema
    	def deserialize(self, byte_stream, ws):
    		"""Parse an IntMessage from `byte_stream` and return its value."""
    		pbo = ipaaca_pb2.IntMessage()
    		# The message was created but never filled or returned before, so
    		# every call yielded None. bytes() copies the bytearray into the
    		# immutable form the protobuf parser takes (same as str() on Python 2).
    		pbo.ParseFromString(bytes(byte_stream))
    		return pbo.value
    class IUConverter(rsb.converter.Converter):#{{{
    	Converter class for Full IU representations
    	wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
    	def __init__(self, wireSchema="ipaaca-iu", dataType=IU):
    		"""Register the converter with bytearray as its wire type."""
    		wire_type = bytearray
    		super(IUConverter, self).__init__(wire_type, dataType, wireSchema)
    	def serialize(self, iu):
    		"""Return (bytearray, wire schema) for a complete IU.

    		Copies the IU's scalar attributes and every payload item into an
    		ipaaca_pb2.IU message; the access mode is always written as PUSH.
    		"""
    		message = ipaaca_pb2.IU()
    		# Scalar fields map 1:1 onto the underscore-prefixed IU attributes.
    		for field in ('uid', 'revision', 'category', 'payload_type', 'owner_name', 'committed'):
    			setattr(message, field, getattr(iu, '_' + field))
    		message.access_mode = ipaaca_pb2.IU.PUSH # TODO
    		message.read_only = iu._read_only
    		for key, value in iu._payload.items():
    			pack_typed_payload_item(message.payload.add(), key, value)
    		wire_data = bytearray(message.SerializeToString())
    		return wire_data, self.wireSchema
    	# Builds a RemotePushIU from a received ipaaca_pb2.IU message.
    	# NOTE(review): this copy of the method is incomplete -- the parsing of
    	# byte_stream, several RemotePushIU(...) arguments with the closing
    	# parenthesis, and the `else:` lines in front of the two `raise` statements
    	# are missing. Restore the method from the repository before changing it.
    	def deserialize(self, byte_stream, ws):
    		type = self.getDataType()
    		if type == IU:
    			pbo = ipaaca_pb2.IU()
    			if pbo.access_mode ==  ipaaca_pb2.IU.PUSH:
    				_payload = {}
    				for entry in pbo.payload:
    					k, v = unpack_typed_payload_item(entry)
    					_payload[k] = v
    				remote_push_iu = RemotePushIU(
    						read_only = pbo.read_only,
    						owner_name = pbo.owner_name,
    						category = pbo.category,
    						committed = pbo.committed,
    				return remote_push_iu
    				raise Exception("We can only handle IUs with access mode 'PUSH' for now!")
    			raise ValueError("Inacceptable dataType %s" % type)
    class IULinkUpdate(object):#{{{
    	"""Description of a change to the links of the IU identified by `uid`."""
    	def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None):
    		"""Store the update; both link mappings become defaultdict(set)."""
    		super(IULinkUpdate, self).__init__()
    		self.uid, self.revision = uid, revision
    		self.writer_name, self.is_delta = writer_name, is_delta
    		if new_links is None:
    			self.new_links = collections.defaultdict(set)
    		else:
    			self.new_links = collections.defaultdict(set, new_links)
    		if links_to_remove is None:
    			self.links_to_remove = collections.defaultdict(set)
    		else:
    			self.links_to_remove = collections.defaultdict(set, links_to_remove)
    	def __str__(self):
    		"""Return a one-line dump of all fields."""
    		fields = (
    			'LinkUpdate(' + 'uid=' + self.uid,
    			'revision='+str(self.revision),
    			'writer_name='+str(self.writer_name),
    			'is_delta='+str(self.is_delta),
    			'new_links = '+str(self.new_links),
    			'links_to_remove = '+str(self.links_to_remove)+')',
    		)
    		return ', '.join(fields)
    class IUPayloadUpdate(object):#{{{
    	"""Description of a change to the payload of the IU identified by `uid`."""
    	def __init__(self, uid, revision, is_delta, writer_name="undef", new_items=None, keys_to_remove=None):
    		"""Store the update; missing collections default to {} and []."""
    		super(IUPayloadUpdate, self).__init__()
    		self.uid, self.revision = uid, revision
    		self.writer_name, self.is_delta = writer_name, is_delta
    		self.new_items = new_items if new_items is not None else {}
    		self.keys_to_remove = keys_to_remove if keys_to_remove is not None else []
    	def __str__(self):
    		"""Return a one-line dump of all fields."""
    		fields = (
    			'PayloadUpdate(' + 'uid=' + self.uid,
    			'revision='+str(self.revision),
    			'writer_name='+str(self.writer_name),
    			'is_delta='+str(self.is_delta),
    			'new_items = '+str(self.new_items),
    			'keys_to_remove = '+str(self.keys_to_remove)+')',
    		)
    		return ', '.join(fields)
    class IULinkUpdateConverter(rsb.converter.Converter):#{{{
    	"""Convert IULinkUpdate objects to their protobuf wire form and back."""
    	def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate):
    		"""Register the converter for bytearray wire data."""
    		super(IULinkUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
    	def serialize(self, iu_link_update):
    		"""Return (bytearray, wire schema) for an IULinkUpdate."""
    		pbo = ipaaca_pb2.IULinkUpdate()
    		pbo.uid = iu_link_update.uid
    		pbo.writer_name = iu_link_update.writer_name
    		pbo.revision = iu_link_update.revision
    		for type_ in iu_link_update.new_links.keys():
    			linkset = pbo.new_links.add()
    			linkset.type = type_
    			# The targets must be written too; deserialize() reads
    			# entry.targets, and without them every link set arrived empty.
    			linkset.targets.extend(iu_link_update.new_links[type_])
    		for type_ in iu_link_update.links_to_remove.keys():
    			linkset = pbo.links_to_remove.add()
    			linkset.type = type_
    			linkset.targets.extend(iu_link_update.links_to_remove[type_])
    		pbo.is_delta = iu_link_update.is_delta
    		return bytearray(pbo.SerializeToString()), self.wireSchema
    	def deserialize(self, byte_stream, ws):
    		"""Parse `byte_stream` and return the IULinkUpdate it encodes.

    		Raises ValueError if the converter was set up for another data type.
    		"""
    		data_type = self.getDataType()
    		if data_type != IULinkUpdate:
    			# Previously unreachable behind the return statement.
    			raise ValueError("Inacceptable dataType %s" % data_type)
    		pbo = ipaaca_pb2.IULinkUpdate()
    		# bytes() copies the bytearray into the immutable form the protobuf
    		# parser takes (identical to str() on Python 2).
    		pbo.ParseFromString( bytes(byte_stream) )
    		logger.debug('received an IULinkUpdate for revision '+str(pbo.revision))
    		iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
    		for entry in pbo.new_links:
    			iu_link_up.new_links[str(entry.type)] = set(entry.targets)
    		for entry in pbo.links_to_remove:
    			iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets)
    		return iu_link_up
    class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{
    	"""Convert IUPayloadUpdate objects to their protobuf wire form and back."""
    	def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate):
    		"""Register the converter for bytearray wire data."""
    		super(IUPayloadUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
    	def serialize(self, iu_payload_update):
    		"""Return (bytearray, wire schema) for an IUPayloadUpdate."""
    		pbo = ipaaca_pb2.IUPayloadUpdate()
    		pbo.uid = iu_payload_update.uid
    		pbo.writer_name = iu_payload_update.writer_name
    		pbo.revision = iu_payload_update.revision
    		for k,v in iu_payload_update.new_items.items():
    			entry = pbo.new_items.add()
    			pack_typed_payload_item(entry, k, v)
    		# deserialize() reads pbo.keys_to_remove, so removals must be written
    		# as well; they were lost on the wire before.
    		pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
    		pbo.is_delta = iu_payload_update.is_delta
    		return bytearray(pbo.SerializeToString()), self.wireSchema
    	def deserialize(self, byte_stream, ws):
    		"""Parse `byte_stream` and return the IUPayloadUpdate it encodes.

    		Raises ValueError if the converter was set up for another data type.
    		"""
    		data_type = self.getDataType()
    		if data_type != IUPayloadUpdate:
    			# Previously unreachable behind the return statement.
    			raise ValueError("Inacceptable dataType %s" % data_type)
    		pbo = ipaaca_pb2.IUPayloadUpdate()
    		# The received bytes were never parsed, so every update came out
    		# empty. bytes() copies the bytearray into the immutable form the
    		# protobuf parser takes (identical to str() on Python 2).
    		pbo.ParseFromString( bytes(byte_stream) )
    		logger.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
    		iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
    		for entry in pbo.new_items:
    			k, v = unpack_typed_payload_item(entry)
    			iu_up.new_items[k] = v
    		iu_up.keys_to_remove = pbo.keys_to_remove[:]
    		return iu_up
    class IUStore(dict):
    	"""Dictionary used by buffers to hold their IUs."""
    	def __init__(self):
    		# Always starts out empty; no constructor arguments are accepted.
    		dict.__init__(self)
    class IUEventHandler(object):
    	"""Wrapper for IU event handling functions."""
    	def __init__(self, handler_function, for_event_types=None, for_categories=None):
    		"""Create an IUEventHandler.

    		Keyword arguments:
    		handler_function -- the handler function with the signature
    			(IU, event_type, local)
    		for_event_types -- a list of event types or None if handler should
    			be called for all event types
    		for_categories -- a list of category names or None if handler should
    			be called for all categories
    		"""
    		super(IUEventHandler, self).__init__()
    		self._handler_function = handler_function
    		self._for_event_types = self._as_filter_list(for_event_types)
    		self._for_categories = self._as_filter_list(for_categories)
    	@staticmethod
    	def _as_filter_list(value):
    		"""Normalise a filter argument to None or a new list."""
    		if value is None:
    			return None
    		# A lone string is a single entry. On Python 3 str has __iter__, so
    		# it would otherwise be kept as a string and matched by substring.
    		if isinstance(value, str) or not hasattr(value, '__iter__'):
    			return [value]
    		# list() also copies iterables that do not support slicing (e.g. sets).
    		return list(value)
    	def condition_met(self, event_type, category):
    		"""Check whether this IUEventHandler should be called.

    		Keyword arguments:
    		event_type -- type of the IU event
    		category -- category of the IU which triggered the event
    		"""
    		type_condition_met = (self._for_event_types is None or event_type in self._for_event_types)
    		cat_condition_met = (self._for_categories is None or category in self._for_categories)
    		return type_condition_met and cat_condition_met
    	def call(self, buffer, iu_uid, local, event_type, category):
    		"""Call this IUEventHandler's function, if it applies.

    		Keyword arguments:
    		buffer -- the buffer in which the IU is stored
    		iu_uid -- the uid of the IU
    		local -- is the IU local or remote to this component? @RAMIN: Is this correct?
    		event_type -- IU event type
    		category -- category of the IU
    		"""
    		if self.condition_met(event_type, category):
    			iu = buffer._iu_store[iu_uid]
    			self._handler_function(iu, event_type, local)
    class Buffer(object):
    	"""Base class for InputBuffer and OutputBuffer."""
    	def __init__(self, owning_component_name, participant_config=None):
    		'''Create a Buffer.

    		Keyword arguments:
    		owning_component_name -- name of the entity that owns this Buffer
    		participant_config -- RSB configuration
    		'''
    		super(Buffer, self).__init__()
    		self._owning_component_name = owning_component_name
    		self._participant_config = participant_config #rsb.ParticipantConfig.fromDefaultSources() if participant_config is None else participant_config
    		self._uuid = str(uuid.uuid4())[0:8]
    		# Initialise with a temporary, but already unique, name
    		self._unique_name = "undef-"+self._uuid
    		self._iu_store = IUStore()
    		self._iu_event_handlers = []
    	def register_handler(self, handler_function, for_event_types=None, for_categories=None):
    		"""Register a new IU event handler function.

    		Keyword arguments:
    		handler_function -- a function with the signature (IU, event_type, local)
    		for_event_types -- a list of event types or None if handler should
    			be called for all event types
    		for_categories -- a list of category names or None if handler should
    			be called for all categories
    		"""
    		handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories)
    		# Keep the handler; it was created and then dropped before, so
    		# registered functions were never called.
    		self._iu_event_handlers.append(handler)
    	def call_iu_event_handlers(self, uid, local, event_type, category):
    		"""Call registered IU event handler functions registered for this event_type and category."""
    		for h in self._iu_event_handlers:
    			# Each handler decides itself (condition_met) whether it applies.
    			h.call(self, uid, local=local, event_type=event_type, category=category)
    	def _get_owning_component_name(self):
    		"""Return the name of this Buffer's owning component"""
    		return self._owning_component_name
    	owning_component_name = property(_get_owning_component_name)
    	def _get_unique_name(self):
    		"""Return the Buffer's unique name."""
    		return self._unique_name
    	unique_name = property(_get_unique_name)
    class InputBuffer(Buffer):
    	"""An InputBuffer that holds remote IUs."""
    	def __init__(self, owning_component_name, category_interests=None, participant_config=None):
    		'''Create an InputBuffer.

    		Keyword arguments:
    		owning_component_name -- name of the entity that owns this InputBuffer
    		category_interests -- list of IU categories this Buffer is interested in
    		participant_config -- RSB configuration
    		'''
    		super(InputBuffer, self).__init__(owning_component_name, participant_config)
    		self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB'
    		self._listener_store = {} # one per IU category
    		self._remote_server_store = {} # one per remote-IU-owning Component
    		self._category_interests = []
    		if category_interests is not None:
    			for cat in category_interests:
    				# The loop had no body. Creating the listener also records
    				# the category in self._category_interests.
    				# NOTE(review): confirm against the repository version.
    				self._create_category_listener_if_needed(cat)
    	def _get_remote_server(self, iu):
    		'''Return (or create, store and return) a remote server.'''
    		if iu.owner_name in self._remote_server_store:
    			return self._remote_server_store[iu.owner_name]
    		#  TODO remove the str() when unicode is supported (issue #490)
    		remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name)))
    		self._remote_server_store[iu.owner_name] = remote_server
    		return remote_server
    	def _create_category_listener_if_needed(self, iu_category):
    		'''Return (or create, store and return) a category listener.'''
    		if iu_category in self._listener_store: return self._informer_store[iu_category]
    		cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
    		self._listener_store[iu_category] = cat_listener
    		self._category_interests.append(iu_category)"Added category listener for "+iu_category)
    		return cat_listener
    	# NOTE(review): this copy of the method is badly damaged -- the docstring is
    	# not terminated, most expressions have lost their operands (e.g.
    	# `if in self._iu_store:`), and several branch headers are missing. Restore
    	# the method from the repository before changing it; nothing below can be
    	# relied upon as written.
    	def _handle_iu_events(self, event):
    		'''Dispatch incoming IU events.
    		Adds incoming IU's to the store, applies payload and commit updates to
    		IU, calls IU event handlers.'
    		Keyword arguments:
    		event -- a converted RSB event
    		type_ = type(
    		if type_ is RemotePushIU:
    			# a new IU
    			if in self._iu_store:
    				# already in our store
    				self._iu_store[ ] = = self
    				self.call_iu_event_handlers(, local=False, event_type=IUEventType.ADDED,
    			# an update to an existing IU
    			if == self.unique_name:
    				# Discard updates that originate from this buffer
    			if not in self._iu_store:
    				# TODO: we should request the IU's owner to send us the IU
    				logger.warning("Update message for IU which we did not fully receive before.")
    				# IU commit
    				iu = self._iu_store[]
    				iu._revision =
    				self.call_iu_event_handlers(, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
    				# IU payload update
    				iu = self._iu_store[]
    				self.call_iu_event_handlers(, local=False, event_type=IUEventType.UPDATED, category=iu.category)
    			elif type_ is IULinkUpdate:
    				# IU link update
    				iu = self._iu_store[]
    				self.call_iu_event_handlers(, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category)
    				logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
    class OutputBuffer(Buffer):
    	"""An OutputBuffer that holds local IUs."""
    	def __init__(self, owning_component_name, participant_config=None):
    		'''Create an Output Buffer.

    		Keyword arguments:
    		owning_component_name -- name of the entity that owns this buffer
    		participant_config -- RSB configuration

    		Creates an RSB server on the buffer's unique name and registers the
    		methods 'updateLinks', 'updatePayload' and 'commit' on it.
    		'''
    		super(OutputBuffer, self).__init__(owning_component_name, participant_config)
    		self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB'
    		self._server = rsb.createServer(rsb.Scope(self._unique_name))
    		self._server.addMethod('updateLinks', self._remote_update_links, IULinkUpdate, int)
    		self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int)
    		self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int)
    		self._informer_store = {}
    		self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-'
    		# Counter (and its lock) behind _generate_iu_uid().
    		self.__iu_id_counter_lock = threading.Lock()
    		self.__iu_id_counter = 0
    	def _create_own_name_listener(self, iu_category):
    		# FIXME replace this
    		'''Create an own name listener.'''
    		#if iu_category in self._listener_store: return self._informer_store[iu_category]
    		#cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
    		#self._listener_store[iu_category] = cat_listener
    		#self._category_interests.append(iu_category)"Added category listener for "+iu_category)
    		#return cat_listener
    	def _generate_iu_uid(self):
    		'''Generate a unique IU id of the form'''
    		with self.__iu_id_counter_lock:
    			self.__iu_id_counter += 1
    			number = self.__iu_id_counter
    		return self._id_prefix + str(number)
    	def _remote_update_links(self, update):
    		'''Apply a remotely requested update to one of the stored IU's links.

    		Keyword arguments:
    		update -- the link update request; its uid, revision, is_delta,
    			new_links, links_to_remove and writer_name fields are read

    		Returns the IU's revision after the update, or 0 if the request was
    		rejected (unknown IU, or a revision that does not match the stored one).
    		'''
    		if update.uid not in self._iu_store:
    			logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
    			return 0
    		iu = self._iu_store[update.uid]
    		# The revision check, the modification and the handler notification all
    		# happen while holding the IU's revision lock.
    		with iu.revision_lock:
    			if (update.revision != 0) and (update.revision != iu.revision):
    				# (0 means "do not pay attention to the revision number" -> "force update")
    				logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
    				return 0
    			if update.is_delta:
    				# incremental change: add and remove individual links
    				iu.modify_links(add=update.new_links, remove=update.links_to_remove, writer_name=update.writer_name)
    			else:
    				# full replacement; must not run after a delta, or it would
    				# overwrite the merged links with new_links alone
    				iu.set_links(links=update.new_links, writer_name=update.writer_name)
    			self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.LINKSUPDATED, category=iu.category)
    			return iu.revision
    	def _remote_update_payload(self, update):
    		'''Apply a remotely requested update to one of the stored IU's payload.

    		Keyword arguments:
    		update -- the payload update request; its uid, revision, is_delta,
    			keys_to_remove, new_items and writer_name fields are read

    		Returns the IU's revision after the update, or 0 if the request was
    		rejected (unknown IU, or a revision that does not match the stored one).
    		'''
    		if update.uid not in self._iu_store:
    			logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
    			return 0
    		iu = self._iu_store[update.uid]
    		# The revision check, the modification and the handler notification all
    		# happen while holding the IU's revision lock.
    		with iu.revision_lock:
    			if (update.revision != 0) and (update.revision != iu.revision):
    				# (0 means "do not pay attention to the revision number" -> "force update")
    				logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
    				return 0
    			if update.is_delta:
    				# incremental change: drop the listed keys, then set the new items.
    				# The dunder methods are called directly because the writer name
    				# has to be passed along as a keyword argument.
    				for k in update.keys_to_remove:
    					iu.payload.__delitem__(k, writer_name=update.writer_name)
    				for k,v in update.new_items.items():
    					iu.payload.__setitem__(k, v, writer_name=update.writer_name)
    			else:
    				# full replacement; must not run after a delta, or it would
    				# discard every key that is not part of new_items
    				iu._set_payload(update.new_items, writer_name=update.writer_name)
    			# _set_payload etc. have also incremented the revision number
    			self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category)
    			return iu.revision
    	def _remote_commit(self, iu_commission):
    		'''Apply a remotely requested commit to one of the stored IUs.'''
    		if iu_commission.uid not in self._iu_store:
    			logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
    			return 0
    		iu = self._iu_store[iu_commission.uid]
    		with iu.revision_lock:
    			if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
    				# (0 means "do not pay attention to the revision number" -> "force update")
    				logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
    				return 0
    			if iu.committed: