diff --git a/ipaacalib/proto/ipaaca.proto b/ipaacalib/proto/ipaaca.proto index 7288dc84a0f8dcef4689166b26571c2d936c6200..ffa5dc8beeced25fd43107d4f8e6e428b39e9223 100755 --- a/ipaacalib/proto/ipaaca.proto +++ b/ipaacalib/proto/ipaaca.proto @@ -84,6 +84,11 @@ message IUCommission { required string writer_name = 3; } +message IUResendRequest { + required string uid = 1; + required string hidden_name = 2; +} + message IULinkUpdate { required string uid = 1; required uint32 revision = 2; diff --git a/ipaacalib/python/src/ipaaca/__init__.py b/ipaacalib/python/src/ipaaca/__init__.py index 0bd8bbb2c5c891b00eff0d30ef746c30e0cd4201..5e1298a1fdc06161e98c48f37a9dd0d8b416284d 100755 --- a/ipaacalib/python/src/ipaaca/__init__.py +++ b/ipaacalib/python/src/ipaaca/__init__.py @@ -2,10 +2,10 @@ # This file is part of IPAACA, the # "Incremental Processing Architecture -# for Artificial Conversational Agents". +# for Artificial Conversational Agents". # # Copyright (c) 2009-2013 Sociable Agents Group -# CITEC, Bielefeld University +# CITEC, Bielefeld University # # http://opensource.cit-ec.de/projects/ipaaca/ # http://purl.org/net/ipaaca @@ -22,7 +22,7 @@ # You should have received a copy of the LGPL along with this # program. If not, go to http://www.gnu.org/licenses/lgpl.html # or write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # # The development of this software was supported by the # Excellence Cluster EXC 277 Cognitive Interaction Technology. @@ -65,7 +65,7 @@ __all__ = [ 'IUAccessMode', 'InputBuffer', 'OutputBuffer', 'IU', - 'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError', 'IUNotFoundError', + 'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError', 'IUNotFoundError', 'IUResendFailedError', 'logger' ] @@ -75,7 +75,7 @@ __all__ = [ def enum(*sequential, **named): """Create an enum type. - + Based on suggestion of Alec Thomas on stackoverflow.com: http://stackoverflow.com/questions/36932/ whats-the-best-way-to-implement-an-enum-in-python/1695250#1695250 @@ -96,12 +96,12 @@ def unpack_typed_payload_item(protobuf_object): class IpaacaLoggingHandler(logging.Handler): - + def __init__(self, level=logging.DEBUG): logging.Handler.__init__(self, level) - + def emit(self, record): - meta = '[ipaaca] (' + str(record.levelname) + ') ' + meta = '[ipaaca] (' + str(record.levelname) + ') ' msg = str(record.msg.format(record.args)) print(meta + msg) @@ -141,6 +141,10 @@ class IUUpdateFailedError(Exception): def __init__(self, iu): super(IUUpdateFailedError, self).__init__('Remote update failed for IU ' + str(iu.uid) + '.') +class IUResendFailedError(Exception): + """Error indicating that a remote IU resend failed.""" + def __init__(self, iu): + super(IUResendFailedError, self).__init__('Remote resend failed for IU ' + str(iu.uid) + '.') class IUCommittedError(Exception): """Error indicating that an IU is immutable because it has been committed to.""" @@ -253,13 +257,13 @@ class Payload(dict): #print("Payload.__delitem__() OUT") self._batch_update_lock.release() return r - - # Context-manager based batch updates, not yet thread-safe (on remote updates) + + # Context-manager based batch updates, not yet thread-safe (on remote updates) def __enter__(self): #print('running Payload.__enter__()') self._wait_batch_update_lock(self._update_timeout) self._update_on_every_change = False - + def __exit__(self, type, value, traceback): #print('running Payload.__exit__()') self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=self._batch_update_writer_name) @@ -291,12 +295,12 @@ class Payload(dict): class IUInterface(object): #{{{ - + """Base class of all specialised IU classes.""" - + def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False): """Creates an IU. - + Keyword arguments: uid -- unique ID of this IU access_mode -- access mode of this IU @@ -314,7 +318,7 @@ class IUInterface(object): #{{{ self._buffer = None # payload is not present here self._links = collections.defaultdict(set) - + def __str__(self): s = unicode(self.__class__)+"{ " s += "category="+("<None>" if self._category is None else self._category)+" " @@ -331,8 +335,8 @@ class IUInterface(object): #{{{ s += "} " s += "}" return s - - + + def _add_and_remove_links(self, add, remove): '''Just add and remove the new links in our links set, do not send an update here''' '''Note: Also used for remotely enforced links updates.''' @@ -343,7 +347,7 @@ class IUInterface(object): #{{{ '''Note: Also used for remotely enforced links updates.''' self._links = collections.defaultdict(set) for type in links.keys(): self._links[type] |= set(links[type]) - + def add_links(self, type, targets, writer_name=None): '''Attempt to add links if the conditions are met and send an update message. Then call the local setter.''' @@ -370,45 +374,45 @@ class IUInterface(object): #{{{ return set(self._links[type]) def get_all_links(self): return copy.deepcopy(self._links) - + def _get_revision(self): return self._revision revision = property(fget=_get_revision, doc='Revision number of the IU.') - + def _get_category(self): return self._category category = property(fget=_get_category, doc='Category of the IU.') - + def _get_payload_type(self): return self._payload_type payload_type = property(fget=_get_payload_type, doc='Type of the IU payload') - + def _get_committed(self): return self._committed committed = property( fget=_get_committed, doc='Flag indicating whether this IU has been committed to.') - + def _get_retracted(self): return self._retracted retracted = property( fget=_get_retracted, doc='Flag indicating whether this IU has been retracted.') - + def _get_uid(self): return self._uid uid = property(fget=_get_uid, doc='Unique ID of the IU.') - + def _get_access_mode(self): return self._access_mode access_mode = property(fget=_get_access_mode, doc='Access mode of the IU.') - + def _get_read_only(self): return self._read_only read_only = property( - fget=_get_read_only, + fget=_get_read_only, doc='Flag indicating whether this IU is read only.') - + def _get_buffer(self): return self._buffer def _set_buffer(self, buffer): @@ -417,9 +421,9 @@ class IUInterface(object): #{{{ self._buffer = buffer buffer = property( fget=_get_buffer, - fset=_set_buffer, + fset=_set_buffer, doc='Buffer this IU is held in.') - + def _get_owner_name(self): return self._owner_name def _set_owner_name(self, owner_name): @@ -444,7 +448,7 @@ class IU(IUInterface):#{{{ self._payload_type = _payload_type self.revision_lock = threading.RLock() self._payload = Payload(iu=self) - + def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): if self.committed: raise IUCommittedError(self) @@ -460,7 +464,7 @@ class IU(IUInterface):#{{{ new_links=new_links, links_to_remove=links_to_remove, writer_name=self.owner_name if writer_name is None else writer_name) - + def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): """Modify the payload: add or remove items from this payload locally and send update.""" if self.committed: @@ -479,10 +483,10 @@ class IU(IUInterface):#{{{ new_items=new_items, keys_to_remove=keys_to_remove, writer_name=self.owner_name if writer_name is None else writer_name) - + def _increase_revision_number(self): self._revision += 1 - + def _internal_commit(self, writer_name=None): if self.committed: raise IUCommittedError(self) @@ -492,11 +496,11 @@ class IU(IUInterface):#{{{ self._committed = True if self.buffer is not None: self.buffer._send_iu_commission(self, writer_name=writer_name) - + def commit(self): """Commit to this IU.""" return self._internal_commit() - + def _get_payload(self): return self._payload def _set_payload(self, new_pl, writer_name=None): @@ -512,13 +516,13 @@ class IU(IUInterface):#{{{ fget=_get_payload, fset=_set_payload, doc='Payload dictionary of this IU.') - + def _get_is_published(self): return self.buffer is not None is_published = property( - fget=_get_is_published, + fget=_get_is_published, doc='Flag indicating whether this IU has been published or not.') - + def _set_buffer(self, buffer): if self._buffer is not None: raise Exception('The IU is already in a buffer, cannot move it.') @@ -529,7 +533,7 @@ class IU(IUInterface):#{{{ fget=IUInterface._get_buffer, fset=_set_buffer, doc='Buffer this IU is held in.') - + def _set_uid(self, uid): if self._uid is not None: raise AttributeError('The uid of IU ' + self.uid + ' has already been set, cannot change it.') @@ -545,25 +549,25 @@ class Message(IU):#{{{ """Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls.""" def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, _payload_type='MAP'): super(Message, self).__init__(category=category, access_mode=access_mode, read_only=read_only, _payload_type=_payload_type) - + def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): if self.is_published: logger.info('Info: modifying a Message after sending has no global effects') - + def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): if self.is_published: logger.info('Info: modifying a Message after sending has no global effects') - + def _increase_revision_number(self): self._revision += 1 - + def _internal_commit(self, writer_name=None): if self.is_published: logger.info('Info: committing to a Message after sending has no global effects') - + def commit(self): return self._internal_commit() - + def _get_payload(self): return self._payload def _set_payload(self, new_pl, writer_name=None): @@ -582,13 +586,13 @@ class Message(IU):#{{{ fget=_get_payload, fset=_set_payload, doc='Payload dictionary of this IU.') - + def _get_is_published(self): return self.buffer is not None is_published = property( - fget=_get_is_published, + fget=_get_is_published, doc='Flag indicating whether this IU has been published or not.') - + def _set_buffer(self, buffer): if self._buffer is not None: raise Exception('The IU is already in a buffer, cannot move it.') @@ -599,7 +603,7 @@ class Message(IU):#{{{ fget=IUInterface._get_buffer, fset=_set_buffer, doc='Buffer this IU is held in.') - + def _set_uid(self, uid): if self._uid is not None: raise AttributeError('The uid of IU ' + self.uid + ' has already been set, cannot change it.') @@ -611,9 +615,9 @@ class Message(IU):#{{{ #}}} class RemoteMessage(IUInterface):#{{{ - + """A remote IU with access mode 'MESSAGE'.""" - + def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links): super(RemoteMessage, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only) self._revision = revision @@ -627,10 +631,10 @@ class RemoteMessage(IUInterface):#{{{ # We are just receiving it here and applying the new data. self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True) self._links = links - + def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): logger.info('Info: modifying a RemoteMessage only has local effects') - + def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): logger.info('Info: modifying a RemoteMessage only has local effects') @@ -655,7 +659,7 @@ class RemoteMessage(IUInterface):#{{{ self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove) else: self._replace_links(links=update.new_links) - + def _apply_update(self, update): """Apply a IUPayloadUpdate to the IU.""" logger.warning('Warning: should never be called: RemoteMessage._apply_update') @@ -666,12 +670,12 @@ class RemoteMessage(IUInterface):#{{{ else: # NOTE Please read the comment in the constructor self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True) - + def _apply_commission(self): """Apply commission to the IU""" logger.warning('Warning: should never be called: RemoteMessage._apply_commission') self._committed = True - + def _apply_retraction(self): """Apply retraction to the IU""" logger.warning('Warning: should never be called: RemoteMessage._apply_retraction') @@ -679,9 +683,9 @@ class RemoteMessage(IUInterface):#{{{ #}}} class RemotePushIU(IUInterface):#{{{ - + """A remote IU with access mode 'PUSH'.""" - + def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links): super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only) self._revision = revision @@ -695,7 +699,7 @@ class RemotePushIU(IUInterface):#{{{ # We are just receiving it here and applying the new data. self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True) self._links = links - + def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): """Modify the links: add or remove item from this payload remotely and send update.""" if self.committed: @@ -715,7 +719,7 @@ class RemotePushIU(IUInterface):#{{{ raise IUUpdateFailedError(self) else: self._revision = new_revision - + def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): """Modify the payload: add or remove item from this payload remotely and send update.""" if self.committed: @@ -790,7 +794,7 @@ class RemotePushIU(IUInterface):#{{{ self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove) else: self._replace_links(links=update.new_links) - + def _apply_update(self, update): """Apply a IUPayloadUpdate to the IU.""" self._revision = update.revision @@ -800,11 +804,11 @@ class RemotePushIU(IUInterface):#{{{ else: # NOTE Please read the comment in the constructor self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True) - + def _apply_commission(self): """Apply commission to the IU""" self._committed = True - + def _apply_retraction(self): """Apply retraction to the IU""" self._retracted = True @@ -815,12 +819,12 @@ class IntConverter(rsb.converter.Converter):#{{{ """Convert Python int objects to Protobuf ints and vice versa.""" def __init__(self, wireSchema="int", dataType=int): super(IntConverter, self).__init__(bytearray, dataType, wireSchema) - + def serialize(self, value): pbo = ipaaca_pb2.IntMessage() pbo.value = value return bytearray(pbo.SerializeToString()), self.wireSchema - + def deserialize(self, byte_stream, ws): pbo = ipaaca_pb2.IntMessage() pbo.ParseFromString( str(byte_stream) ) @@ -835,7 +839,7 @@ class IUConverter(rsb.converter.Converter):#{{{ ''' def __init__(self, wireSchema="ipaaca-iu", dataType=IU): super(IUConverter, self).__init__(bytearray, dataType, wireSchema) - + def serialize(self, iu): pbo = ipaaca_pb2.IU() pbo.uid = iu._uid @@ -859,7 +863,7 @@ class IUConverter(rsb.converter.Converter):#{{{ linkset.targets.extend(iu._links[type_]) ws = "ipaaca-messageiu" if iu._access_mode == IUAccessMode.MESSAGE else self.wireSchema return bytearray(pbo.SerializeToString()), ws - + def deserialize(self, byte_stream, ws): type = self.getDataType() #print('IUConverter.deserialize got a '+str(type)+' over wireSchema '+ws) @@ -921,7 +925,7 @@ class MessageConverter(rsb.converter.Converter):#{{{ ''' def __init__(self, wireSchema="ipaaca-messageiu", dataType=Message): super(IUConverter, self).__init__(bytearray, dataType, wireSchema) - + def serialize(self, iu): pbo = ipaaca_pb2.IU() pbo.uid = iu._uid @@ -945,7 +949,7 @@ class MessageConverter(rsb.converter.Converter):#{{{ linkset.targets.extend(iu._links[type_]) ws = "ipaaca-messageiu" if iu._access_mode == IUAccessMode.MESSAGE else self.wireSchema return bytearray(pbo.SerializeToString()), ws - + def deserialize(self, byte_stream, ws): type = self.getDataType() #print('MessageConverter.deserialize got a '+str(type)+' over wireSchema '+ws) @@ -1002,7 +1006,7 @@ class MessageConverter(rsb.converter.Converter):#{{{ class IULinkUpdate(object):#{{{ - + def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None): super(IULinkUpdate, self).__init__() self.uid = uid @@ -1011,7 +1015,7 @@ class IULinkUpdate(object):#{{{ self.is_delta = is_delta self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links) self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove) - + def __str__(self): s = 'LinkUpdate(' + 'uid=' + self.uid + ', ' s += 'revision='+str(self.revision)+', ' @@ -1023,7 +1027,7 @@ class IULinkUpdate(object):#{{{ #}}} class IUPayloadUpdate(object):#{{{ - + def __init__(self, uid, revision, is_delta, writer_name="undef", new_items=None, keys_to_remove=None): super(IUPayloadUpdate, self).__init__() self.uid = uid @@ -1032,7 +1036,7 @@ class IUPayloadUpdate(object):#{{{ self.is_delta = is_delta self.new_items = {} if new_items is None else new_items self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove - + def __str__(self): s = 'PayloadUpdate(' + 'uid=' + self.uid + ', ' s += 'revision='+str(self.revision)+', ' @@ -1046,7 +1050,7 @@ class IUPayloadUpdate(object):#{{{ class IULinkUpdateConverter(rsb.converter.Converter):#{{{ def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate): super(IULinkUpdateConverter, self).__init__(bytearray, dataType, wireSchema) - + def serialize(self, iu_link_update): pbo = ipaaca_pb2.IULinkUpdate() pbo.uid = iu_link_update.uid @@ -1062,7 +1066,7 @@ class IULinkUpdateConverter(rsb.converter.Converter):#{{{ linkset.targets.extend(iu_link_update.links_to_remove[type_]) pbo.is_delta = iu_link_update.is_delta return bytearray(pbo.SerializeToString()), self.wireSchema - + def deserialize(self, byte_stream, ws): type = self.getDataType() if type == IULinkUpdate: @@ -1082,7 +1086,7 @@ class IULinkUpdateConverter(rsb.converter.Converter):#{{{ class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{ def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate): super(IUPayloadUpdateConverter, self).__init__(bytearray, dataType, wireSchema) - + def serialize(self, iu_payload_update): pbo = ipaaca_pb2.IUPayloadUpdate() pbo.uid = iu_payload_update.uid @@ -1094,7 +1098,7 @@ class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{ pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove) pbo.is_delta = iu_payload_update.is_delta return bytearray(pbo.SerializeToString()), self.wireSchema - + def deserialize(self, byte_stream, ws): type = self.getDataType() if type == IUPayloadUpdate: @@ -1128,12 +1132,12 @@ class FrozenIUStore(IUStore): raise AttributeError() class IUEventHandler(object): - + """Wrapper for IU event handling functions.""" - + def __init__(self, handler_function, for_event_types=None, for_categories=None): """Create an IUEventHandler. - + Keyword arguments: handler_function -- the handler function with the signature (IU, event_type, local) @@ -1150,10 +1154,10 @@ class IUEventHandler(object): self._for_categories = ( None if for_categories is None else (for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories])) - + def condition_met(self, event_type, category): """Check whether this IUEventHandler should be called. - + Keyword arguments: event_type -- type of the IU event category -- category of the IU which triggered the event @@ -1161,10 +1165,10 @@ class IUEventHandler(object): type_condition_met = (self._for_event_types is None or event_type in self._for_event_types) cat_condition_met = (self._for_categories is None or category in self._for_categories) return type_condition_met and cat_condition_met - + def call(self, buffer, iu_uid, local, event_type, category): """Call this IUEventHandler's function, if it applies. - + Keyword arguments: buffer -- the buffer in which the IU is stored iu_uid -- the uid of the IU @@ -1178,12 +1182,12 @@ class IUEventHandler(object): class Buffer(object): - + """Base class for InputBuffer and OutputBuffer.""" - + def __init__(self, owning_component_name, participant_config=None): '''Create a Buffer. - + Keyword arguments: owning_compontent_name -- name of the entity that owns this Buffer participant_config -- RSB configuration @@ -1196,49 +1200,49 @@ class Buffer(object): self._unique_name = "undef-"+self._uuid self._iu_store = IUStore() self._iu_event_handlers = [] - + def _get_frozen_iu_store(self): return FrozenIUStore(original_iu_store = self._iu_store) iu_store = property(fget=_get_frozen_iu_store, doc='Copy-on-read version of the internal IU store') - + def register_handler(self, handler_function, for_event_types=None, for_categories=None): """Register a new IU event handler function. - + Keyword arguments: handler_function -- a function with the signature (IU, event_type, local) for_event_types -- a list of event types or None if handler should be called for all event types for_categories -- a list of category names or None if handler should be called for all categories - + """ handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories) self._iu_event_handlers.append(handler) - + def call_iu_event_handlers(self, uid, local, event_type, category): """Call registered IU event handler functions registered for this event_type and category.""" for h in self._iu_event_handlers: h.call(self, uid, local=local, event_type=event_type, category=category) - + def _get_owning_component_name(self): """Return the name of this Buffer's owning component""" return self._owning_component_name owning_component_name = property(_get_owning_component_name) - + def _get_unique_name(self): """Return the Buffer's unique name.""" return self._unique_name unique_name = property(_get_unique_name) - + class InputBuffer(Buffer): - + """An InputBuffer that holds remote IUs.""" - + def __init__(self, owning_component_name, category_interests=None, participant_config=None): '''Create an InputBuffer. - + Keyword arguments: owning_compontent_name -- name of the entity that owns this InputBuffer category_interests -- list of IU categories this Buffer is interested in @@ -1252,31 +1256,39 @@ class InputBuffer(Buffer): if category_interests is not None: for cat in category_interests: self._add_category_listener(cat) - + # add own uuid as identifier for hidden channel. (dlw) + self._add_category_listener(str(self._uuid)) + def _get_remote_server(self, iu): '''Return (or create, store and return) a remote server.''' - if iu.owner_name in self._remote_server_store: - return self._remote_server_store[iu.owner_name] - # TODO remove the str() when unicode is supported (issue #490) - remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name))) - self._remote_server_store[iu.owner_name] = remote_server + _owner = None + if hasattr(iu,'owner_name'): + _owner = iu.owner_name + elif hasattr(iu,'writer_name'): + _owner = iu.writer_name + if _owner is not None: + if _owner in self._remote_server_store: + return self._remote_server_store[_owner] + # TODO remove the str() when unicode is supported (issue #490) + remote_server = rsb.createRemoteServer(rsb.Scope(str(_owner))) + self._remote_server_store[_owner] = remote_server return remote_server - + def _add_category_listener(self, iu_category): '''Return (or create, store and return) a category listener.''' - if iu_category not in self._listener_store: + if iu_category not in self._listener_store: cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config) cat_listener.addHandler(self._handle_iu_events) self._listener_store[iu_category] = cat_listener self._category_interests.append(iu_category) - logger.info("Added listener in scope "+"/ipaaca/category/"+iu_category) - + logger.warning("Added listener in scope "+"/ipaaca/category/"+iu_category)#info + def _handle_iu_events(self, event): '''Dispatch incoming IU events. - + Adds incoming IU's to the store, applies payload and commit updates to IU, calls IU event handlers.' - + Keyword arguments: event -- a converted RSB event ''' @@ -1299,8 +1311,16 @@ class InputBuffer(Buffer): else: # an update to an existing IU if event.data.uid not in self._iu_store: - # TODO: we should request the IU's owner to send us the IU - logger.warning("Update message for IU which we did not fully receive before.") + logger.warning("Resend message for IU which we did not fully receive before.") + # send resend request to remote server (dlw). + remote_server = self._get_remote_server(event.data) + resend_request = ipaaca_pb2.IUResendRequest() + resend_request.uid = event.data.uid # target iu + resend_request.hidden_name = str(self._uuid) # hidden channel name + rRevision = remote_server.resendRequest(resend_request) + if rRevision == 0: + raise IUResendFailedError(self) + return if type_ is ipaaca_pb2.IURetraction: # IU retraction (cannot be triggered remotely) @@ -1319,7 +1339,7 @@ class InputBuffer(Buffer): return #else: # print('Got update written by buffer '+str(event.data.writer_name)) - + if type_ is ipaaca_pb2.IUCommission: # IU commit iu = self._iu_store[event.data.uid] @@ -1345,12 +1365,12 @@ class InputBuffer(Buffer): class OutputBuffer(Buffer): - + """An OutputBuffer that holds local IUs.""" - + def __init__(self, owning_component_name, participant_config=None): '''Create an Output Buffer. - + Keyword arguments: owning_component_name -- name of the entity that own this buffer participant_config -- RSB configuration @@ -1361,11 +1381,13 @@ class OutputBuffer(Buffer): self._server.addMethod('updateLinks', self._remote_update_links, IULinkUpdate, int) self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int) self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int) + # add method to trigger a resend request. (dlw) + self._server.addMethod('resendRequest', self._remote_resend_request, ipaaca_pb2.IUResendRequest, int) self._informer_store = {} self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-' self.__iu_id_counter_lock = threading.Lock() #self.__iu_id_counter = 0 # hbuschme: IUs now have their Ids assigned on creation - + def _create_own_name_listener(self, iu_category): # FIXME replace this '''Create an own name listener.''' @@ -1377,7 +1399,7 @@ class OutputBuffer(Buffer): #logger.info("Added category listener for "+iu_category) #return cat_listener pass - + # hbuschme: IUs now have their Ids assigned on creation #def _generate_iu_uid(self): # '''Generate a unique IU id of the form ????''' @@ -1403,7 +1425,7 @@ class OutputBuffer(Buffer): iu.set_links(links=update.new_links, writer_name=update.writer_name) self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.LINKSUPDATED, category=iu.category) return iu.revision - + def _remote_update_payload(self, update): '''Apply a remotely requested update to one of the stored IU's payload.''' if update.uid not in self._iu_store: @@ -1431,7 +1453,21 @@ class OutputBuffer(Buffer): # _set_payload etc. have also incremented the revision number self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category) return iu.revision - + + def _remote_resend_request(self, iu_resend_request_pack): + ''' Resend an requested iu. (dlw) ''' + if iu_resend_request_pack.uid not in self._iu_store: + logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_resend_request_pack.uid)) + return 0 + iu = self._iu_store[iu_resend_request_pack.uid] + with iu.revision_lock: + if (iu_resend_request_pack.hidden_name is not None) or (iu_resend_request_pack.hidden_name is not ""): + informer = self._get_informer(iu_resend_request_pack.hidden_name) + informer.publishData(iu) + return iu.revision + else: + return 0 + def _remote_commit(self, iu_commission): '''Apply a remotely requested commit to one of the stored IUs.''' if iu_commission.uid not in self._iu_store: @@ -1449,7 +1485,7 @@ class OutputBuffer(Buffer): iu._internal_commit(writer_name=iu_commission.writer_name) self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category) return iu.revision - + def _get_informer(self, iu_category): '''Return (or create, store and return) an informer object for IUs of the specified category.''' if iu_category in self._informer_store: @@ -1462,7 +1498,7 @@ class OutputBuffer(Buffer): self._informer_store[iu_category] = informer_iu #new_tuple logger.info("Returning NEW informer on scope "+"/ipaaca/category/"+str(iu_category)) return informer_iu #return new_tuple - + def add(self, iu): '''Add an IU to the IU store, assign an ID and publish it.''' # hbuschme: IUs now have their Ids assigned on creation @@ -1478,7 +1514,7 @@ class OutputBuffer(Buffer): self._iu_store[iu.uid] = iu iu.buffer = self self._publish_iu(iu) - + def remove(self, iu=None, iu_uid=None): '''Remove the iu or an IU corresponding to iu_uid from the OutputBuffer, retracting it from the system.''' if iu is None: @@ -1492,12 +1528,12 @@ class OutputBuffer(Buffer): self._retract_iu(iu) del self._iu_store[iu.uid] return iu - + def _publish_iu(self, iu): '''Publish an IU.''' informer = self._get_informer(iu._category) informer.publishData(iu) - + def _retract_iu(self, iu): '''Retract (unpublish) an IU.''' iu_retraction = ipaaca_pb2.IURetraction() @@ -1505,10 +1541,10 @@ class OutputBuffer(Buffer): iu_retraction.revision = iu.revision informer = self._get_informer(iu._category) informer.publishData(iu_retraction) - + def _send_iu_commission(self, iu, writer_name): '''Send IU commission. - + Keyword arguments: iu -- the IU that has been committed to writer_name -- name of the Buffer that initiated this commit, necessary @@ -1523,10 +1559,10 @@ class OutputBuffer(Buffer): iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name informer = self._get_informer(iu._category) informer.publishData(iu_commission) - + def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"): '''Send an IU link update. - + Keyword arguments: iu -- the IU being updated is_delta -- whether this is an incremental update or a replacement @@ -1550,10 +1586,10 @@ class OutputBuffer(Buffer): informer = self._get_informer(iu._category) informer.publishData(link_update) # FIXME send the notification to the target, if the target is not the writer_name - + def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"): '''Send an IU payload update. - + Keyword arguments: iu -- the IU being updated is_delta -- whether this is an incremental update or a replacement @@ -1600,6 +1636,9 @@ def initialize_ipaaca_rsb():#{{{ rsb.converter.registerGlobalConverter( rsb.converter.ProtocolBufferConverter( messageClass=ipaaca_pb2.IUCommission)) + rsb.converter.registerGlobalConverter( + rsb.converter.ProtocolBufferConverter( + messageClass=ipaaca_pb2.IUResendRequest)) # dlw rsb.converter.registerGlobalConverter( rsb.converter.ProtocolBufferConverter( messageClass=ipaaca_pb2.IURetraction))