Skip to content
Snippets Groups Projects
Commit 667acc1d authored by Ramin Yaghoubzadeh's avatar Ramin Yaghoubzadeh
Browse files

Links implemented and tested superficially (local and remote access)

parent 6739b970
No related branches found
No related tags found
No related merge requests found
...@@ -8,9 +8,12 @@ def remote_change_dumper(iu, event_type, local): ...@@ -8,9 +8,12 @@ def remote_change_dumper(iu, event_type, local):
print 'remote side '+event_type+': '+str(iu) print 'remote side '+event_type+': '+str(iu)
ob = ipaaca.OutputBuffer('CoolComponent') ob = ipaaca.OutputBuffer('CoolInformerOut')
ob.register_handler(remote_change_dumper) ob.register_handler(remote_change_dumper)
iu_top = ipaaca.IU()
iu_top.payload = {'data': 'raw'}
ob.add(iu_top)
iu = ipaaca.IU() iu = ipaaca.IU()
iu.payload = {'a':'a1'} iu.payload = {'a':'a1'}
...@@ -20,6 +23,7 @@ iu.payload = {'a':'a2', 'b':'b1'} #OK ...@@ -20,6 +23,7 @@ iu.payload = {'a':'a2', 'b':'b1'} #OK
del(iu.payload['b']) del(iu.payload['b'])
iu.payload['c'] = 'c1' iu.payload['c'] = 'c1'
iu.payload['a'] = 'a3' iu.payload['a'] = 'a3'
iu.add_links('sameold', iu_top.uid)
time.sleep(1) time.sleep(1)
iu.commit() iu.commit()
......
...@@ -8,12 +8,22 @@ import sys ...@@ -8,12 +8,22 @@ import sys
import threading import threading
import uuid import uuid
import collections import collections
import copy
import rsb import rsb
import rsb.converter import rsb.converter
import ipaaca_pb2 import ipaaca_pb2
# IDEAS
# We should think about relaying the update event (or at least the
# affected keys in the payload / links) to the event handlers!
# THOUGHTS
# Output buffers could generate UIDs for IUs on request, without
# publishing them at that time. Then UID could then be used
# for internal links etc. The IU may be published later through
# the same buffer that allocated the UID.
__all__ = [ __all__ = [
'IUEventType', 'IUEventType',
...@@ -110,27 +120,26 @@ class IUReadOnlyError(Exception): ...@@ -110,27 +120,26 @@ class IUReadOnlyError(Exception):
## --- Generation Architecture ----------------------------------------------- ## --- Generation Architecture -----------------------------------------------
#class Links(object):
class Links(object): # ''' This is essentially a dict STR -> set([STR, ...]) '''
''' This is essentially a dict STR -> set([STR, ...]) ''' # def __init__(self, iu, writer_name=None, new_links=None):
def __init__(self, iu, writer_name=None, new_links=None): # nl = {} if new_links is None else new_links
nl = {} if new_links is None else new_links # self.iu = iu
self.iu = iu # self.iu._set_links(links=self, is_delta=False, new_links=pl, links_to_remove=[], writer_name=writer_name)
self.iu._set_links(links=self, is_delta=False, new_links=pl, links_to_remove=[], writer_name=writer_name) # for k, v in pl.items():
for k, v in pl.items(): # dict.__setitem__(self, k, v)
dict.__setitem__(self, k, v) # def add_links(self, type, targets, writer_name=None):
def add_links(self, type, targets, writer_name=None): # if not hasattr(targets, '__iter__'): targets=[targets]
if not hasattr(targets, '__iter__'): targets=[targets] # self.iu._set_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
self.iu._set_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name) # def remove_links(self, type, targets, writer_name=None):
def remove_links(self, type, targets, writer_name=None): # if not hasattr(targets, '__iter__'): targets=[targets]
if not hasattr(targets, '__iter__'): targets=[targets] # self.iu._set_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
self.iu._set_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name) # def modify_links(self, add, remove, writer_name=None):
def modify_links(self, add, remove, writer_name=None): # self.iu._set_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name)
self.iu._set_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name) # def set_links(self, links, writer_name=None):
def set_links(self, links, writer_name=None): # self.iu._set_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name)
self.iu._set_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name) # #def get_links(self, type):
def get_links(self, type): # # return set(self.iu._get_links())
return set(self.iu._get_links())
class Payload(dict): class Payload(dict):
def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False): def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False):
...@@ -181,27 +190,60 @@ class IUInterface(object): #{{{ ...@@ -181,27 +190,60 @@ class IUInterface(object): #{{{
# payload is not present here # payload is not present here
self._links = collections.defaultdict(set) self._links = collections.defaultdict(set)
def __str__(self):
s = str(self.__class__)+"{ "
s += "uid="+self._uid+" "
s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") "
s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " "
s += "payload={ "
for k,v in self.payload.items():
s += k+":'"+v+"', "
s += "} "
s += "links={ "
for t,ids in self.get_all_links().items():
s += t+":'"+str(ids)+"', "
s += "} "
s += "}"
return s
def _add_and_remove_links(self, add, remove): def _add_and_remove_links(self, add, remove):
for type in remove.keys(): self._links[type] -= remove[type] '''Just add and remove the new links in our links set, do not send an update here'''
for type in add.keys(): self._links[type] |= add[type] '''Note: Also used for remotely enforced links updates.'''
for type in remove.keys(): self._links[type] -= set(remove[type])
for type in add.keys(): self._links[type] |= set(add[type])
def _replace_links(self, links):
'''Just wipe and replace our links set, do not send an update here'''
'''Note: Also used for remotely enforced links updates.'''
self._links = {}
for type in links.keys(): self._links[type] |= set(links[type])
def add_links(self, type, targets, writer_name=None): def add_links(self, type, targets, writer_name=None):
'''Attempt to add links if the conditions are met
and send an update message. Then call the local setter.'''
if not hasattr(targets, '__iter__'): targets=[targets] if not hasattr(targets, '__iter__'): targets=[targets]
self._modify_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name) self._modify_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
self._add_and_remove_links( add={type:targets}, remove={} ) self._add_and_remove_links( add={type:targets}, remove={} )
def remove_links(self, type, targets, writer_name=None): def remove_links(self, type, targets, writer_name=None):
'''Attempt to remove links if the conditions are met
and send an update message. Then call the local setter.'''
if not hasattr(targets, '__iter__'): targets=[targets] if not hasattr(targets, '__iter__'): targets=[targets]
self._modify_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name) self._modify_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
self._add_and_remove_links( add={}, remove={type:targets} ) self._add_and_remove_links( add={}, remove={type:targets} )
def modify_links(self, add, remove, writer_name=None): def modify_links(self, add, remove, writer_name=None):
'''Attempt to modify links if the conditions are met
and send an update message. Then call the local setter.'''
self._modify_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name) self._modify_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name)
self._add_and_remove_links( add=add, remove=remove ) self._add_and_remove_links( add=add, remove=remove )
def set_links(self, links, writer_name=None): def set_links(self, links, writer_name=None):
'''Attempt to set (replace) links if the conditions are met
and send an update message. Then call the local setter.'''
self._modify_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name) self._modify_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name)
self._links = {} self._replace_links( links=new_links )
self._add_and_remove_links( add=new_links, remove={} )
def get_links(self, type): def get_links(self, type):
return set(self._links[type]) return set(self._links[type])
def get_all_links(self):
return copy.deepcopy(self._links)
def _get_revision(self): def _get_revision(self):
return self._revision return self._revision
...@@ -319,18 +361,6 @@ class IU(IUInterface):#{{{ ...@@ -319,18 +361,6 @@ class IU(IUInterface):#{{{
"""Commit to this IU.""" """Commit to this IU."""
return self._internal_commit() return self._internal_commit()
def __str__(self):
s = "IU{ "
s += "uid="+self._uid+" "
s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") "
s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " "
s += "payload={ "
for k,v in self.payload.items():
s += k+":'"+v+"', "
s += "} "
s += "}"
return s
def _get_payload(self): def _get_payload(self):
return self._payload return self._payload
def _set_payload(self, new_pl, writer_name=None): def _set_payload(self, new_pl, writer_name=None):
...@@ -391,6 +421,26 @@ class RemotePushIU(IUInterface):#{{{ ...@@ -391,6 +421,26 @@ class RemotePushIU(IUInterface):#{{{
# We are just receiving it here and applying the new data. # We are just receiving it here and applying the new data.
self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True) self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True)
def _modify_links(self, links, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
"""Modify the links: add or remove item from this payload remotely and send update."""
if self.committed:
raise IUCommittedError(self)
if self.read_only:
raise IUReadOnlyError(self)
requested_update = IULinkUpdate(
uid=self.uid,
revision=self.revision,
is_delta=is_delta,
writer_name=self.buffer.unique_name,
new_links=new_links,
links_to_remove=links_to_remove)
remote_server = self.buffer._get_remote_server(self)
new_revision = remote_server.updateLinks(requested_update)
if new_revision == 0:
raise IUUpdateFailedError(self)
else:
self._revision = new_revision
def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
"""Modify the payload: add or remove item from this payload remotely and send update.""" """Modify the payload: add or remove item from this payload remotely and send update."""
if self.committed: if self.committed:
...@@ -411,18 +461,6 @@ class RemotePushIU(IUInterface):#{{{ ...@@ -411,18 +461,6 @@ class RemotePushIU(IUInterface):#{{{
else: else:
self._revision = new_revision self._revision = new_revision
def __str__(self):
s = "RemotePushIU{ "
s += "uid="+self._uid+" "
s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") "
s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " "
s += "payload={ "
for k,v in self.payload.items():
s += k+":'"+v+"', "
s += "} "
s += "}"
return s
def commit(self): def commit(self):
"""Commit to this IU.""" """Commit to this IU."""
if self.read_only: if self.read_only:
...@@ -470,6 +508,14 @@ class RemotePushIU(IUInterface):#{{{ ...@@ -470,6 +508,14 @@ class RemotePushIU(IUInterface):#{{{
fset=_set_payload, fset=_set_payload,
doc='Payload dictionary of the IU.') doc='Payload dictionary of the IU.')
def _apply_link_update(self, update):
"""Apply a IULinkUpdate to the IU."""
self._revision = update.revision
if update.is_delta:
self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove)
else:
self._replace_links(links=update.new_links)
def _apply_update(self, update): def _apply_update(self, update):
"""Apply a IUPayloadUpdate to the IU.""" """Apply a IUPayloadUpdate to the IU."""
self._revision = update.revision self._revision = update.revision
...@@ -566,7 +612,7 @@ class IULinkUpdate(object):#{{{ ...@@ -566,7 +612,7 @@ class IULinkUpdate(object):#{{{
self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove) self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)
def __str__(self): def __str__(self):
s = 'PayloadUpdate(' + 'uid=' + self.uid + ', ' s = 'LinkUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', ' s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', ' s += 'writer_name='+str(self.writer_name)+', '
s += 'is_delta='+str(self.is_delta)+', ' s += 'is_delta='+str(self.is_delta)+', '
...@@ -874,6 +920,7 @@ class OutputBuffer(Buffer): ...@@ -874,6 +920,7 @@ class OutputBuffer(Buffer):
super(OutputBuffer, self).__init__(owning_component_name, participant_config) super(OutputBuffer, self).__init__(owning_component_name, participant_config)
self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB' self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB'
self._server = rsb.createServer(rsb.Scope(self._unique_name)) self._server = rsb.createServer(rsb.Scope(self._unique_name))
self._server.addMethod('updateLinks', self._remote_update_links, IULinkUpdate, int)
self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int) self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int)
self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int) self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int)
self._informer_store = {} self._informer_store = {}
...@@ -899,8 +946,26 @@ class OutputBuffer(Buffer): ...@@ -899,8 +946,26 @@ class OutputBuffer(Buffer):
number = self.__iu_id_counter number = self.__iu_id_counter
return self._id_prefix + str(number) return self._id_prefix + str(number)
def _remote_update_links(self, update):
'''Apply a remotely requested update to one of the stored IU's links.'''
if update.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
return 0
if update.is_delta:
iu.modify_links(add=update.new_links, remove=update.links_to_remove, writer_name=update.writer_name)
else:
iu.set_links(links=update.new_links, writer_name=update.writer_name)
self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.LINKSUPDATED, category=iu.category)
return iu.revision
def _remote_update_payload(self, update): def _remote_update_payload(self, update):
'''Apply a remotely requested update to one of the stored IUs.''' '''Apply a remotely requested update to one of the stored IU's payload.'''
if update.uid not in self._iu_store: if update.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid)) logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
return 0 return 0
......
...@@ -11,9 +11,13 @@ def my_update_handler(iu, event_type, local): ...@@ -11,9 +11,13 @@ def my_update_handler(iu, event_type, local):
print(event_type+': '+str(iu)) print(event_type+': '+str(iu))
iu_to_write = iu iu_to_write = iu
ob = ipaaca.OutputBuffer('CoolListenerOut')
my_iu = ipaaca.IU()
my_iu.payload = {'some':'info'}
ob.add(my_iu)
ib = ipaaca.InputBuffer('CoolReceiver', ['undef']) ib = ipaaca.InputBuffer('CoolListenerIn', ['undef'])
ib.register_handler(my_update_handler) ib.register_handler(my_update_handler)
counter = 0 counter = 0
...@@ -25,7 +29,9 @@ while True: ...@@ -25,7 +29,9 @@ while True:
iu = iu_to_write iu = iu_to_write
#if counter == 1: #if counter == 1:
# iu.payload['a'] = 'remote' # iu.payload['a'] = 'remote'
if counter % 3 == 1: if counter == 10:
iu.add_links('special', my_iu.uid)
elif counter % 3 == 1:
iu.payload['a'] = 'REMOTELY SET '+str(counter) iu.payload['a'] = 'REMOTELY SET '+str(counter)
elif counter % 3 == 2: elif counter % 3 == 2:
del iu.payload['a'] del iu.payload['a']
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment