diff --git a/ipaacalib/python/src/ipaaca.py b/ipaacalib/python/src/ipaaca.py index 983c9eb05c44fa21013107ee0e219ad978284f55..40289b3c1b3de5ae9a879cf8a062257bb8a093d3 100755 --- a/ipaacalib/python/src/ipaaca.py +++ b/ipaacalib/python/src/ipaaca.py @@ -185,6 +185,7 @@ class IUInterface(object): #{{{ self._payload_type = None self._owner_name = None self._committed = False + self._retracted = False self._access_mode = access_mode self._read_only = read_only self._buffer = None @@ -264,6 +265,12 @@ class IUInterface(object): #{{{ fget=_get_committed, doc='Flag indicating whether this IU has been committed to.') + def _get_retracted(self): + return self._retracted + retracted = property( + fget=_get_retracted, + doc='Flag indicating whether this IU has been retracted.') + def _get_uid(self): return self._uid uid = property(fget=_get_uid, doc='Unique ID of the IU.') @@ -420,6 +427,7 @@ class RemotePushIU(IUInterface):#{{{ self.owner_name = owner_name self._payload_type = payload_type self._committed = committed + self._retracted = False # NOTE Since the payload is an already-existant Payload which we didn't modify ourselves, # don't try to invoke any modification checks or network updates ourselves either. # We are just receiving it here and applying the new data. @@ -534,6 +542,10 @@ class RemotePushIU(IUInterface):#{{{ def _apply_commission(self): """Apply commission to the IU""" self._committed = True + + def _apply_retraction(self): + """Apply retraction to the IU""" + self._retracted = True #}}} @@ -905,31 +917,43 @@ class InputBuffer(Buffer): self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category) else: # an update to an existing IU - if event.data.writer_name == self.unique_name: - # Discard updates that originate from this buffer - return if event.data.uid not in self._iu_store: # TODO: we should request the IU's owner to send us the IU logger.warning("Update message for IU which we did not fully receive before.") return - if type_ is ipaaca_pb2.IUCommission: - # IU commit + if type_ is ipaaca_pb2.IURetraction: + # IU retraction (cannot be triggered remotely) iu = self._iu_store[event.data.uid] - iu._apply_commission() iu._revision = event.data.revision - self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category) - elif type_ is IUPayloadUpdate: - # IU payload update - iu = self._iu_store[event.data.uid] - iu._apply_update(event.data) - self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category) - elif type_ is IULinkUpdate: - # IU link update - iu = self._iu_store[event.data.uid] - iu._apply_link_update(event.data) - self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category) + iu._apply_retraction() # for now - just sets the _rectracted flag. + self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.RETRACTED, category=iu.category) + # SPECIAL CASE: allow the handlers (which will need to find the IU + # in the buffer) to operate on the IU - then delete it afterwards! + # FIXME: for now: retracted == deleted! Think about this later + del(self._iu_store[iu.uid]) else: - logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_)) + if event.data.writer_name == self.unique_name: + # Notify only for remotely triggered events; + # Discard updates that originate from this buffer + return + if type_ is ipaaca_pb2.IUCommission: + # IU commit + iu = self._iu_store[event.data.uid] + iu._apply_commission() + iu._revision = event.data.revision + self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category) + elif type_ is IUPayloadUpdate: + # IU payload update + iu = self._iu_store[event.data.uid] + iu._apply_update(event.data) + self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category) + elif type_ is IULinkUpdate: + # IU link update + iu = self._iu_store[event.data.uid] + iu._apply_link_update(event.data) + self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category) + else: + logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_)) def add_category_interests(self, category_interests): for interest in category_interests: @@ -1178,6 +1202,9 @@ def initialize_ipaaca_rsb():#{{{ rsb.converter.registerGlobalConverter( rsb.converter.ProtocolBufferConverter( messageClass=ipaaca_pb2.IUCommission)) + rsb.converter.registerGlobalConverter( + rsb.converter.ProtocolBufferConverter( + messageClass=ipaaca_pb2.IURetraction)) rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources() #t = rsb.ParticipantConfig.Transport('spread', {'enabled':'true'}) #rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromFile('rsb.cfg') diff --git a/ipaacatools/scripts/ipaaca-iu-sniffer.py b/ipaacatools/scripts/ipaaca-iu-sniffer.py index c810ec868d6b00932b8c73b9ab66c8ec6db1e9f0..f5798abd2ddee62397f78b5bf4862b8a09660934 100755 --- a/ipaacatools/scripts/ipaaca-iu-sniffer.py +++ b/ipaacatools/scripts/ipaaca-iu-sniffer.py @@ -5,12 +5,15 @@ import logging import ipaaca def my_update_handler(iu, event_type, local): - print(event_type+': '+str(iu)) + t=time.localtime() + print str(t.tm_hour)+':'+str(t.tm_min)+':'+str(t.tm_sec), + print(event_type+': '+unicode(iu)) ib = ipaaca.InputBuffer('SnifferIn', ['']) ib.register_handler(my_update_handler) +print("Listening for IU events of any category...") +print('') while True: - print(" .") time.sleep(1)