diff --git a/ipaacalib/python/src/ipaaca/__init__.py b/ipaacalib/python/src/ipaaca/__init__.py index 036849f8767292980b6658e0411de44d0eff64d5..89d4b1d9a4cfe11128755ab1743f413e953dc5cf 100755 --- a/ipaacalib/python/src/ipaaca/__init__.py +++ b/ipaacalib/python/src/ipaaca/__init__.py @@ -35,8 +35,6 @@ from __future__ import division, print_function import rsb import rsb.converter - - from ipaaca.misc import logger import ipaaca_pb2 @@ -86,22 +84,11 @@ def initialize_ipaaca_rsb(): messageClass=ipaaca_pb2.IURetraction)) rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources() - #t = rsb.ParticipantConfig.Transport('spread', {'enabled':'true'}) - #rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromFile('rsb.cfg') + # t = rsb.ParticipantConfig.Transport('spread', {'enabled':'true'}) + # rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromFile('rsb.cfg') ## --- Module initialisation ------------------------------------------------- # register our own RSB Converters initialize_ipaaca_rsb() - - -# IDEAS -# We should think about relaying the update event (or at least the -# affected keys in the payload / links) to the event handlers! - -# THOUGHTS -# Output buffers could generate UIDs for IUs on request, without -# publishing them at that time. Then UID could then be used -# for internal links etc. The IU may be published later through -# the same buffer that allocated the UID. diff --git a/ipaacalib/python/src/ipaaca/buffer.py b/ipaacalib/python/src/ipaaca/buffer.py index 468d1b4f885027e93c09038702a2897570aa691e..c899f4a9c3351f6172b5987eeb858516206c0daf 100644 --- a/ipaacalib/python/src/ipaaca/buffer.py +++ b/ipaacalib/python/src/ipaaca/buffer.py @@ -176,7 +176,6 @@ class InputBuffer(Buffer): """An InputBuffer that holds remote IUs.""" - def __init__(self, owning_component_name, category_interests=None, channel="default", participant_config=None, resend_active = False ): '''Create an InputBuffer. @@ -195,7 +194,7 @@ class InputBuffer(Buffer): if category_interests is not None: for cat in category_interests: self._add_category_listener(cat) - # add own uuid as identifier for hidden channel. (dlw) + # add own uuid as identifier for hidden category. self._add_category_listener(str(self._uuid)) def _get_remote_server(self, iu): @@ -235,13 +234,17 @@ class InputBuffer(Buffer): type_ = type(event.data) if type_ is ipaaca.iu.RemotePushIU: # a new IU - if event.data.uid in self._iu_store: - # already in our store - pass - else: - self._iu_store[ event.data.uid ] = event.data + if event.data.uid not in self._iu_store: + self._iu_store[event.data.uid] = event.data event.data.buffer = self self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.ADDED, category=event.data.category) + else: + # IU already in our store, overwrite local IU, but do not call + # event handler. This functionality is necessary to undo + # destructive changes after a failing remote updates (undo is + # done via the resend request mechanism). + self._iu_store[event.data.uid] = event.data + event.data.buffer = self elif type_ is ipaaca.iu.RemoteMessage: # a new Message, an ephemeral IU that is removed after calling handlers self._iu_store[ event.data.uid ] = event.data @@ -249,19 +252,12 @@ class InputBuffer(Buffer): self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.MESSAGE, category=event.data.category) del self._iu_store[ event.data.uid ] else: - if event.data.uid not in self._iu_store: # TODO switch default off - if self._resend_active == True: - logger.warning("Resend message for IU which we did not fully receive before.") - # send resend request to remote server (dlw). - remote_server = self._get_remote_server(event.data) - resend_request = ipaaca_pb2.IUResendRequest() - resend_request.uid = event.data.uid # target iu - resend_request.hidden_scope_name = str(self._uuid) # hidden channel name - rRevision = remote_server.resendRequest(resend_request) - if rRevision == 0: - raise ipaaca.exception.IUResendFailedError(self) + if event.data.uid not in self._iu_store: + if self._resend_active: + # send resend request to remote server + self._request_remote_resend(event.data) else: - logger.warning("Update message for IU which we did not fully receive before.") + logger.warning("Received an update for an IU which we did not receive before.") return # an update to an existing IU if type_ is ipaaca_pb2.IURetraction: @@ -305,10 +301,19 @@ class InputBuffer(Buffer): for interest in category_interests: self._add_category_listener(interest) - def is_resend_active(): + def _request_remote_resend(self, iu): + remote_server = self._get_remote_server(iu) + resend_request = ipaaca_pb2.IUResendRequest() + resend_request.uid = iu.uid # target iu + resend_request.hidden_scope_name = str(self._uuid) # hidden category name + remote_revision = remote_server.requestResend(resend_request) + if remote_revision == 0: + raise ipaaca.exception.IUResendRequestFailedError() + + def is_resend_active(self): return self._resend_active - def set_resend_active(active): + def set_resend_active(self, active=True): self._resend_active = active @@ -330,8 +335,7 @@ class OutputBuffer(Buffer): self._server.addMethod('updateLinks', self._remote_update_links, ipaaca.converter.IULinkUpdate, int) self._server.addMethod('updatePayload', self._remote_update_payload, ipaaca.converter.IUPayloadUpdate, int) self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int) - # add method to trigger a resend request. (dlw) - self._server.addMethod('resendRequest', self._remote_resend_request, ipaaca_pb2.IUResendRequest, int) + self._server.addMethod('requestResend', self._remote_request_resend, ipaaca_pb2.IUResendRequest, int) self._informer_store = {} self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-' self.__iu_id_counter_lock = threading.Lock() @@ -383,14 +387,14 @@ class OutputBuffer(Buffer): self.call_iu_event_handlers(update.uid, local=True, event_type=ipaaca.iu.IUEventType.UPDATED, category=iu.category) return iu.revision - def _remote_resend_request(self, iu_resend_request_pack): - ''' Resend an requested iu over the specific hidden channel. (dlw) ''' + def _remote_request_resend(self, iu_resend_request_pack): + ''' Resend a requested IU over the specific hidden category.''' if iu_resend_request_pack.uid not in self._iu_store: - logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_resend_request_pack.uid)) + logger.warning("Remote side requested resending of non-existent IU "+str(iu_resend_request_pack.uid)) return 0 iu = self._iu_store[iu_resend_request_pack.uid] with iu.revision_lock: - if (iu_resend_request_pack.hidden_scope_name is not None) and (iu_resend_request_pack.hidden_scope_name is not ""): + if iu_resend_request_pack.hidden_scope_name is not None and iu_resend_request_pack.hidden_scope_name is not '': informer = self._get_informer(iu_resend_request_pack.hidden_scope_name) informer.publishData(iu) return iu.revision @@ -531,7 +535,7 @@ class OutputBuffer(Buffer): if keys_to_remove is None: keys_to_remove = [] payload_update = ipaaca.converter.IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision) - payload_update.new_items = new_items + payload_update.new_items = new_items if is_delta: payload_update.keys_to_remove = keys_to_remove payload_update.writer_name = writer_name diff --git a/ipaacalib/python/src/ipaaca/converter.py b/ipaacalib/python/src/ipaaca/converter.py index 3d37b550f81ba25cece580b17a06ddd2567dbc92..8a9137305bd69b8d6643aa5af6217b5b3929d4d2 100644 --- a/ipaacalib/python/src/ipaaca/converter.py +++ b/ipaacalib/python/src/ipaaca/converter.py @@ -34,18 +34,18 @@ from __future__ import division, print_function import collections -try: - import simplejson as json -except ImportError: - import json - logger.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.') - import rsb.converter import ipaaca_pb2 import ipaaca.iu from ipaaca.misc import logger +try: + import simplejson as json +except ImportError: + import json + logger.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.') + __all__ = [ 'IntConverter', @@ -82,7 +82,13 @@ def pack_payload_entry(entry, key, value): def unpack_payload_entry(entry): # We assume that the only transfer types are 'str' or 'json'. Both are transparently handled by json.loads - return json.loads(entry.value) + if entry.type == 'json': + return json.loads(entry.value) + elif entry.type == 'str': + return entry.value + else: + logger.warn('Received payload entry with unsupported type "' + entry.type + '".') + return entry.value class IUConverter(rsb.converter.Converter): diff --git a/ipaacalib/python/src/ipaaca/exception.py b/ipaacalib/python/src/ipaaca/exception.py index 97747c4636dab5039a073acd2bfe7c19f90eaf7f..fd4046a4823422ff92d96a34c9f78d2616bbf3c1 100644 --- a/ipaacalib/python/src/ipaaca/exception.py +++ b/ipaacalib/python/src/ipaaca/exception.py @@ -40,7 +40,7 @@ __all__ = [ 'IUPayloadLockTimeoutError', 'IUPublishedError', 'IUReadOnlyError', - 'IUResendFailedError', + 'IUResendRequestFailedError', 'IUUpdateFailedError', ] @@ -81,7 +81,7 @@ class IUReadOnlyError(Exception): super(IUReadOnlyError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it is read-only.') -class IUResendFailedError(Exception): +class IUResendRequestFailedError(Exception): """Error indicating that a remote IU resend failed.""" def __init__(self, iu): super(IUResendFailedError, self).__init__('Remote resend failed for IU ' + str(iu.uid) + '.') diff --git a/ipaacalib/python/src/ipaaca/iu.py b/ipaacalib/python/src/ipaaca/iu.py index 7f80a1ab9164530465d49ecdd5ee7e573d3bed69..ec51152c5782cb30b5e89a336e8a11b65aea7dbf 100644 --- a/ipaacalib/python/src/ipaaca/iu.py +++ b/ipaacalib/python/src/ipaaca/iu.py @@ -71,7 +71,7 @@ IUEventType = ipaaca.misc.enum( ) -class IUInterface(object): #{{{ +class IUInterface(object): """Base class of all specialised IU classes.""" @@ -211,9 +211,9 @@ class IUInterface(object): #{{{ fget=_get_owner_name, fset=_set_owner_name, doc="The IU's owner's name.") -#}}} -class IU(IUInterface):#{{{ + +class IU(IUInterface): """A local IU.""" @@ -320,9 +320,8 @@ class IU(IUInterface):#{{{ fset=_set_uid, doc='Unique ID of the IU.') -#}}} -class Message(IU):#{{{ +class Message(IU): """Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls.""" def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, _payload_type='MAP'): super(Message, self).__init__(category=str(category), access_mode=access_mode, read_only=read_only, _payload_type=_payload_type) @@ -389,9 +388,9 @@ class Message(IU):#{{{ fget=IUInterface._get_uid, fset=_set_uid, doc='Unique ID of the IU.') -#}}} -class RemoteMessage(IUInterface):#{{{ + +class RemoteMessage(IUInterface): """A remote IU with access mode 'MESSAGE'.""" @@ -457,9 +456,9 @@ class RemoteMessage(IUInterface):#{{{ """Apply retraction to the IU""" logger.warning('Warning: should never be called: RemoteMessage._apply_retraction') self._retracted = True -#}}} -class RemotePushIU(IUInterface):#{{{ + +class RemotePushIU(IUInterface): """A remote IU with access mode 'PUSH'.""" diff --git a/ipaacalib/python/src/ipaaca/payload.py b/ipaacalib/python/src/ipaaca/payload.py index 9f0c047e494dd8458ad36a8877ac3659d57062f6..054165afa327ad4a17763d1fa303d900192b2fa6 100644 --- a/ipaacalib/python/src/ipaaca/payload.py +++ b/ipaacalib/python/src/ipaaca/payload.py @@ -35,6 +35,8 @@ from __future__ import division, print_function import threading import time +import ipaaca.exception + __all__ = [ 'Payload', @@ -158,7 +160,7 @@ class Payload(dict): else: self._batch_update_cond.wait(timeout - current_time + start_time) current_time = time.time() - raise IUPayloadLockTimeoutError(self.iu) + raise ipaaca.exception.IUPayloadLockTimeoutError(self.iu) class PayloadItemProxy(object): @@ -169,7 +171,15 @@ class PayloadItemProxy(object): self.identifier_in_payload = identifier_in_payload def _notify_payload(self): - self.payload[self.identifier_in_payload] = self.content + try: + self.payload[self.identifier_in_payload] = self.content + except ipaaca.exception.IUUpdateFailedError as e: + # IU update failed. Use the ResendRequest mechanism + # to replace the altered RemotePushIU with the unchanged + # payload from its OutputBuffer.'' + iu = self.payload.iu + iu.buffer._request_remote_resend(iu) + raise e # re-raise IUUpdateFailedError from aboves def _create_proxy(self, obj, identifier_in_payload): if isinstance(obj, dict):