Skip to content
Snippets Groups Projects
Commit adda62a6 authored by Dennis Leroy Wigand's avatar Dennis Leroy Wigand
Browse files

added resend request feature

parent 98c73a9d
No related branches found
No related tags found
No related merge requests found
......@@ -84,6 +84,11 @@ message IUCommission {
required string writer_name = 3;
}
message IUResendRequest {
required string uid = 1;
required string hidden_name = 2;
}
message IULinkUpdate {
required string uid = 1;
required uint32 revision = 2;
......
......@@ -2,10 +2,10 @@
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# CITEC, Bielefeld University
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
......@@ -22,7 +22,7 @@
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
......@@ -65,7 +65,7 @@ __all__ = [
'IUAccessMode',
'InputBuffer', 'OutputBuffer',
'IU',
'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError', 'IUNotFoundError',
'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError', 'IUNotFoundError', 'IUResendFailedError',
'logger'
]
......@@ -75,7 +75,7 @@ __all__ = [
def enum(*sequential, **named):
"""Create an enum type.
Based on suggestion of Alec Thomas on stackoverflow.com:
http://stackoverflow.com/questions/36932/
whats-the-best-way-to-implement-an-enum-in-python/1695250#1695250
......@@ -96,12 +96,12 @@ def unpack_typed_payload_item(protobuf_object):
class IpaacaLoggingHandler(logging.Handler):
def __init__(self, level=logging.DEBUG):
logging.Handler.__init__(self, level)
def emit(self, record):
meta = '[ipaaca] (' + str(record.levelname) + ') '
meta = '[ipaaca] (' + str(record.levelname) + ') '
msg = str(record.msg.format(record.args))
print(meta + msg)
......@@ -141,6 +141,10 @@ class IUUpdateFailedError(Exception):
def __init__(self, iu):
super(IUUpdateFailedError, self).__init__('Remote update failed for IU ' + str(iu.uid) + '.')
class IUResendFailedError(Exception):
"""Error indicating that a remote IU resend failed."""
def __init__(self, iu):
super(IUResendFailedError, self).__init__('Remote resend failed for IU ' + str(iu.uid) + '.')
class IUCommittedError(Exception):
"""Error indicating that an IU is immutable because it has been committed to."""
......@@ -253,13 +257,13 @@ class Payload(dict):
#print("Payload.__delitem__() OUT")
self._batch_update_lock.release()
return r
# Context-manager based batch updates, not yet thread-safe (on remote updates)
# Context-manager based batch updates, not yet thread-safe (on remote updates)
def __enter__(self):
#print('running Payload.__enter__()')
self._wait_batch_update_lock(self._update_timeout)
self._update_on_every_change = False
def __exit__(self, type, value, traceback):
#print('running Payload.__exit__()')
self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=self._batch_update_writer_name)
......@@ -291,12 +295,12 @@ class Payload(dict):
class IUInterface(object): #{{{
"""Base class of all specialised IU classes."""
def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False):
"""Creates an IU.
Keyword arguments:
uid -- unique ID of this IU
access_mode -- access mode of this IU
......@@ -314,7 +318,7 @@ class IUInterface(object): #{{{
self._buffer = None
# payload is not present here
self._links = collections.defaultdict(set)
def __str__(self):
s = unicode(self.__class__)+"{ "
s += "category="+("<None>" if self._category is None else self._category)+" "
......@@ -331,8 +335,8 @@ class IUInterface(object): #{{{
s += "} "
s += "}"
return s
def _add_and_remove_links(self, add, remove):
'''Just add and remove the new links in our links set, do not send an update here'''
'''Note: Also used for remotely enforced links updates.'''
......@@ -343,7 +347,7 @@ class IUInterface(object): #{{{
'''Note: Also used for remotely enforced links updates.'''
self._links = collections.defaultdict(set)
for type in links.keys(): self._links[type] |= set(links[type])
def add_links(self, type, targets, writer_name=None):
'''Attempt to add links if the conditions are met
and send an update message. Then call the local setter.'''
......@@ -370,45 +374,45 @@ class IUInterface(object): #{{{
return set(self._links[type])
def get_all_links(self):
return copy.deepcopy(self._links)
def _get_revision(self):
return self._revision
revision = property(fget=_get_revision, doc='Revision number of the IU.')
def _get_category(self):
return self._category
category = property(fget=_get_category, doc='Category of the IU.')
def _get_payload_type(self):
return self._payload_type
payload_type = property(fget=_get_payload_type, doc='Type of the IU payload')
def _get_committed(self):
return self._committed
committed = property(
fget=_get_committed,
doc='Flag indicating whether this IU has been committed to.')
def _get_retracted(self):
return self._retracted
retracted = property(
fget=_get_retracted,
doc='Flag indicating whether this IU has been retracted.')
def _get_uid(self):
return self._uid
uid = property(fget=_get_uid, doc='Unique ID of the IU.')
def _get_access_mode(self):
return self._access_mode
access_mode = property(fget=_get_access_mode, doc='Access mode of the IU.')
def _get_read_only(self):
return self._read_only
read_only = property(
fget=_get_read_only,
fget=_get_read_only,
doc='Flag indicating whether this IU is read only.')
def _get_buffer(self):
return self._buffer
def _set_buffer(self, buffer):
......@@ -417,9 +421,9 @@ class IUInterface(object): #{{{
self._buffer = buffer
buffer = property(
fget=_get_buffer,
fset=_set_buffer,
fset=_set_buffer,
doc='Buffer this IU is held in.')
def _get_owner_name(self):
return self._owner_name
def _set_owner_name(self, owner_name):
......@@ -444,7 +448,7 @@ class IU(IUInterface):#{{{
self._payload_type = _payload_type
self.revision_lock = threading.RLock()
self._payload = Payload(iu=self)
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
if self.committed:
raise IUCommittedError(self)
......@@ -460,7 +464,7 @@ class IU(IUInterface):#{{{
new_links=new_links,
links_to_remove=links_to_remove,
writer_name=self.owner_name if writer_name is None else writer_name)
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
"""Modify the payload: add or remove items from this payload locally and send update."""
if self.committed:
......@@ -479,10 +483,10 @@ class IU(IUInterface):#{{{
new_items=new_items,
keys_to_remove=keys_to_remove,
writer_name=self.owner_name if writer_name is None else writer_name)
def _increase_revision_number(self):
self._revision += 1
def _internal_commit(self, writer_name=None):
if self.committed:
raise IUCommittedError(self)
......@@ -492,11 +496,11 @@ class IU(IUInterface):#{{{
self._committed = True
if self.buffer is not None:
self.buffer._send_iu_commission(self, writer_name=writer_name)
def commit(self):
"""Commit to this IU."""
return self._internal_commit()
def _get_payload(self):
return self._payload
def _set_payload(self, new_pl, writer_name=None):
......@@ -512,13 +516,13 @@ class IU(IUInterface):#{{{
fget=_get_payload,
fset=_set_payload,
doc='Payload dictionary of this IU.')
def _get_is_published(self):
return self.buffer is not None
is_published = property(
fget=_get_is_published,
fget=_get_is_published,
doc='Flag indicating whether this IU has been published or not.')
def _set_buffer(self, buffer):
if self._buffer is not None:
raise Exception('The IU is already in a buffer, cannot move it.')
......@@ -529,7 +533,7 @@ class IU(IUInterface):#{{{
fget=IUInterface._get_buffer,
fset=_set_buffer,
doc='Buffer this IU is held in.')
def _set_uid(self, uid):
if self._uid is not None:
raise AttributeError('The uid of IU ' + self.uid + ' has already been set, cannot change it.')
......@@ -545,25 +549,25 @@ class Message(IU):#{{{
"""Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls."""
def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, _payload_type='MAP'):
super(Message, self).__init__(category=category, access_mode=access_mode, read_only=read_only, _payload_type=_payload_type)
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
if self.is_published:
logger.info('Info: modifying a Message after sending has no global effects')
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
if self.is_published:
logger.info('Info: modifying a Message after sending has no global effects')
def _increase_revision_number(self):
self._revision += 1
def _internal_commit(self, writer_name=None):
if self.is_published:
logger.info('Info: committing to a Message after sending has no global effects')
def commit(self):
return self._internal_commit()
def _get_payload(self):
return self._payload
def _set_payload(self, new_pl, writer_name=None):
......@@ -582,13 +586,13 @@ class Message(IU):#{{{
fget=_get_payload,
fset=_set_payload,
doc='Payload dictionary of this IU.')
def _get_is_published(self):
return self.buffer is not None
is_published = property(
fget=_get_is_published,
fget=_get_is_published,
doc='Flag indicating whether this IU has been published or not.')
def _set_buffer(self, buffer):
if self._buffer is not None:
raise Exception('The IU is already in a buffer, cannot move it.')
......@@ -599,7 +603,7 @@ class Message(IU):#{{{
fget=IUInterface._get_buffer,
fset=_set_buffer,
doc='Buffer this IU is held in.')
def _set_uid(self, uid):
if self._uid is not None:
raise AttributeError('The uid of IU ' + self.uid + ' has already been set, cannot change it.')
......@@ -611,9 +615,9 @@ class Message(IU):#{{{
#}}}
class RemoteMessage(IUInterface):#{{{
"""A remote IU with access mode 'MESSAGE'."""
def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links):
super(RemoteMessage, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
self._revision = revision
......@@ -627,10 +631,10 @@ class RemoteMessage(IUInterface):#{{{
# We are just receiving it here and applying the new data.
self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True)
self._links = links
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
logger.info('Info: modifying a RemoteMessage only has local effects')
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
logger.info('Info: modifying a RemoteMessage only has local effects')
......@@ -655,7 +659,7 @@ class RemoteMessage(IUInterface):#{{{
self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove)
else:
self._replace_links(links=update.new_links)
def _apply_update(self, update):
"""Apply a IUPayloadUpdate to the IU."""
logger.warning('Warning: should never be called: RemoteMessage._apply_update')
......@@ -666,12 +670,12 @@ class RemoteMessage(IUInterface):#{{{
else:
# NOTE Please read the comment in the constructor
self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True)
def _apply_commission(self):
"""Apply commission to the IU"""
logger.warning('Warning: should never be called: RemoteMessage._apply_commission')
self._committed = True
def _apply_retraction(self):
"""Apply retraction to the IU"""
logger.warning('Warning: should never be called: RemoteMessage._apply_retraction')
......@@ -679,9 +683,9 @@ class RemoteMessage(IUInterface):#{{{
#}}}
class RemotePushIU(IUInterface):#{{{
"""A remote IU with access mode 'PUSH'."""
def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links):
super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
self._revision = revision
......@@ -695,7 +699,7 @@ class RemotePushIU(IUInterface):#{{{
# We are just receiving it here and applying the new data.
self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True)
self._links = links
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
"""Modify the links: add or remove item from this payload remotely and send update."""
if self.committed:
......@@ -715,7 +719,7 @@ class RemotePushIU(IUInterface):#{{{
raise IUUpdateFailedError(self)
else:
self._revision = new_revision
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
"""Modify the payload: add or remove item from this payload remotely and send update."""
if self.committed:
......@@ -790,7 +794,7 @@ class RemotePushIU(IUInterface):#{{{
self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove)
else:
self._replace_links(links=update.new_links)
def _apply_update(self, update):
"""Apply a IUPayloadUpdate to the IU."""
self._revision = update.revision
......@@ -800,11 +804,11 @@ class RemotePushIU(IUInterface):#{{{
else:
# NOTE Please read the comment in the constructor
self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True)
def _apply_commission(self):
"""Apply commission to the IU"""
self._committed = True
def _apply_retraction(self):
"""Apply retraction to the IU"""
self._retracted = True
......@@ -815,12 +819,12 @@ class IntConverter(rsb.converter.Converter):#{{{
"""Convert Python int objects to Protobuf ints and vice versa."""
def __init__(self, wireSchema="int", dataType=int):
super(IntConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, value):
pbo = ipaaca_pb2.IntMessage()
pbo.value = value
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca_pb2.IntMessage()
pbo.ParseFromString( str(byte_stream) )
......@@ -835,7 +839,7 @@ class IUConverter(rsb.converter.Converter):#{{{
'''
def __init__(self, wireSchema="ipaaca-iu", dataType=IU):
super(IUConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu):
pbo = ipaaca_pb2.IU()
pbo.uid = iu._uid
......@@ -859,7 +863,7 @@ class IUConverter(rsb.converter.Converter):#{{{
linkset.targets.extend(iu._links[type_])
ws = "ipaaca-messageiu" if iu._access_mode == IUAccessMode.MESSAGE else self.wireSchema
return bytearray(pbo.SerializeToString()), ws
def deserialize(self, byte_stream, ws):
type = self.getDataType()
#print('IUConverter.deserialize got a '+str(type)+' over wireSchema '+ws)
......@@ -921,7 +925,7 @@ class MessageConverter(rsb.converter.Converter):#{{{
'''
def __init__(self, wireSchema="ipaaca-messageiu", dataType=Message):
super(IUConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu):
pbo = ipaaca_pb2.IU()
pbo.uid = iu._uid
......@@ -945,7 +949,7 @@ class MessageConverter(rsb.converter.Converter):#{{{
linkset.targets.extend(iu._links[type_])
ws = "ipaaca-messageiu" if iu._access_mode == IUAccessMode.MESSAGE else self.wireSchema
return bytearray(pbo.SerializeToString()), ws
def deserialize(self, byte_stream, ws):
type = self.getDataType()
#print('MessageConverter.deserialize got a '+str(type)+' over wireSchema '+ws)
......@@ -1002,7 +1006,7 @@ class MessageConverter(rsb.converter.Converter):#{{{
class IULinkUpdate(object):#{{{
def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None):
super(IULinkUpdate, self).__init__()
self.uid = uid
......@@ -1011,7 +1015,7 @@ class IULinkUpdate(object):#{{{
self.is_delta = is_delta
self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links)
self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)
def __str__(self):
s = 'LinkUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
......@@ -1023,7 +1027,7 @@ class IULinkUpdate(object):#{{{
#}}}
class IUPayloadUpdate(object):#{{{
def __init__(self, uid, revision, is_delta, writer_name="undef", new_items=None, keys_to_remove=None):
super(IUPayloadUpdate, self).__init__()
self.uid = uid
......@@ -1032,7 +1036,7 @@ class IUPayloadUpdate(object):#{{{
self.is_delta = is_delta
self.new_items = {} if new_items is None else new_items
self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove
def __str__(self):
s = 'PayloadUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
......@@ -1046,7 +1050,7 @@ class IUPayloadUpdate(object):#{{{
class IULinkUpdateConverter(rsb.converter.Converter):#{{{
def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate):
super(IULinkUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu_link_update):
pbo = ipaaca_pb2.IULinkUpdate()
pbo.uid = iu_link_update.uid
......@@ -1062,7 +1066,7 @@ class IULinkUpdateConverter(rsb.converter.Converter):#{{{
linkset.targets.extend(iu_link_update.links_to_remove[type_])
pbo.is_delta = iu_link_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IULinkUpdate:
......@@ -1082,7 +1086,7 @@ class IULinkUpdateConverter(rsb.converter.Converter):#{{{
class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{
def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate):
super(IUPayloadUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu_payload_update):
pbo = ipaaca_pb2.IUPayloadUpdate()
pbo.uid = iu_payload_update.uid
......@@ -1094,7 +1098,7 @@ class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{
pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
pbo.is_delta = iu_payload_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IUPayloadUpdate:
......@@ -1128,12 +1132,12 @@ class FrozenIUStore(IUStore):
raise AttributeError()
class IUEventHandler(object):
"""Wrapper for IU event handling functions."""
def __init__(self, handler_function, for_event_types=None, for_categories=None):
"""Create an IUEventHandler.
Keyword arguments:
handler_function -- the handler function with the signature
(IU, event_type, local)
......@@ -1150,10 +1154,10 @@ class IUEventHandler(object):
self._for_categories = (
None if for_categories is None else
(for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories]))
def condition_met(self, event_type, category):
"""Check whether this IUEventHandler should be called.
Keyword arguments:
event_type -- type of the IU event
category -- category of the IU which triggered the event
......@@ -1161,10 +1165,10 @@ class IUEventHandler(object):
type_condition_met = (self._for_event_types is None or event_type in self._for_event_types)
cat_condition_met = (self._for_categories is None or category in self._for_categories)
return type_condition_met and cat_condition_met
def call(self, buffer, iu_uid, local, event_type, category):
"""Call this IUEventHandler's function, if it applies.
Keyword arguments:
buffer -- the buffer in which the IU is stored
iu_uid -- the uid of the IU
......@@ -1178,12 +1182,12 @@ class IUEventHandler(object):
class Buffer(object):
"""Base class for InputBuffer and OutputBuffer."""
def __init__(self, owning_component_name, participant_config=None):
'''Create a Buffer.
Keyword arguments:
owning_compontent_name -- name of the entity that owns this Buffer
participant_config -- RSB configuration
......@@ -1196,49 +1200,49 @@ class Buffer(object):
self._unique_name = "undef-"+self._uuid
self._iu_store = IUStore()
self._iu_event_handlers = []
def _get_frozen_iu_store(self):
return FrozenIUStore(original_iu_store = self._iu_store)
iu_store = property(fget=_get_frozen_iu_store, doc='Copy-on-read version of the internal IU store')
def register_handler(self, handler_function, for_event_types=None, for_categories=None):
"""Register a new IU event handler function.
Keyword arguments:
handler_function -- a function with the signature (IU, event_type, local)
for_event_types -- a list of event types or None if handler should
be called for all event types
for_categories -- a list of category names or None if handler should
be called for all categories
"""
handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories)
self._iu_event_handlers.append(handler)
def call_iu_event_handlers(self, uid, local, event_type, category):
"""Call registered IU event handler functions registered for this event_type and category."""
for h in self._iu_event_handlers:
h.call(self, uid, local=local, event_type=event_type, category=category)
def _get_owning_component_name(self):
"""Return the name of this Buffer's owning component"""
return self._owning_component_name
owning_component_name = property(_get_owning_component_name)
def _get_unique_name(self):
"""Return the Buffer's unique name."""
return self._unique_name
unique_name = property(_get_unique_name)
class InputBuffer(Buffer):
"""An InputBuffer that holds remote IUs."""
def __init__(self, owning_component_name, category_interests=None, participant_config=None):
'''Create an InputBuffer.
Keyword arguments:
owning_compontent_name -- name of the entity that owns this InputBuffer
category_interests -- list of IU categories this Buffer is interested in
......@@ -1252,31 +1256,39 @@ class InputBuffer(Buffer):
if category_interests is not None:
for cat in category_interests:
self._add_category_listener(cat)
# add own uuid as identifier for hidden channel. (dlw)
self._add_category_listener(str(self._uuid))
def _get_remote_server(self, iu):
'''Return (or create, store and return) a remote server.'''
if iu.owner_name in self._remote_server_store:
return self._remote_server_store[iu.owner_name]
# TODO remove the str() when unicode is supported (issue #490)
remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name)))
self._remote_server_store[iu.owner_name] = remote_server
_owner = None
if hasattr(iu,'owner_name'):
_owner = iu.owner_name
elif hasattr(iu,'writer_name'):
_owner = iu.writer_name
if _owner is not None:
if _owner in self._remote_server_store:
return self._remote_server_store[_owner]
# TODO remove the str() when unicode is supported (issue #490)
remote_server = rsb.createRemoteServer(rsb.Scope(str(_owner)))
self._remote_server_store[_owner] = remote_server
return remote_server
def _add_category_listener(self, iu_category):
'''Return (or create, store and return) a category listener.'''
if iu_category not in self._listener_store:
if iu_category not in self._listener_store:
cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category)
logger.info("Added listener in scope "+"/ipaaca/category/"+iu_category)
logger.warning("Added listener in scope "+"/ipaaca/category/"+iu_category)#info
def _handle_iu_events(self, event):
'''Dispatch incoming IU events.
Adds incoming IU's to the store, applies payload and commit updates to
IU, calls IU event handlers.'
Keyword arguments:
event -- a converted RSB event
'''
......@@ -1299,8 +1311,16 @@ class InputBuffer(Buffer):
else:
# an update to an existing IU
if event.data.uid not in self._iu_store:
# TODO: we should request the IU's owner to send us the IU
logger.warning("Update message for IU which we did not fully receive before.")
logger.warning("Resend message for IU which we did not fully receive before.")
# send resend request to remote server (dlw).
remote_server = self._get_remote_server(event.data)
resend_request = ipaaca_pb2.IUResendRequest()
resend_request.uid = event.data.uid # target iu
resend_request.hidden_name = str(self._uuid) # hidden channel name
rRevision = remote_server.resendRequest(resend_request)
if rRevision == 0:
raise IUResendFailedError(self)
return
if type_ is ipaaca_pb2.IURetraction:
# IU retraction (cannot be triggered remotely)
......@@ -1319,7 +1339,7 @@ class InputBuffer(Buffer):
return
#else:
# print('Got update written by buffer '+str(event.data.writer_name))
if type_ is ipaaca_pb2.IUCommission:
# IU commit
iu = self._iu_store[event.data.uid]
......@@ -1345,12 +1365,12 @@ class InputBuffer(Buffer):
class OutputBuffer(Buffer):
"""An OutputBuffer that holds local IUs."""
def __init__(self, owning_component_name, participant_config=None):
'''Create an Output Buffer.
Keyword arguments:
owning_component_name -- name of the entity that own this buffer
participant_config -- RSB configuration
......@@ -1361,11 +1381,13 @@ class OutputBuffer(Buffer):
self._server.addMethod('updateLinks', self._remote_update_links, IULinkUpdate, int)
self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int)
self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int)
# add method to trigger a resend request. (dlw)
self._server.addMethod('resendRequest', self._remote_resend_request, ipaaca_pb2.IUResendRequest, int)
self._informer_store = {}
self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-'
self.__iu_id_counter_lock = threading.Lock()
#self.__iu_id_counter = 0 # hbuschme: IUs now have their Ids assigned on creation
def _create_own_name_listener(self, iu_category):
# FIXME replace this
'''Create an own name listener.'''
......@@ -1377,7 +1399,7 @@ class OutputBuffer(Buffer):
#logger.info("Added category listener for "+iu_category)
#return cat_listener
pass
# hbuschme: IUs now have their Ids assigned on creation
#def _generate_iu_uid(self):
# '''Generate a unique IU id of the form ????'''
......@@ -1403,7 +1425,7 @@ class OutputBuffer(Buffer):
iu.set_links(links=update.new_links, writer_name=update.writer_name)
self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.LINKSUPDATED, category=iu.category)
return iu.revision
def _remote_update_payload(self, update):
'''Apply a remotely requested update to one of the stored IU's payload.'''
if update.uid not in self._iu_store:
......@@ -1431,7 +1453,21 @@ class OutputBuffer(Buffer):
# _set_payload etc. have also incremented the revision number
self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category)
return iu.revision
def _remote_resend_request(self, iu_resend_request_pack):
''' Resend an requested iu. (dlw) '''
if iu_resend_request_pack.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_resend_request_pack.uid))
return 0
iu = self._iu_store[iu_resend_request_pack.uid]
with iu.revision_lock:
if (iu_resend_request_pack.hidden_name is not None) or (iu_resend_request_pack.hidden_name is not ""):
informer = self._get_informer(iu_resend_request_pack.hidden_name)
informer.publishData(iu)
return iu.revision
else:
return 0
def _remote_commit(self, iu_commission):
'''Apply a remotely requested commit to one of the stored IUs.'''
if iu_commission.uid not in self._iu_store:
......@@ -1449,7 +1485,7 @@ class OutputBuffer(Buffer):
iu._internal_commit(writer_name=iu_commission.writer_name)
self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category)
return iu.revision
def _get_informer(self, iu_category):
'''Return (or create, store and return) an informer object for IUs of the specified category.'''
if iu_category in self._informer_store:
......@@ -1462,7 +1498,7 @@ class OutputBuffer(Buffer):
self._informer_store[iu_category] = informer_iu #new_tuple
logger.info("Returning NEW informer on scope "+"/ipaaca/category/"+str(iu_category))
return informer_iu #return new_tuple
def add(self, iu):
'''Add an IU to the IU store, assign an ID and publish it.'''
# hbuschme: IUs now have their Ids assigned on creation
......@@ -1478,7 +1514,7 @@ class OutputBuffer(Buffer):
self._iu_store[iu.uid] = iu
iu.buffer = self
self._publish_iu(iu)
def remove(self, iu=None, iu_uid=None):
'''Remove the iu or an IU corresponding to iu_uid from the OutputBuffer, retracting it from the system.'''
if iu is None:
......@@ -1492,12 +1528,12 @@ class OutputBuffer(Buffer):
self._retract_iu(iu)
del self._iu_store[iu.uid]
return iu
def _publish_iu(self, iu):
'''Publish an IU.'''
informer = self._get_informer(iu._category)
informer.publishData(iu)
def _retract_iu(self, iu):
'''Retract (unpublish) an IU.'''
iu_retraction = ipaaca_pb2.IURetraction()
......@@ -1505,10 +1541,10 @@ class OutputBuffer(Buffer):
iu_retraction.revision = iu.revision
informer = self._get_informer(iu._category)
informer.publishData(iu_retraction)
def _send_iu_commission(self, iu, writer_name):
'''Send IU commission.
Keyword arguments:
iu -- the IU that has been committed to
writer_name -- name of the Buffer that initiated this commit, necessary
......@@ -1523,10 +1559,10 @@ class OutputBuffer(Buffer):
iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name
informer = self._get_informer(iu._category)
informer.publishData(iu_commission)
def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"):
'''Send an IU link update.
Keyword arguments:
iu -- the IU being updated
is_delta -- whether this is an incremental update or a replacement
......@@ -1550,10 +1586,10 @@ class OutputBuffer(Buffer):
informer = self._get_informer(iu._category)
informer.publishData(link_update)
# FIXME send the notification to the target, if the target is not the writer_name
def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
'''Send an IU payload update.
Keyword arguments:
iu -- the IU being updated
is_delta -- whether this is an incremental update or a replacement
......@@ -1600,6 +1636,9 @@ def initialize_ipaaca_rsb():#{{{
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IUCommission))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IUResendRequest)) # dlw
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IURetraction))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment