From 667acc1d586d4773f895b6d1e10b3b0a808c0e6a Mon Sep 17 00:00:00 2001 From: Ramin Yaghoubzadeh <ryaghoub@techfak.uni-bielefeld.de> Date: Wed, 8 Feb 2012 15:30:48 +0100 Subject: [PATCH] Links implemented and tested superficially (local and remote access) --- python/src/informer.py | 6 +- python/src/ipaaca.py | 167 ++++++++++++++++++++++++++++------------- python/src/listener.py | 10 ++- 3 files changed, 129 insertions(+), 54 deletions(-) diff --git a/python/src/informer.py b/python/src/informer.py index 36955ee..2c260c4 100755 --- a/python/src/informer.py +++ b/python/src/informer.py @@ -8,9 +8,12 @@ def remote_change_dumper(iu, event_type, local): print 'remote side '+event_type+': '+str(iu) -ob = ipaaca.OutputBuffer('CoolComponent') +ob = ipaaca.OutputBuffer('CoolInformerOut') ob.register_handler(remote_change_dumper) +iu_top = ipaaca.IU() +iu_top.payload = {'data': 'raw'} +ob.add(iu_top) iu = ipaaca.IU() iu.payload = {'a':'a1'} @@ -20,6 +23,7 @@ iu.payload = {'a':'a2', 'b':'b1'} #OK del(iu.payload['b']) iu.payload['c'] = 'c1' iu.payload['a'] = 'a3' +iu.add_links('sameold', iu_top.uid) time.sleep(1) iu.commit() diff --git a/python/src/ipaaca.py b/python/src/ipaaca.py index ff6bed0..834fed8 100755 --- a/python/src/ipaaca.py +++ b/python/src/ipaaca.py @@ -8,12 +8,22 @@ import sys import threading import uuid import collections +import copy import rsb import rsb.converter import ipaaca_pb2 +# IDEAS +# We should think about relaying the update event (or at least the +# affected keys in the payload / links) to the event handlers! + +# THOUGHTS +# Output buffers could generate UIDs for IUs on request, without +# publishing them at that time. Then UID could then be used +# for internal links etc. The IU may be published later through +# the same buffer that allocated the UID. __all__ = [ 'IUEventType', @@ -110,27 +120,26 @@ class IUReadOnlyError(Exception): ## --- Generation Architecture ----------------------------------------------- - -class Links(object): - ''' This is essentially a dict STR -> set([STR, ...]) ''' - def __init__(self, iu, writer_name=None, new_links=None): - nl = {} if new_links is None else new_links - self.iu = iu - self.iu._set_links(links=self, is_delta=False, new_links=pl, links_to_remove=[], writer_name=writer_name) - for k, v in pl.items(): - dict.__setitem__(self, k, v) - def add_links(self, type, targets, writer_name=None): - if not hasattr(targets, '__iter__'): targets=[targets] - self.iu._set_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name) - def remove_links(self, type, targets, writer_name=None): - if not hasattr(targets, '__iter__'): targets=[targets] - self.iu._set_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name) - def modify_links(self, add, remove, writer_name=None): - self.iu._set_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name) - def set_links(self, links, writer_name=None): - self.iu._set_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name) - def get_links(self, type): - return set(self.iu._get_links()) +#class Links(object): +# ''' This is essentially a dict STR -> set([STR, ...]) ''' +# def __init__(self, iu, writer_name=None, new_links=None): +# nl = {} if new_links is None else new_links +# self.iu = iu +# self.iu._set_links(links=self, is_delta=False, new_links=pl, links_to_remove=[], writer_name=writer_name) +# for k, v in pl.items(): +# dict.__setitem__(self, k, v) +# def add_links(self, type, targets, writer_name=None): +# if not hasattr(targets, '__iter__'): targets=[targets] +# self.iu._set_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name) +# def remove_links(self, type, targets, writer_name=None): +# if not hasattr(targets, '__iter__'): targets=[targets] +# self.iu._set_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name) +# def modify_links(self, add, remove, writer_name=None): +# self.iu._set_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name) +# def set_links(self, links, writer_name=None): +# self.iu._set_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name) +# #def get_links(self, type): +# # return set(self.iu._get_links()) class Payload(dict): def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False): @@ -181,27 +190,60 @@ class IUInterface(object): #{{{ # payload is not present here self._links = collections.defaultdict(set) + def __str__(self): + s = str(self.__class__)+"{ " + s += "uid="+self._uid+" " + s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") " + s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " " + s += "payload={ " + for k,v in self.payload.items(): + s += k+":'"+v+"', " + s += "} " + s += "links={ " + for t,ids in self.get_all_links().items(): + s += t+":'"+str(ids)+"', " + s += "} " + s += "}" + return s + + def _add_and_remove_links(self, add, remove): - for type in remove.keys(): self._links[type] -= remove[type] - for type in add.keys(): self._links[type] |= add[type] + '''Just add and remove the new links in our links set, do not send an update here''' + '''Note: Also used for remotely enforced links updates.''' + for type in remove.keys(): self._links[type] -= set(remove[type]) + for type in add.keys(): self._links[type] |= set(add[type]) + def _replace_links(self, links): + '''Just wipe and replace our links set, do not send an update here''' + '''Note: Also used for remotely enforced links updates.''' + self._links = {} + for type in links.keys(): self._links[type] |= set(links[type]) def add_links(self, type, targets, writer_name=None): + '''Attempt to add links if the conditions are met + and send an update message. Then call the local setter.''' if not hasattr(targets, '__iter__'): targets=[targets] self._modify_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name) self._add_and_remove_links( add={type:targets}, remove={} ) def remove_links(self, type, targets, writer_name=None): + '''Attempt to remove links if the conditions are met + and send an update message. Then call the local setter.''' if not hasattr(targets, '__iter__'): targets=[targets] self._modify_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name) self._add_and_remove_links( add={}, remove={type:targets} ) def modify_links(self, add, remove, writer_name=None): + '''Attempt to modify links if the conditions are met + and send an update message. Then call the local setter.''' self._modify_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name) self._add_and_remove_links( add=add, remove=remove ) def set_links(self, links, writer_name=None): + '''Attempt to set (replace) links if the conditions are met + and send an update message. Then call the local setter.''' self._modify_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name) - self._links = {} - self._add_and_remove_links( add=new_links, remove={} ) + self._replace_links( links=new_links ) def get_links(self, type): return set(self._links[type]) + def get_all_links(self): + return copy.deepcopy(self._links) def _get_revision(self): return self._revision @@ -319,18 +361,6 @@ class IU(IUInterface):#{{{ """Commit to this IU.""" return self._internal_commit() - def __str__(self): - s = "IU{ " - s += "uid="+self._uid+" " - s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") " - s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " " - s += "payload={ " - for k,v in self.payload.items(): - s += k+":'"+v+"', " - s += "} " - s += "}" - return s - def _get_payload(self): return self._payload def _set_payload(self, new_pl, writer_name=None): @@ -391,6 +421,26 @@ class RemotePushIU(IUInterface):#{{{ # We are just receiving it here and applying the new data. self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True) + def _modify_links(self, links, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): + """Modify the links: add or remove item from this payload remotely and send update.""" + if self.committed: + raise IUCommittedError(self) + if self.read_only: + raise IUReadOnlyError(self) + requested_update = IULinkUpdate( + uid=self.uid, + revision=self.revision, + is_delta=is_delta, + writer_name=self.buffer.unique_name, + new_links=new_links, + links_to_remove=links_to_remove) + remote_server = self.buffer._get_remote_server(self) + new_revision = remote_server.updateLinks(requested_update) + if new_revision == 0: + raise IUUpdateFailedError(self) + else: + self._revision = new_revision + def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): """Modify the payload: add or remove item from this payload remotely and send update.""" if self.committed: @@ -411,18 +461,6 @@ class RemotePushIU(IUInterface):#{{{ else: self._revision = new_revision - def __str__(self): - s = "RemotePushIU{ " - s += "uid="+self._uid+" " - s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") " - s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " " - s += "payload={ " - for k,v in self.payload.items(): - s += k+":'"+v+"', " - s += "} " - s += "}" - return s - def commit(self): """Commit to this IU.""" if self.read_only: @@ -470,6 +508,14 @@ class RemotePushIU(IUInterface):#{{{ fset=_set_payload, doc='Payload dictionary of the IU.') + def _apply_link_update(self, update): + """Apply a IULinkUpdate to the IU.""" + self._revision = update.revision + if update.is_delta: + self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove) + else: + self._replace_links(links=update.new_links) + def _apply_update(self, update): """Apply a IUPayloadUpdate to the IU.""" self._revision = update.revision @@ -566,7 +612,7 @@ class IULinkUpdate(object):#{{{ self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove) def __str__(self): - s = 'PayloadUpdate(' + 'uid=' + self.uid + ', ' + s = 'LinkUpdate(' + 'uid=' + self.uid + ', ' s += 'revision='+str(self.revision)+', ' s += 'writer_name='+str(self.writer_name)+', ' s += 'is_delta='+str(self.is_delta)+', ' @@ -874,6 +920,7 @@ class OutputBuffer(Buffer): super(OutputBuffer, self).__init__(owning_component_name, participant_config) self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB' self._server = rsb.createServer(rsb.Scope(self._unique_name)) + self._server.addMethod('updateLinks', self._remote_update_links, IULinkUpdate, int) self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int) self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int) self._informer_store = {} @@ -899,8 +946,26 @@ class OutputBuffer(Buffer): number = self.__iu_id_counter return self._id_prefix + str(number) + def _remote_update_links(self, update): + '''Apply a remotely requested update to one of the stored IU's links.''' + if update.uid not in self._iu_store: + logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid)) + return 0 + iu = self._iu_store[update.uid] + with iu.revision_lock: + if (update.revision != 0) and (update.revision != iu.revision): + # (0 means "do not pay attention to the revision number" -> "force update") + logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid)) + return 0 + if update.is_delta: + iu.modify_links(add=update.new_links, remove=update.links_to_remove, writer_name=update.writer_name) + else: + iu.set_links(links=update.new_links, writer_name=update.writer_name) + self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.LINKSUPDATED, category=iu.category) + return iu.revision + def _remote_update_payload(self, update): - '''Apply a remotely requested update to one of the stored IUs.''' + '''Apply a remotely requested update to one of the stored IU's payload.''' if update.uid not in self._iu_store: logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid)) return 0 diff --git a/python/src/listener.py b/python/src/listener.py index dde63b4..f15776e 100755 --- a/python/src/listener.py +++ b/python/src/listener.py @@ -11,9 +11,13 @@ def my_update_handler(iu, event_type, local): print(event_type+': '+str(iu)) iu_to_write = iu +ob = ipaaca.OutputBuffer('CoolListenerOut') +my_iu = ipaaca.IU() +my_iu.payload = {'some':'info'} +ob.add(my_iu) -ib = ipaaca.InputBuffer('CoolReceiver', ['undef']) +ib = ipaaca.InputBuffer('CoolListenerIn', ['undef']) ib.register_handler(my_update_handler) counter = 0 @@ -25,7 +29,9 @@ while True: iu = iu_to_write #if counter == 1: # iu.payload['a'] = 'remote' - if counter % 3 == 1: + if counter == 10: + iu.add_links('special', my_iu.uid) + elif counter % 3 == 1: iu.payload['a'] = 'REMOTELY SET '+str(counter) elif counter % 3 == 2: del iu.payload['a'] -- GitLab