#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import print_function, division import logging import sys import threading import uuid import collections import rsb import rsb.converter import ipaaca_pb2 __all__ = [ 'IUEventType', 'IUAccessMode', 'InputBuffer', 'OutputBuffer', 'IU', 'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError', 'logger' ] ## --- Utilities ------------------------------------------------------------- def enum(*sequential, **named): """Create an enum type. Based on suggestion of Alec Thomas on stackoverflow.com: http://stackoverflow.com/questions/36932/ whats-the-best-way-to-implement-an-enum-in-python/1695250#1695250 """ enums = dict(zip(sequential, range(len(sequential))), **named) return type('Enum', (), enums) def pack_typed_payload_item(protobuf_object, key, value): protobuf_object.key = str(key) protobuf_object.value = str(value) protobuf_object.type = 'str' # TODO: more types def unpack_typed_payload_item(protobuf_object): # TODO: more types return (protobuf_object.key, str(protobuf_object.value)) class IpaacaLoggingHandler(logging.Handler): def __init__(self, level=logging.DEBUG): logging.Handler.__init__(self, level) def emit(self, record): meta = '[ipaaca] (' + str(record.levelname) + ') ' msg = str(record.msg.format(record.args)) print(meta + msg) ## --- Global Definitions ---------------------------------------------------- IUEventType = enum( ADDED = 'ADDED', COMMITTED = 'COMMITTED', DELETED = 'DELETED', RETRACTED = 'RETRACTED', UPDATED = 'UPDATED', LINKSUPDATED = 'LINKSUPDATED' ) IUAccessMode = enum( "PUSH", "REMOTE", "MESSAGE" ) ## --- Errors and Exceptions ------------------------------------------------- class IUPublishedError(Exception): """Error publishing of an IU failed since it is already in the buffer.""" def __init__(self, iu): super(IUPublishedError, self).__init__('IU ' + str(iu.uid) + ' is already present in the output buffer.') class IUUpdateFailedError(Exception): """Error indicating that a remote IU update failed.""" def __init__(self, iu): super(IUUpdateFailedError, self).__init__('Remote update failed for IU ' + str(iu.uid) + '.') class IUCommittedError(Exception): """Error indicating that an IU is immutable because it has been committed to.""" def __init__(self, iu): super(IUCommittedError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it has been committed to.') class IUReadOnlyError(Exception): """Error indicating that an IU is immutable because it is 'read only'.""" def __init__(self, iu): super(IUReadOnlyError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it is read-only.') ## --- Generation Architecture ----------------------------------------------- class Links(object): ''' This is essentially a dict STR -> set([STR, ...]) ''' def __init__(self, iu, writer_name=None, new_links=None): nl = {} if new_links is None else new_links self.iu = iu self.iu._set_links(links=self, is_delta=False, new_links=pl, links_to_remove=[], writer_name=writer_name) for k, v in pl.items(): dict.__setitem__(self, k, v) def add_links(self, type, targets, writer_name=None): if not hasattr(targets, '__iter__'): targets=[targets] self.iu._set_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name) def remove_links(self, type, targets, writer_name=None): if not hasattr(targets, '__iter__'): targets=[targets] self.iu._set_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name) def modify_links(self, add, remove, writer_name=None): self.iu._set_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name) def set_links(self, links, writer_name=None): self.iu._set_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name) def get_links(self, type): return set(self.iu._get_links()) class Payload(dict): def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False): pl = {} if new_payload is None else new_payload self.iu = iu # NOTE omit_init_update_message is necessary to prevent checking for # exceptions and sending updates in the case where we just receive # a whole new payload from the remote side and overwrite it locally. if (not omit_init_update_message) and (self.iu.buffer is not None): self.iu._modify_payload(payload=self, is_delta=False, new_items=pl, keys_to_remove=[], writer_name=writer_name) for k, v in pl.items(): dict.__setitem__(self, k, v) def __setitem__(self, k, v, writer_name=None): self.iu._modify_payload(payload=self, is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name) result = dict.__setitem__(self, k, v) def __delitem__(self, k, writer_name=None): self.iu._modify_payload(payload=self, is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name) result = dict.__delitem__(self, k) def _remotely_enforced_setitem(self, k, v): """Sets an item when requested remotely.""" return dict.__setitem__(self, k, v) def _remotely_enforced_delitem(self, k): """Deletes an item when requested remotely.""" return dict.__delitem__(self, k) class IUInterface(object): #{{{ """Base class of all specialised IU classes.""" def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False): """Creates an IU. Keyword arguments: uid -- unique ID of this IU access_mode -- access mode of this IU read_only -- flag indicating whether this IU is read_only or not """ self._uid = uid self._revision = None self._category = None self._payload_type = None self._owner_name = None self._committed = False self._access_mode = access_mode self._read_only = read_only self._buffer = None # payload is not present here self._links = collections.defaultdict(set) def _add_and_remove_links(self, add, remove): for type in remove.keys(): self._links[type] -= remove[type] for type in add.keys(): self._links[type] |= add[type] def add_links(self, type, targets, writer_name=None): if not hasattr(targets, '__iter__'): targets=[targets] self._modify_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name) self._add_and_remove_links( add={type:targets}, remove={} ) def remove_links(self, type, targets, writer_name=None): if not hasattr(targets, '__iter__'): targets=[targets] self._modify_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name) self._add_and_remove_links( add={}, remove={type:targets} ) def modify_links(self, add, remove, writer_name=None): self._modify_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name) self._add_and_remove_links( add=add, remove=remove ) def set_links(self, links, writer_name=None): self._modify_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name) self._links = {} self._add_and_remove_links( add=new_links, remove={} ) def get_links(self, type): return set(self._links[type]) def _get_revision(self): return self._revision revision = property(fget=_get_revision, doc='Revision number of the IU.') def _get_category(self): return self._category category = property(fget=_get_category, doc='Category of the IU.') def _get_payload_type(self): return self._payload_type payload_type = property(fget=_get_payload_type, doc='Type of the IU payload') def _get_committed(self): return self._committed committed = property( fget=_get_committed, doc='Flag indicating whether this IU has been committed to.') def _get_uid(self): return self._uid uid = property(fget=_get_uid, doc='Unique ID of the IU.') def _get_access_mode(self): return self._access_mode access_mode = property(fget=_get_access_mode, doc='Access mode of the IU.') def _get_read_only(self): return self._read_only read_only = property( fget=_get_read_only, doc='Flag indicating whether this IU is read only.') def _get_buffer(self): return self._buffer def _set_buffer(self, buffer): if self._buffer is not None: raise Exception('The IU is already in a buffer, cannot move it.') self._buffer = buffer buffer = property( fget=_get_buffer, fset=_set_buffer, doc='Buffer this IU is held in.') def _get_owner_name(self): return self._owner_name def _set_owner_name(self, owner_name): if self._owner_name is not None: raise Exception('The IU already has an owner name, cannot change it.') self._owner_name = owner_name owner_name = property( fget=_get_owner_name, fset=_set_owner_name, doc="The IU's owner's name.") #}}} class IU(IUInterface):#{{{ """A local IU.""" def __init__(self, access_mode=IUAccessMode.PUSH, read_only=False, category='undef', _payload_type='MAP'): super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only) self._revision = 1 self._category = category self._payload_type = _payload_type self.revision_lock = threading.RLock() self._payload = Payload(iu=self) def _modify_links(self, links, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): if self.committed: raise IUCommittedError(self) with self.revision_lock: # modify links locally self._increase_revision_number() if self.is_published: # send update to remote holders self.buffer._send_iu_link_update( self, revision=self.revision, is_delta=is_delta, new_links=new_links, links_to_remove=links_to_remove, writer_name=self.owner_name if writer_name is None else writer_name) def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): """Modify the payload: add or remove items from this payload locally and send update.""" if self.committed: raise IUCommittedError(self) with self.revision_lock: # set item locally self._increase_revision_number() if self.is_published: # send update to remote holders self.buffer._send_iu_payload_update( self, revision=self.revision, is_delta=is_delta, new_items=new_items, keys_to_remove=keys_to_remove, writer_name=self.owner_name if writer_name is None else writer_name) def _increase_revision_number(self): self._revision += 1 def _internal_commit(self, writer_name=None): if self.committed: raise IUCommittedError(self) with self.revision_lock: if not self._committed: self._increase_revision_number() self._committed = True self.buffer._send_iu_commission(self, writer_name=writer_name) def commit(self): """Commit to this IU.""" return self._internal_commit() def __str__(self): s = "IU{ " s += "uid="+self._uid+" " s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") " s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " " s += "payload={ " for k,v in self.payload.items(): s += k+":'"+v+"', " s += "} " s += "}" return s def _get_payload(self): return self._payload def _set_payload(self, new_pl, writer_name=None): if self.committed: raise IUCommittedError(self) with self.revision_lock: self._increase_revision_number() self._payload = Payload( iu=self, writer_name=None if self.buffer is None else (self.buffer.unique_name if writer_name is None else writer_name), new_payload=new_pl) payload = property( fget=_get_payload, fset=_set_payload, doc='Payload dictionary of this IU.') def _get_is_published(self): return self.buffer is not None is_published = property( fget=_get_is_published, doc='Flag indicating whether this IU has been published or not.') def _set_buffer(self, buffer): if self._buffer is not None: raise Exception('The IU is already in a buffer, cannot move it.') self._buffer = buffer self.owner_name = buffer.unique_name self._payload.owner_name = buffer.unique_name buffer = property( fget=IUInterface._get_buffer, fset=_set_buffer, doc='Buffer this IU is held in.') def _set_uid(self, uid): if self._uid is not None: raise AttributeError('The uid of IU ' + self.uid + ' has already been set, cannot change it.') self._uid = uid uid = property( fget=IUInterface._get_uid, fset=_set_uid, doc='Unique ID of theIU.') #}}} class RemotePushIU(IUInterface):#{{{ """A remote IU with access mode 'PUSH'.""" def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload): super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only) self._revision = revision self._category = category self.owner_name = owner_name self._payload_type = payload_type self._committed = committed # NOTE Since the payload is an already-existant Payload which we didn't modify ourselves, # don't try to invoke any modification checks or network updates ourselves either. # We are just receiving it here and applying the new data. self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True) def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): """Modify the payload: add or remove item from this payload remotely and send update.""" if self.committed: raise IUCommittedError(self) if self.read_only: raise IUReadOnlyError(self) requested_update = IUPayloadUpdate( uid=self.uid, revision=self.revision, is_delta=is_delta, writer_name=self.buffer.unique_name, new_items=new_items, keys_to_remove=keys_to_remove) remote_server = self.buffer._get_remote_server(self) new_revision = remote_server.updatePayload(requested_update) if new_revision == 0: raise IUUpdateFailedError(self) else: self._revision = new_revision def __str__(self): s = "RemotePushIU{ " s += "uid="+self._uid+" " s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") " s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " " s += "payload={ " for k,v in self.payload.items(): s += k+":'"+v+"', " s += "} " s += "}" return s def commit(self): """Commit to this IU.""" if self.read_only: raise IUReadOnlyError(self) if self._committed: # ignore commit requests when already committed return else: commission_request = ipaaca_pb2.IUCommission() commission_request.uid = self.uid commission_request.revision = self.revision commission_request.writer_name = self.buffer.unique_name remote_server = self.buffer._get_remote_server(self) new_revision = remote_server.commit(commission_request) if new_revision == 0: raise IUUpdateFailedError(self) else: self._revision = new_revision self._committed = True def _get_payload(self): return self._payload def _set_payload(self, new_pl): if self.committed: raise IUCommittedError(self) if self.read_only: raise IUReadOnlyError(self) requested_update = IUPayloadUpdate( uid=self.uid, revision=self.revision, is_delta=False, writer_name=self.buffer.unique_name, new_items=new_pl, keys_to_remove=[]) remote_server = self.buffer._get_remote_server(self) new_revision = remote_server.updatePayload(requested_update) if new_revision == 0: raise IUUpdateFailedError(self) else: self._revision = new_revision # NOTE Please read the comment in the constructor self._payload = Payload(iu=self, new_payload=new_pl, omit_init_update_message=True) payload = property( fget=_get_payload, fset=_set_payload, doc='Payload dictionary of the IU.') def _apply_update(self, update): """Apply a IUPayloadUpdate to the IU.""" self._revision = update.revision if update.is_delta: for k in update.keys_to_remove: self.payload._remotely_enforced_delitem(k) for k, v in update.new_items.items(): self.payload._remotely_enforced_setitem(k, v) else: # NOTE Please read the comment in the constructor self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True) def _apply_commission(self): """Apply commission to the IU""" self._committed = True #}}} class IntConverter(rsb.converter.Converter):#{{{ """Convert Python int objects to Protobuf ints and vice versa.""" def __init__(self, wireSchema="int", dataType=int): super(IntConverter, self).__init__(bytearray, dataType, wireSchema) def serialize(self, value): pbo = ipaaca_pb2.IntMessage() pbo.value = value return bytearray(pbo.SerializeToString()), self.wireSchema def deserialize(self, byte_stream, ws): pbo = ipaaca_pb2.IntMessage() pbo.ParseFromString( str(byte_stream) ) return pbo.value #}}} class IUConverter(rsb.converter.Converter):#{{{ ''' Converter class for Full IU representations wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU ''' def __init__(self, wireSchema="ipaaca-iu", dataType=IU): super(IUConverter, self).__init__(bytearray, dataType, wireSchema) def serialize(self, iu): pbo = ipaaca_pb2.IU() pbo.uid = iu._uid pbo.revision = iu._revision pbo.category = iu._category pbo.payload_type = iu._payload_type pbo.owner_name = iu._owner_name pbo.committed = iu._committed pbo.access_mode = ipaaca_pb2.IU.PUSH # TODO pbo.read_only = iu._read_only for k,v in iu._payload.items(): entry = pbo.payload.add() pack_typed_payload_item(entry, k, v) return bytearray(pbo.SerializeToString()), self.wireSchema def deserialize(self, byte_stream, ws): type = self.getDataType() if type == IU: pbo = ipaaca_pb2.IU() pbo.ParseFromString( str(byte_stream) ) if pbo.access_mode == ipaaca_pb2.IU.PUSH: _payload = {} for entry in pbo.payload: k, v = unpack_typed_payload_item(entry) _payload[k] = v remote_push_iu = RemotePushIU( uid=pbo.uid, revision=pbo.revision, read_only = pbo.read_only, owner_name = pbo.owner_name, category = pbo.category, payload_type = pbo.payload_type, committed = pbo.committed, payload=_payload ) return remote_push_iu else: raise Exception("We can only handle IUs with access mode 'PUSH' for now!") else: raise ValueError("Inacceptable dataType %s" % type) #}}} class IULinkUpdate(object):#{{{ def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None): super(IULinkUpdate, self).__init__() self.uid = uid self.revision = revision self.writer_name = writer_name self.is_delta = is_delta self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links) self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove) def __str__(self): s = 'PayloadUpdate(' + 'uid=' + self.uid + ', ' s += 'revision='+str(self.revision)+', ' s += 'writer_name='+str(self.writer_name)+', ' s += 'is_delta='+str(self.is_delta)+', ' s += 'new_links = '+str(self.new_links)+', ' s += 'links_to_remove = '+str(self.links_to_remove)+')' return s #}}} class IUPayloadUpdate(object):#{{{ def __init__(self, uid, revision, is_delta, writer_name="undef", new_items=None, keys_to_remove=None): super(IUPayloadUpdate, self).__init__() self.uid = uid self.revision = revision self.writer_name = writer_name self.is_delta = is_delta self.new_items = {} if new_items is None else new_items self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove def __str__(self): s = 'PayloadUpdate(' + 'uid=' + self.uid + ', ' s += 'revision='+str(self.revision)+', ' s += 'writer_name='+str(self.writer_name)+', ' s += 'is_delta='+str(self.is_delta)+', ' s += 'new_items = '+str(self.new_items)+', ' s += 'keys_to_remove = '+str(self.keys_to_remove)+')' return s #}}} class IULinkUpdateConverter(rsb.converter.Converter):#{{{ def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate): super(IULinkUpdateConverter, self).__init__(bytearray, dataType, wireSchema) def serialize(self, iu_link_update): pbo = ipaaca_pb2.IULinkUpdate() pbo.uid = iu_link_update.uid pbo.writer_name = iu_link_update.writer_name pbo.revision = iu_link_update.revision for type_ in iu_link_update.new_links.keys(): linkset = pbo.new_links.add() linkset.type = type_ linkset.targets.extend(iu_link_update.new_links[type_]) for type_ in iu_link_update.links_to_remove.keys(): linkset = pbo.links_to_remove.add() linkset.type = type_ linkset.targets.extend(iu_link_update.links_to_remove[type_]) pbo.is_delta = iu_link_update.is_delta return bytearray(pbo.SerializeToString()), self.wireSchema def deserialize(self, byte_stream, ws): type = self.getDataType() if type == IULinkUpdate: pbo = ipaaca_pb2.IULinkUpdate() pbo.ParseFromString( str(byte_stream) ) logger.debug('received an IULinkUpdate for revision '+str(pbo.revision)) iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta) for entry in pbo.new_links: iu_link_up.new_links[str(entry.type)] = set(entry.targets) for entry in pbo.links_to_remove: iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets) return iu_link_up else: raise ValueError("Inacceptable dataType %s" % type) #}}} class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{ def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate): super(IUPayloadUpdateConverter, self).__init__(bytearray, dataType, wireSchema) def serialize(self, iu_payload_update): pbo = ipaaca_pb2.IUPayloadUpdate() pbo.uid = iu_payload_update.uid pbo.writer_name = iu_payload_update.writer_name pbo.revision = iu_payload_update.revision for k,v in iu_payload_update.new_items.items(): entry = pbo.new_items.add() pack_typed_payload_item(entry, k, v) pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove) pbo.is_delta = iu_payload_update.is_delta return bytearray(pbo.SerializeToString()), self.wireSchema def deserialize(self, byte_stream, ws): type = self.getDataType() if type == IUPayloadUpdate: pbo = ipaaca_pb2.IUPayloadUpdate() pbo.ParseFromString( str(byte_stream) ) logger.debug('received an IUPayloadUpdate for revision '+str(pbo.revision)) iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta) for entry in pbo.new_items: k, v = unpack_typed_payload_item(entry) iu_up.new_items[k] = v iu_up.keys_to_remove = pbo.keys_to_remove[:] return iu_up else: raise ValueError("Inacceptable dataType %s" % type) #}}} class IUStore(dict): """A dictionary storing IUs.""" def __init__(self): super(IUStore, self).__init__() class IUEventHandler(object): """Wrapper for IU event handling functions.""" def __init__(self, handler_function, for_event_types=None, for_categories=None): """Create an IUEventHandler. Keyword arguments: handler_function -- the handler function with the signature (IU, event_type, local) for_event_types -- a list of event types or None if handler should be called for all event types for_categories -- a list of category names or None if handler should be called for all categoires """ super(IUEventHandler, self).__init__() self._handler_function = handler_function self._for_event_types = ( None if for_event_types is None else (for_event_types[:] if hasattr(for_event_types, '__iter__') else [for_event_types])) self._for_categories = ( None if for_categories is None else (for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories])) def condition_met(self, event_type, category): """Check whether this IUEventHandler should be called. Keyword arguments: event_type -- type of the IU event category -- category of the IU which triggered the event """ type_condition_met = (self._for_event_types is None or event_type in self._for_event_types) cat_condition_met = (self._for_categories is None or category in self._for_categories) return type_condition_met and cat_condition_met def call(self, buffer, iu_uid, local, event_type, category): """Call this IUEventHandler's function, if it applies. Keyword arguments: buffer -- the buffer in which the IU is stored iu_uid -- the uid of the IU local -- is the IU local or remote to this component? @RAMIN: Is this correct? event_type -- IU event type category -- category of the IU """ if self.condition_met(event_type, category): iu = buffer._iu_store[iu_uid] self._handler_function(iu, event_type, local) class Buffer(object): """Base class for InputBuffer and OutputBuffer.""" def __init__(self, owning_component_name, participant_config=None): '''Create a Buffer. Keyword arguments: owning_compontent_name -- name of the entity that owns this Buffer participant_config -- RSB configuration ''' super(Buffer, self).__init__() self._owning_component_name = owning_component_name self._participant_config = participant_config #rsb.ParticipantConfig.fromDefaultSources() if participant_config is None else participant_config self._uuid = str(uuid.uuid4())[0:8] # Initialise with a temporary, but already unique, name self._unique_name = "undef-"+self._uuid self._iu_store = IUStore() self._iu_event_handlers = [] def register_handler(self, handler_function, for_event_types=None, for_categories=None): """Register a new IU event handler function. Keyword arguments: handler_function -- a function with the signature (IU, event_type, local) for_event_types -- a list of event types or None if handler should be called for all event types for_categories -- a list of category names or None if handler should be called for all categoires """ handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories) self._iu_event_handlers.append(handler) def call_iu_event_handlers(self, uid, local, event_type, category): """Call registered IU event handler functions registered for this event_type and category.""" for h in self._iu_event_handlers: # print('calling an update handler for '+event_type+' -> '+str(h)) h.call(self, uid, local=local, event_type=event_type, category=category) def _get_owning_component_name(self): """Return the name of this Buffer's owning component""" return self._owning_component_name owning_component_name = property(_get_owning_component_name) def _get_unique_name(self): """Return the Buffer's unique name.""" return self._unique_name unique_name = property(_get_unique_name) class InputBuffer(Buffer): """An InputBuffer that holds remote IUs.""" def __init__(self, owning_component_name, category_interests=None, participant_config=None): '''Create an InputBuffer. Keyword arguments: owning_compontent_name -- name of the entity that owns this InputBuffer category_interests -- list of IU categories this Buffer is interested in participant_config = RSB configuration ''' super(InputBuffer, self).__init__(owning_component_name, participant_config) self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB' self._listener_store = {} # one per IU category self._remote_server_store = {} # one per remote-IU-owning Component self._category_interests = [] if category_interests is not None: for cat in category_interests: self._create_category_listener_if_needed(cat) def _get_remote_server(self, iu): '''Return (or create, store and return) a remote server.''' if iu.owner_name in self._remote_server_store: return self._remote_server_store[iu.owner_name] # TODO remove the str() when unicode is supported (issue #490) remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name))) self._remote_server_store[iu.owner_name] = remote_server return remote_server def _create_category_listener_if_needed(self, iu_category): '''Return (or create, store and return) a category listener.''' if iu_category in self._listener_store: return self._informer_store[iu_category] cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config) cat_listener.addHandler(self._handle_iu_events) self._listener_store[iu_category] = cat_listener self._category_interests.append(iu_category) logger.info("Added category listener for "+iu_category) return cat_listener def _handle_iu_events(self, event): '''Dispatch incoming IU events. Adds incoming IU's to the store, applies payload and commit updates to IU, calls IU event handlers.' Keyword arguments: event -- a converted RSB event ''' type_ = type(event.data) if type_ is RemotePushIU: # a new IU if event.data.uid in self._iu_store: # already in our store pass else: self._iu_store[ event.data.uid ] = event.data event.data.buffer = self self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category) else: # an update to an existing IU if event.data.writer_name == self.unique_name: # Discard updates that originate from this buffer return if event.data.uid not in self._iu_store: # TODO: we should request the IU's owner to send us the IU logger.warning("Update message for IU which we did not fully receive before.") return if type_ is ipaaca_pb2.IUCommission: # IU commit iu = self._iu_store[event.data.uid] iu._apply_commission() iu._revision = event.data.revision self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category) elif type_ is IUPayloadUpdate: # IU payload update iu = self._iu_store[event.data.uid] iu._apply_update(event.data) self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category) elif type_ is IULinkUpdate: # IU link update iu = self._iu_store[event.data.uid] iu._apply_link_update(event.data) self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category) else: logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_)) class OutputBuffer(Buffer): """An OutputBuffer that holds local IUs.""" def __init__(self, owning_component_name, participant_config=None): '''Create an Output Buffer. Keyword arguments: owning_component_name -- name of the entity that own this buffer participant_config -- RSB configuration ''' super(OutputBuffer, self).__init__(owning_component_name, participant_config) self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB' self._server = rsb.createServer(rsb.Scope(self._unique_name)) self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int) self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int) self._informer_store = {} self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-' self.__iu_id_counter_lock = threading.Lock() self.__iu_id_counter = 0 def _create_own_name_listener(self, iu_category): # FIXME replace this '''Create an own name listener.''' #if iu_category in self._listener_store: return self._informer_store[iu_category] #cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config) #cat_listener.addHandler(self._handle_iu_events) #self._listener_store[iu_category] = cat_listener #self._category_interests.append(iu_category) #logger.info("Added category listener for "+iu_category) #return cat_listener def _generate_iu_uid(self): '''Generate a unique IU id of the form''' with self.__iu_id_counter_lock: self.__iu_id_counter += 1 number = self.__iu_id_counter return self._id_prefix + str(number) def _remote_update_payload(self, update): '''Apply a remotely requested update to one of the stored IUs.''' if update.uid not in self._iu_store: logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid)) return 0 iu = self._iu_store[update.uid] with iu.revision_lock: if (update.revision != 0) and (update.revision != iu.revision): # (0 means "do not pay attention to the revision number" -> "force update") logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid)) return 0 if update.is_delta: for k in update.keys_to_remove: iu.payload.__delitem__(k, writer_name=update.writer_name) for k,v in update.new_items.items(): iu.payload.__setitem__(k, v, writer_name=update.writer_name) else: iu._set_payload(update.new_items, writer_name=update.writer_name) # _set_payload etc. have also incremented the revision number self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category) return iu.revision def _remote_commit(self, iu_commission): '''Apply a remotely requested commit to one of the stored IUs.''' if iu_commission.uid not in self._iu_store: logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid)) return 0 iu = self._iu_store[iu_commission.uid] with iu.revision_lock: if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision): # (0 means "do not pay attention to the revision number" -> "force update") logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid)) return 0 if iu.committed: return 0 else: iu._internal_commit(writer_name=iu_commission.writer_name) self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category) return iu.revision def _get_informer(self, iu_category): '''Return (or create, store and return) an informer object for IUs of the specified category.''' if iu_category in self._informer_store: return self._informer_store[iu_category] informer_iu = rsb.createInformer( rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config, dataType=object) self._informer_store[iu_category] = informer_iu #new_tuple logger.info("Added informer on "+iu_category) return informer_iu #return new_tuple def add(self, iu): '''Add an IU to the IU store, assign an ID and publish it.''' if iu._uid is not None: raise IUPublishedError(iu) iu.uid = self._generate_iu_uid() self._iu_store[iu._uid] = iu iu.buffer = self self._publish_iu(iu) def _publish_iu(self, iu): '''Publish an IU.''' informer = self._get_informer(iu._category) informer.publishData(iu) def _send_iu_commission(self, iu, writer_name): '''Send IU commission. Keyword arguments: iu -- the IU that has been committed to writer_name -- name of the Buffer that initiated this commit, necessary to enable remote components to filter out updates that originated from their own operations ''' # a raw Protobuf object for IUCommission is produced # (unlike updates, where we have an intermediate class) iu_commission = ipaaca_pb2.IUCommission() iu_commission.uid = iu.uid iu_commission.revision = iu.revision iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name # print('sending IU commission event') informer = self._get_informer(iu._category) informer.publishData(iu_commission) def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"): '''Send an IU link update. Keyword arguments: iu -- the IU being updated is_delta -- whether this is an incremental update or a replacement the whole link dictionary revision -- the new revision number new_links -- a dictionary of new link sets links_to_remove -- a dict of the link sets that shall be removed writer_name -- name of the Buffer that initiated this update, necessary to enable remote components to filter out updates that originate d from their own operations ''' if new_links is None: new_links = {} if links_to_remove is None: links_to_remove = {} link_update = IULinkUpdate(iu._uid, is_delta=is_delta, revision=revision) link_update.new_links = new_links if is_delta: link_update.links_to_remove = links_to_remove link_update.writer_name = writer_name informer = self._get_informer(iu._category) informer.publishData(link_update) # FIXME send the notification to the target, if the target is not the writer_name def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"): '''Send an IU payload update. Keyword arguments: iu -- the IU being updated is_delta -- whether this is an incremental update or a replacement revision -- the new revision number new_items -- a dictionary of new payload items keys_to_remove -- a list of the keys that shall be removed from the payload writer_name -- name of the Buffer that initiated this update, necessary to enable remote components to filter out updates that originate d from their own operations ''' if new_items is None: new_items = {} if keys_to_remove is None: keys_to_remove = [] payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision) payload_update.new_items = new_items if is_delta: payload_update.keys_to_remove = keys_to_remove payload_update.writer_name = writer_name informer = self._get_informer(iu._category) informer.publishData(payload_update) ## --- RSB ------------------------------------------------------------------- def initialize_ipaaca_rsb():#{{{ rsb.converter.registerGlobalConverter( IntConverter(wireSchema="int32", dataType=int)) rsb.converter.registerGlobalConverter( IUConverter(wireSchema="ipaaca-iu", dataType=IU)) rsb.converter.registerGlobalConverter( IULinkUpdateConverter( wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate)) rsb.converter.registerGlobalConverter( IUPayloadUpdateConverter( wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate)) rsb.converter.registerGlobalConverter( rsb.converter.ProtocolBufferConverter( messageClass=ipaaca_pb2.IUCommission)) #rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources() #t = rsb.ParticipantConfig.Transport('spread', {'enabled':'true'}) rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromFile('rsb.cfg') #}}} ## --- Module initialisation ------------------------------------------------- # register our own RSB Converters initialize_ipaaca_rsb() # Create a global logger for this module logger = logging.getLogger('ipaaca') logger.addHandler(IpaacaLoggingHandler())