Skip to content
Snippets Groups Projects
Commit 2ae5cc1b authored by Hendrik Buschmeier's avatar Hendrik Buschmeier
Browse files

Changes to complex payload types get reverted if IU update fails.

parent d1aa2b6f
No related branches found
No related tags found
No related merge requests found
...@@ -35,8 +35,6 @@ from __future__ import division, print_function ...@@ -35,8 +35,6 @@ from __future__ import division, print_function
import rsb import rsb
import rsb.converter import rsb.converter
from ipaaca.misc import logger from ipaaca.misc import logger
import ipaaca_pb2 import ipaaca_pb2
...@@ -86,22 +84,11 @@ def initialize_ipaaca_rsb(): ...@@ -86,22 +84,11 @@ def initialize_ipaaca_rsb():
messageClass=ipaaca_pb2.IURetraction)) messageClass=ipaaca_pb2.IURetraction))
rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources() rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
#t = rsb.ParticipantConfig.Transport('spread', {'enabled':'true'}) # t = rsb.ParticipantConfig.Transport('spread', {'enabled':'true'})
#rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromFile('rsb.cfg') # rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromFile('rsb.cfg')
## --- Module initialisation ------------------------------------------------- ## --- Module initialisation -------------------------------------------------
# register our own RSB Converters # register our own RSB Converters
initialize_ipaaca_rsb() initialize_ipaaca_rsb()
# IDEAS
# We should think about relaying the update event (or at least the
# affected keys in the payload / links) to the event handlers!
# THOUGHTS
# Output buffers could generate UIDs for IUs on request, without
# publishing them at that time. Then UID could then be used
# for internal links etc. The IU may be published later through
# the same buffer that allocated the UID.
...@@ -176,7 +176,6 @@ class InputBuffer(Buffer): ...@@ -176,7 +176,6 @@ class InputBuffer(Buffer):
"""An InputBuffer that holds remote IUs.""" """An InputBuffer that holds remote IUs."""
def __init__(self, owning_component_name, category_interests=None, channel="default", participant_config=None, resend_active = False ): def __init__(self, owning_component_name, category_interests=None, channel="default", participant_config=None, resend_active = False ):
'''Create an InputBuffer. '''Create an InputBuffer.
...@@ -195,7 +194,7 @@ class InputBuffer(Buffer): ...@@ -195,7 +194,7 @@ class InputBuffer(Buffer):
if category_interests is not None: if category_interests is not None:
for cat in category_interests: for cat in category_interests:
self._add_category_listener(cat) self._add_category_listener(cat)
# add own uuid as identifier for hidden channel. (dlw) # add own uuid as identifier for hidden category.
self._add_category_listener(str(self._uuid)) self._add_category_listener(str(self._uuid))
def _get_remote_server(self, iu): def _get_remote_server(self, iu):
...@@ -235,13 +234,17 @@ class InputBuffer(Buffer): ...@@ -235,13 +234,17 @@ class InputBuffer(Buffer):
type_ = type(event.data) type_ = type(event.data)
if type_ is ipaaca.iu.RemotePushIU: if type_ is ipaaca.iu.RemotePushIU:
# a new IU # a new IU
if event.data.uid in self._iu_store: if event.data.uid not in self._iu_store:
# already in our store self._iu_store[event.data.uid] = event.data
pass
else:
self._iu_store[ event.data.uid ] = event.data
event.data.buffer = self event.data.buffer = self
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.ADDED, category=event.data.category) self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.ADDED, category=event.data.category)
else:
# IU already in our store, overwrite local IU, but do not call
# event handler. This functionality is necessary to undo
# destructive changes after a failing remote updates (undo is
# done via the resend request mechanism).
self._iu_store[event.data.uid] = event.data
event.data.buffer = self
elif type_ is ipaaca.iu.RemoteMessage: elif type_ is ipaaca.iu.RemoteMessage:
# a new Message, an ephemeral IU that is removed after calling handlers # a new Message, an ephemeral IU that is removed after calling handlers
self._iu_store[ event.data.uid ] = event.data self._iu_store[ event.data.uid ] = event.data
...@@ -249,19 +252,12 @@ class InputBuffer(Buffer): ...@@ -249,19 +252,12 @@ class InputBuffer(Buffer):
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.MESSAGE, category=event.data.category) self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.MESSAGE, category=event.data.category)
del self._iu_store[ event.data.uid ] del self._iu_store[ event.data.uid ]
else: else:
if event.data.uid not in self._iu_store: # TODO switch default off if event.data.uid not in self._iu_store:
if self._resend_active == True: if self._resend_active:
logger.warning("Resend message for IU which we did not fully receive before.") # send resend request to remote server
# send resend request to remote server (dlw). self._request_remote_resend(event.data)
remote_server = self._get_remote_server(event.data)
resend_request = ipaaca_pb2.IUResendRequest()
resend_request.uid = event.data.uid # target iu
resend_request.hidden_scope_name = str(self._uuid) # hidden channel name
rRevision = remote_server.resendRequest(resend_request)
if rRevision == 0:
raise ipaaca.exception.IUResendFailedError(self)
else: else:
logger.warning("Update message for IU which we did not fully receive before.") logger.warning("Received an update for an IU which we did not receive before.")
return return
# an update to an existing IU # an update to an existing IU
if type_ is ipaaca_pb2.IURetraction: if type_ is ipaaca_pb2.IURetraction:
...@@ -305,10 +301,19 @@ class InputBuffer(Buffer): ...@@ -305,10 +301,19 @@ class InputBuffer(Buffer):
for interest in category_interests: for interest in category_interests:
self._add_category_listener(interest) self._add_category_listener(interest)
def is_resend_active(): def _request_remote_resend(self, iu):
remote_server = self._get_remote_server(iu)
resend_request = ipaaca_pb2.IUResendRequest()
resend_request.uid = iu.uid # target iu
resend_request.hidden_scope_name = str(self._uuid) # hidden category name
remote_revision = remote_server.requestResend(resend_request)
if remote_revision == 0:
raise ipaaca.exception.IUResendRequestFailedError()
def is_resend_active(self):
return self._resend_active return self._resend_active
def set_resend_active(active): def set_resend_active(self, active=True):
self._resend_active = active self._resend_active = active
...@@ -330,8 +335,7 @@ class OutputBuffer(Buffer): ...@@ -330,8 +335,7 @@ class OutputBuffer(Buffer):
self._server.addMethod('updateLinks', self._remote_update_links, ipaaca.converter.IULinkUpdate, int) self._server.addMethod('updateLinks', self._remote_update_links, ipaaca.converter.IULinkUpdate, int)
self._server.addMethod('updatePayload', self._remote_update_payload, ipaaca.converter.IUPayloadUpdate, int) self._server.addMethod('updatePayload', self._remote_update_payload, ipaaca.converter.IUPayloadUpdate, int)
self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int) self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int)
# add method to trigger a resend request. (dlw) self._server.addMethod('requestResend', self._remote_request_resend, ipaaca_pb2.IUResendRequest, int)
self._server.addMethod('resendRequest', self._remote_resend_request, ipaaca_pb2.IUResendRequest, int)
self._informer_store = {} self._informer_store = {}
self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-' self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-'
self.__iu_id_counter_lock = threading.Lock() self.__iu_id_counter_lock = threading.Lock()
...@@ -383,14 +387,14 @@ class OutputBuffer(Buffer): ...@@ -383,14 +387,14 @@ class OutputBuffer(Buffer):
self.call_iu_event_handlers(update.uid, local=True, event_type=ipaaca.iu.IUEventType.UPDATED, category=iu.category) self.call_iu_event_handlers(update.uid, local=True, event_type=ipaaca.iu.IUEventType.UPDATED, category=iu.category)
return iu.revision return iu.revision
def _remote_resend_request(self, iu_resend_request_pack): def _remote_request_resend(self, iu_resend_request_pack):
''' Resend an requested iu over the specific hidden channel. (dlw) ''' ''' Resend a requested IU over the specific hidden category.'''
if iu_resend_request_pack.uid not in self._iu_store: if iu_resend_request_pack.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_resend_request_pack.uid)) logger.warning("Remote side requested resending of non-existent IU "+str(iu_resend_request_pack.uid))
return 0 return 0
iu = self._iu_store[iu_resend_request_pack.uid] iu = self._iu_store[iu_resend_request_pack.uid]
with iu.revision_lock: with iu.revision_lock:
if (iu_resend_request_pack.hidden_scope_name is not None) and (iu_resend_request_pack.hidden_scope_name is not ""): if iu_resend_request_pack.hidden_scope_name is not None and iu_resend_request_pack.hidden_scope_name is not '':
informer = self._get_informer(iu_resend_request_pack.hidden_scope_name) informer = self._get_informer(iu_resend_request_pack.hidden_scope_name)
informer.publishData(iu) informer.publishData(iu)
return iu.revision return iu.revision
...@@ -531,7 +535,7 @@ class OutputBuffer(Buffer): ...@@ -531,7 +535,7 @@ class OutputBuffer(Buffer):
if keys_to_remove is None: if keys_to_remove is None:
keys_to_remove = [] keys_to_remove = []
payload_update = ipaaca.converter.IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision) payload_update = ipaaca.converter.IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision)
payload_update.new_items = new_items payload_update.new_items = new_items
if is_delta: if is_delta:
payload_update.keys_to_remove = keys_to_remove payload_update.keys_to_remove = keys_to_remove
payload_update.writer_name = writer_name payload_update.writer_name = writer_name
......
...@@ -34,18 +34,18 @@ from __future__ import division, print_function ...@@ -34,18 +34,18 @@ from __future__ import division, print_function
import collections import collections
try:
import simplejson as json
except ImportError:
import json
logger.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
import rsb.converter import rsb.converter
import ipaaca_pb2 import ipaaca_pb2
import ipaaca.iu import ipaaca.iu
from ipaaca.misc import logger from ipaaca.misc import logger
try:
import simplejson as json
except ImportError:
import json
logger.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
__all__ = [ __all__ = [
'IntConverter', 'IntConverter',
...@@ -82,7 +82,13 @@ def pack_payload_entry(entry, key, value): ...@@ -82,7 +82,13 @@ def pack_payload_entry(entry, key, value):
def unpack_payload_entry(entry): def unpack_payload_entry(entry):
# We assume that the only transfer types are 'str' or 'json'. Both are transparently handled by json.loads # We assume that the only transfer types are 'str' or 'json'. Both are transparently handled by json.loads
return json.loads(entry.value) if entry.type == 'json':
return json.loads(entry.value)
elif entry.type == 'str':
return entry.value
else:
logger.warn('Received payload entry with unsupported type "' + entry.type + '".')
return entry.value
class IUConverter(rsb.converter.Converter): class IUConverter(rsb.converter.Converter):
......
...@@ -40,7 +40,7 @@ __all__ = [ ...@@ -40,7 +40,7 @@ __all__ = [
'IUPayloadLockTimeoutError', 'IUPayloadLockTimeoutError',
'IUPublishedError', 'IUPublishedError',
'IUReadOnlyError', 'IUReadOnlyError',
'IUResendFailedError', 'IUResendRequestFailedError',
'IUUpdateFailedError', 'IUUpdateFailedError',
] ]
...@@ -81,7 +81,7 @@ class IUReadOnlyError(Exception): ...@@ -81,7 +81,7 @@ class IUReadOnlyError(Exception):
super(IUReadOnlyError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it is read-only.') super(IUReadOnlyError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it is read-only.')
class IUResendFailedError(Exception): class IUResendRequestFailedError(Exception):
"""Error indicating that a remote IU resend failed.""" """Error indicating that a remote IU resend failed."""
def __init__(self, iu): def __init__(self, iu):
super(IUResendFailedError, self).__init__('Remote resend failed for IU ' + str(iu.uid) + '.') super(IUResendFailedError, self).__init__('Remote resend failed for IU ' + str(iu.uid) + '.')
......
...@@ -71,7 +71,7 @@ IUEventType = ipaaca.misc.enum( ...@@ -71,7 +71,7 @@ IUEventType = ipaaca.misc.enum(
) )
class IUInterface(object): #{{{ class IUInterface(object):
"""Base class of all specialised IU classes.""" """Base class of all specialised IU classes."""
...@@ -211,9 +211,9 @@ class IUInterface(object): #{{{ ...@@ -211,9 +211,9 @@ class IUInterface(object): #{{{
fget=_get_owner_name, fget=_get_owner_name,
fset=_set_owner_name, fset=_set_owner_name,
doc="The IU's owner's name.") doc="The IU's owner's name.")
#}}}
class IU(IUInterface):#{{{
class IU(IUInterface):
"""A local IU.""" """A local IU."""
...@@ -320,9 +320,8 @@ class IU(IUInterface):#{{{ ...@@ -320,9 +320,8 @@ class IU(IUInterface):#{{{
fset=_set_uid, fset=_set_uid,
doc='Unique ID of the IU.') doc='Unique ID of the IU.')
#}}}
class Message(IU):#{{{ class Message(IU):
"""Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls.""" """Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls."""
def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, _payload_type='MAP'): def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, _payload_type='MAP'):
super(Message, self).__init__(category=str(category), access_mode=access_mode, read_only=read_only, _payload_type=_payload_type) super(Message, self).__init__(category=str(category), access_mode=access_mode, read_only=read_only, _payload_type=_payload_type)
...@@ -389,9 +388,9 @@ class Message(IU):#{{{ ...@@ -389,9 +388,9 @@ class Message(IU):#{{{
fget=IUInterface._get_uid, fget=IUInterface._get_uid,
fset=_set_uid, fset=_set_uid,
doc='Unique ID of the IU.') doc='Unique ID of the IU.')
#}}}
class RemoteMessage(IUInterface):#{{{
class RemoteMessage(IUInterface):
"""A remote IU with access mode 'MESSAGE'.""" """A remote IU with access mode 'MESSAGE'."""
...@@ -457,9 +456,9 @@ class RemoteMessage(IUInterface):#{{{ ...@@ -457,9 +456,9 @@ class RemoteMessage(IUInterface):#{{{
"""Apply retraction to the IU""" """Apply retraction to the IU"""
logger.warning('Warning: should never be called: RemoteMessage._apply_retraction') logger.warning('Warning: should never be called: RemoteMessage._apply_retraction')
self._retracted = True self._retracted = True
#}}}
class RemotePushIU(IUInterface):#{{{
class RemotePushIU(IUInterface):
"""A remote IU with access mode 'PUSH'.""" """A remote IU with access mode 'PUSH'."""
......
...@@ -35,6 +35,8 @@ from __future__ import division, print_function ...@@ -35,6 +35,8 @@ from __future__ import division, print_function
import threading import threading
import time import time
import ipaaca.exception
__all__ = [ __all__ = [
'Payload', 'Payload',
...@@ -158,7 +160,7 @@ class Payload(dict): ...@@ -158,7 +160,7 @@ class Payload(dict):
else: else:
self._batch_update_cond.wait(timeout - current_time + start_time) self._batch_update_cond.wait(timeout - current_time + start_time)
current_time = time.time() current_time = time.time()
raise IUPayloadLockTimeoutError(self.iu) raise ipaaca.exception.IUPayloadLockTimeoutError(self.iu)
class PayloadItemProxy(object): class PayloadItemProxy(object):
...@@ -169,7 +171,15 @@ class PayloadItemProxy(object): ...@@ -169,7 +171,15 @@ class PayloadItemProxy(object):
self.identifier_in_payload = identifier_in_payload self.identifier_in_payload = identifier_in_payload
def _notify_payload(self): def _notify_payload(self):
self.payload[self.identifier_in_payload] = self.content try:
self.payload[self.identifier_in_payload] = self.content
except ipaaca.exception.IUUpdateFailedError as e:
# IU update failed. Use the ResendRequest mechanism
# to replace the altered RemotePushIU with the unchanged
# payload from its OutputBuffer.''
iu = self.payload.iu
iu.buffer._request_remote_resend(iu)
raise e # re-raise IUUpdateFailedError from aboves
def _create_proxy(self, obj, identifier_in_payload): def _create_proxy(self, obj, identifier_in_payload):
if isinstance(obj, dict): if isinstance(obj, dict):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment