Skip to content
Snippets Groups Projects
Commit 52d54644 authored by Ramin Yaghoubzadeh's avatar Ramin Yaghoubzadeh
Browse files

Implemented IU retraction for python. For now retraction entails subsequent deletion.

parent 5908d702
No related branches found
No related tags found
No related merge requests found
...@@ -185,6 +185,7 @@ class IUInterface(object): #{{{ ...@@ -185,6 +185,7 @@ class IUInterface(object): #{{{
self._payload_type = None self._payload_type = None
self._owner_name = None self._owner_name = None
self._committed = False self._committed = False
self._retracted = False
self._access_mode = access_mode self._access_mode = access_mode
self._read_only = read_only self._read_only = read_only
self._buffer = None self._buffer = None
...@@ -264,6 +265,12 @@ class IUInterface(object): #{{{ ...@@ -264,6 +265,12 @@ class IUInterface(object): #{{{
fget=_get_committed, fget=_get_committed,
doc='Flag indicating whether this IU has been committed to.') doc='Flag indicating whether this IU has been committed to.')
def _get_retracted(self):
return self._retracted
retracted = property(
fget=_get_retracted,
doc='Flag indicating whether this IU has been retracted.')
def _get_uid(self): def _get_uid(self):
return self._uid return self._uid
uid = property(fget=_get_uid, doc='Unique ID of the IU.') uid = property(fget=_get_uid, doc='Unique ID of the IU.')
...@@ -420,6 +427,7 @@ class RemotePushIU(IUInterface):#{{{ ...@@ -420,6 +427,7 @@ class RemotePushIU(IUInterface):#{{{
self.owner_name = owner_name self.owner_name = owner_name
self._payload_type = payload_type self._payload_type = payload_type
self._committed = committed self._committed = committed
self._retracted = False
# NOTE Since the payload is an already-existant Payload which we didn't modify ourselves, # NOTE Since the payload is an already-existant Payload which we didn't modify ourselves,
# don't try to invoke any modification checks or network updates ourselves either. # don't try to invoke any modification checks or network updates ourselves either.
# We are just receiving it here and applying the new data. # We are just receiving it here and applying the new data.
...@@ -534,6 +542,10 @@ class RemotePushIU(IUInterface):#{{{ ...@@ -534,6 +542,10 @@ class RemotePushIU(IUInterface):#{{{
def _apply_commission(self): def _apply_commission(self):
"""Apply commission to the IU""" """Apply commission to the IU"""
self._committed = True self._committed = True
def _apply_retraction(self):
"""Apply retraction to the IU"""
self._retracted = True
#}}} #}}}
...@@ -905,31 +917,43 @@ class InputBuffer(Buffer): ...@@ -905,31 +917,43 @@ class InputBuffer(Buffer):
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category) self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category)
else: else:
# an update to an existing IU # an update to an existing IU
if event.data.writer_name == self.unique_name:
# Discard updates that originate from this buffer
return
if event.data.uid not in self._iu_store: if event.data.uid not in self._iu_store:
# TODO: we should request the IU's owner to send us the IU # TODO: we should request the IU's owner to send us the IU
logger.warning("Update message for IU which we did not fully receive before.") logger.warning("Update message for IU which we did not fully receive before.")
return return
if type_ is ipaaca_pb2.IUCommission: if type_ is ipaaca_pb2.IURetraction:
# IU commit # IU retraction (cannot be triggered remotely)
iu = self._iu_store[event.data.uid] iu = self._iu_store[event.data.uid]
iu._apply_commission()
iu._revision = event.data.revision iu._revision = event.data.revision
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category) iu._apply_retraction() # for now - just sets the _rectracted flag.
elif type_ is IUPayloadUpdate: self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.RETRACTED, category=iu.category)
# IU payload update # SPECIAL CASE: allow the handlers (which will need to find the IU
iu = self._iu_store[event.data.uid] # in the buffer) to operate on the IU - then delete it afterwards!
iu._apply_update(event.data) # FIXME: for now: retracted == deleted! Think about this later
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category) del(self._iu_store[iu.uid])
elif type_ is IULinkUpdate:
# IU link update
iu = self._iu_store[event.data.uid]
iu._apply_link_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category)
else: else:
logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_)) if event.data.writer_name == self.unique_name:
# Notify only for remotely triggered events;
# Discard updates that originate from this buffer
return
if type_ is ipaaca_pb2.IUCommission:
# IU commit
iu = self._iu_store[event.data.uid]
iu._apply_commission()
iu._revision = event.data.revision
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
elif type_ is IUPayloadUpdate:
# IU payload update
iu = self._iu_store[event.data.uid]
iu._apply_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
elif type_ is IULinkUpdate:
# IU link update
iu = self._iu_store[event.data.uid]
iu._apply_link_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category)
else:
logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
def add_category_interests(self, category_interests): def add_category_interests(self, category_interests):
for interest in category_interests: for interest in category_interests:
...@@ -1178,6 +1202,9 @@ def initialize_ipaaca_rsb():#{{{ ...@@ -1178,6 +1202,9 @@ def initialize_ipaaca_rsb():#{{{
rsb.converter.registerGlobalConverter( rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter( rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IUCommission)) messageClass=ipaaca_pb2.IUCommission))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IURetraction))
rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources() rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
#t = rsb.ParticipantConfig.Transport('spread', {'enabled':'true'}) #t = rsb.ParticipantConfig.Transport('spread', {'enabled':'true'})
#rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromFile('rsb.cfg') #rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromFile('rsb.cfg')
......
...@@ -5,12 +5,15 @@ import logging ...@@ -5,12 +5,15 @@ import logging
import ipaaca import ipaaca
def my_update_handler(iu, event_type, local): def my_update_handler(iu, event_type, local):
print(event_type+': '+str(iu)) t=time.localtime()
print str(t.tm_hour)+':'+str(t.tm_min)+':'+str(t.tm_sec),
print(event_type+': '+unicode(iu))
ib = ipaaca.InputBuffer('SnifferIn', ['']) ib = ipaaca.InputBuffer('SnifferIn', [''])
ib.register_handler(my_update_handler) ib.register_handler(my_update_handler)
print("Listening for IU events of any category...")
print('')
while True: while True:
print(" .")
time.sleep(1) time.sleep(1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment