Newer
Older
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, division
import logging
import sys
import threading
import uuid

Ramin Yaghoubzadeh
committed
import collections
import copy
import rsb

Ramin Yaghoubzadeh
committed
import rsb.converter
import ipaaca_pb2
# IDEAS
# We should think about relaying the update event (or at least the
# affected keys in the payload / links) to the event handlers!
# THOUGHTS
# Output buffers could generate UIDs for IUs on request, without
# publishing them at that time. The UID could then be used
# for internal links etc. The IU may be published later through
# the same buffer that allocated the UID.
# Names exported by a star-import of this module.
# NOTE(review): 'logger' is not defined in the visible part of this file --
# presumably it is created further down; confirm.
__all__ = [
    'IUEventType',
    'IUAccessMode',
    'InputBuffer', 'OutputBuffer',
    'IU',
    'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError',
    'logger'
]
## --- Utilities -------------------------------------------------------------
def enum(*sequential, **named):
    """Build a lightweight enumeration class.

    Positional names are numbered 0, 1, 2, ... in the order given; keyword
    arguments become attributes carrying the given value and override a
    positional name of the same spelling.

    Based on a suggestion by Alec Thomas on stackoverflow.com:
    http://stackoverflow.com/questions/36932/
      whats-the-best-way-to-implement-an-enum-in-python/1695250#1695250
    """
    members = {}
    for index, name in enumerate(sequential):
        members[name] = index
    members.update(named)
    return type('Enum', (), members)
def pack_typed_payload_item(protobuf_object, key, value):
    """Store one payload entry in ``protobuf_object``.

    Key and value are both converted with ``str()``; the entry's ``type``
    field is always set to ``'str'``.
    """
    key_text = str(key)
    protobuf_object.key = key_text
    value_text = str(value)
    protobuf_object.value = value_text
    # TODO: more types
    protobuf_object.type = 'str'
def unpack_typed_payload_item(protobuf_object):
    """Return the ``(key, value)`` pair held by a payload entry.

    The key is returned unchanged, the value is converted with ``str()``.
    """
    # TODO: more types
    key = protobuf_object.key
    value = str(protobuf_object.value)
    return key, value
class IpaacaLoggingHandler(logging.Handler):
    """Logging handler that prints records to stdout with an ``[ipaaca]`` prefix."""

    def __init__(self, level=logging.DEBUG):
        """Create the handler.

        Keyword arguments:
        level -- minimum level handled (default: logging.DEBUG)
        """
        logging.Handler.__init__(self, level)

    def emit(self, record):
        """Print ``record`` as ``[ipaaca] (LEVEL) message``.

        A message without arguments is printed literally, so text containing
        braces (e.g. a logged dict) no longer breaks the handler. With
        arguments, standard %-style logging formatting is tried first, then
        ``str.format``; if neither fits, the arguments are appended.
        """
        meta = '[ipaaca] (' + str(record.levelname) + ') '
        msg = str(record.msg)
        if record.args:
            try:
                msg = record.getMessage()
            except (TypeError, ValueError):
                try:
                    msg = msg.format(*record.args)
                except (IndexError, KeyError, ValueError):
                    # Never lose a log line because of a formatting mismatch.
                    msg = msg + ' ' + str(record.args)
        print(meta + msg)
## --- Global Definitions ----------------------------------------------------
# Event types handed to IU event handlers. Each member is created through a
# keyword argument, so its value is the member's own name as a string.
IUEventType = enum(
    ADDED = 'ADDED',
    COMMITTED = 'COMMITTED',
    DELETED = 'DELETED',
    RETRACTED = 'RETRACTED',
    UPDATED = 'UPDATED',
    LINKSUPDATED = 'LINKSUPDATED'
)
# Access modes an IU can have. The names are passed positionally, so enum()
# numbers them in order: PUSH == 0, REMOTE == 1, MESSAGE == 2.
IUAccessMode = enum(
    "PUSH",
    "REMOTE",
    "MESSAGE"
)
## --- Errors and Exceptions -------------------------------------------------
class IUPublishedError(Exception):
    """Raised when publishing an IU fails because the buffer already holds it."""

    def __init__(self, iu):
        uid_text = str(iu.uid)
        message = 'IU ' + uid_text + ' is already present in the output buffer.'
        super(IUPublishedError, self).__init__(message)
class IUUpdateFailedError(Exception):
    """Raised when a remote update of an IU did not succeed."""

    def __init__(self, iu):
        uid_text = str(iu.uid)
        message = 'Remote update failed for IU ' + uid_text + '.'
        super(IUUpdateFailedError, self).__init__(message)
class IUCommittedError(Exception):
    """Raised on an attempt to write to an IU that has already been committed to."""

    def __init__(self, iu):
        uid_text = str(iu.uid)
        message = 'Writing to IU ' + uid_text + ' failed -- it has been committed to.'
        super(IUCommittedError, self).__init__(message)
class IUReadOnlyError(Exception):
    """Raised on an attempt to write to an IU that is flagged 'read only'."""

    def __init__(self, iu):
        uid_text = str(iu.uid)
        message = 'Writing to IU ' + uid_text + ' failed -- it is read-only.'
        super(IUReadOnlyError, self).__init__(message)
## --- Generation Architecture -----------------------------------------------
#class Links(object):
# ''' This is essentially a dict STR -> set([STR, ...]) '''
# def __init__(self, iu, writer_name=None, new_links=None):
# nl = {} if new_links is None else new_links
# self.iu = iu
# self.iu._set_links(links=self, is_delta=False, new_links=pl, links_to_remove=[], writer_name=writer_name)
# for k, v in pl.items():
# dict.__setitem__(self, k, v)
# def add_links(self, type, targets, writer_name=None):
# if not hasattr(targets, '__iter__'): targets=[targets]
# self.iu._set_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
# def remove_links(self, type, targets, writer_name=None):
# if not hasattr(targets, '__iter__'): targets=[targets]
# self.iu._set_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
# def modify_links(self, add, remove, writer_name=None):
# self.iu._set_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name)
# def set_links(self, links, writer_name=None):
# self.iu._set_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name)
# #def get_links(self, type):
# # return set(self.iu._get_links())

Ramin Yaghoubzadeh
committed
class Payload(dict):
    """Dictionary holding the payload of an IU.

    Item assignment and deletion are first announced to the owning IU through
    ``iu._modify_payload`` (which may raise) and only then applied to the
    underlying dict. Other dict mutators (``update``, ``pop``, ...) are not
    intercepted.
    """

    def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False):
        """Create a payload for ``iu``.

        Keyword arguments:
        iu -- the IU this payload belongs to
        writer_name -- passed on to ``iu._modify_payload``
        new_payload -- mapping with the initial items (default: empty)
        omit_init_update_message -- if True, do not announce the initial
            content to the IU
        """
        pl = {} if new_payload is None else new_payload
        self.iu = iu
        # NOTE omit_init_update_message is necessary to prevent checking for
        #   exceptions and sending updates in the case where we just receive
        #   a whole new payload from the remote side and overwrite it locally.
        if (not omit_init_update_message) and (self.iu.buffer is not None):
            self.iu._modify_payload(payload=self, is_delta=False, new_items=pl, keys_to_remove=[], writer_name=writer_name)
        for k, v in pl.items():
            dict.__setitem__(self, k, v)

    def __setitem__(self, k, v, writer_name=None):
        """Announce the new item to the IU, then store it locally."""
        self.iu._modify_payload(payload=self, is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name)
        dict.__setitem__(self, k, v)

    def __delitem__(self, k, writer_name=None):
        """Announce the removal to the IU, then delete the item locally.

        Raises KeyError for an unknown key before anything is announced, so
        a failed deletion has no side effect on the IU.
        """
        if k not in self:
            raise KeyError(k)
        self.iu._modify_payload(payload=self, is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name)
        dict.__delitem__(self, k)

    def _remotely_enforced_setitem(self, k, v):
        """Sets an item when requested remotely."""
        return dict.__setitem__(self, k, v)

    def _remotely_enforced_delitem(self, k):
        """Deletes an item when requested remotely."""
        return dict.__delitem__(self, k)

Ramin Yaghoubzadeh
committed
class IUInterface(object): #{{{
    """Base class of all specialised IU classes.

    Subclasses are expected to provide ``payload`` and ``_modify_links``;
    both are used here but not defined in this class.
    """

    def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False):
        """Creates an IU.

        Keyword arguments:
        uid -- unique ID of this IU
        access_mode -- access mode of this IU
        read_only -- flag indicating whether this IU is read_only or not
        """
        self._uid = uid
        self._revision = None
        self._category = None
        self._payload_type = None
        self._owner_name = None
        self._committed = False
        self._access_mode = access_mode
        self._read_only = read_only
        self._buffer = None
        # payload is not present here
        self._links = collections.defaultdict(set)

    def __str__(self):
        # Every piece goes through str() so that an unset uid (None) or a
        # non-string payload value cannot break the concatenation.
        buffer_name = "<None>" if self.buffer is None else str(self.buffer.unique_name)
        owner_name = "<None>" if self.owner_name is None else str(self.owner_name)
        pieces = [
            str(self.__class__), "{ ",
            "uid=", str(self._uid), " ",
            "(buffer=", buffer_name, ") ",
            "owner_name=", owner_name, " ",
            "payload={ ",
        ]
        for k, v in self.payload.items():
            pieces.extend([str(k), ":'", str(v), "', "])
        pieces.append("} ")
        pieces.append("links={ ")
        for link_type, ids in self.get_all_links().items():
            pieces.extend([str(link_type), ":'", str(ids), "', "])
        pieces.append("} ")
        pieces.append("}")
        return "".join(pieces)

    @staticmethod
    def _as_target_list(targets):
        """Return ``targets`` as an iterable of link targets.

        A single target is wrapped in a list. Strings are always treated as
        a single target, also on Python 3 where they have ``__iter__``.
        """
        if isinstance(targets, (str, bytes, type(u''))) or not hasattr(targets, '__iter__'):
            return [targets]
        return targets

    def _add_and_remove_links(self, add, remove):
        '''Just add and remove the new links in our links set, do not send an update here.

        Note: Also used for remotely enforced links updates.'''
        for link_type in remove.keys():
            self._links[link_type] -= set(remove[link_type])
        for link_type in add.keys():
            self._links[link_type] |= set(add[link_type])

    def _replace_links(self, links):
        '''Just wipe and replace our links set, do not send an update here.

        Note: Also used for remotely enforced links updates.'''
        # Must stay a defaultdict: the |= below and the other link methods
        # rely on missing link types starting out as an empty set.
        self._links = collections.defaultdict(set)
        for link_type in links.keys():
            self._links[link_type] |= set(links[link_type])

    def add_links(self, type, targets, writer_name=None):
        '''Attempt to add links if the conditions are met
        and send an update message. Then call the local setter.'''
        targets = self._as_target_list(targets)
        self._modify_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
        self._add_and_remove_links( add={type:targets}, remove={} )

    def remove_links(self, type, targets, writer_name=None):
        '''Attempt to remove links if the conditions are met
        and send an update message. Then call the local setter.'''
        targets = self._as_target_list(targets)
        self._modify_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
        self._add_and_remove_links( add={}, remove={type:targets} )

    def modify_links(self, add, remove, writer_name=None):
        '''Attempt to modify links if the conditions are met
        and send an update message. Then call the local setter.'''
        self._modify_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name)
        self._add_and_remove_links( add=add, remove=remove )

    def set_links(self, links, writer_name=None):
        '''Attempt to set (replace) links if the conditions are met
        and send an update message. Then call the local setter.'''
        self._modify_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name)
        self._replace_links( links=links )

    def get_links(self, type):
        """Return a copy of the set of targets linked under ``type``."""
        # .get() keeps a mere lookup from inserting an empty entry.
        return set(self._links.get(type, ()))

    def get_all_links(self):
        """Return a deep copy of the complete link mapping."""
        return copy.deepcopy(self._links)

    def _get_revision(self):
        return self._revision
    revision = property(fget=_get_revision, doc='Revision number of the IU.')

    def _get_category(self):
        return self._category
    category = property(fget=_get_category, doc='Category of the IU.')

    def _get_payload_type(self):
        return self._payload_type
    payload_type = property(fget=_get_payload_type, doc='Type of the IU payload')

    def _get_committed(self):
        return self._committed
    committed = property(
            fget=_get_committed,
            doc='Flag indicating whether this IU has been committed to.')

    def _get_uid(self):
        return self._uid
    uid = property(fget=_get_uid, doc='Unique ID of the IU.')

    def _get_access_mode(self):
        return self._access_mode
    access_mode = property(fget=_get_access_mode, doc='Access mode of the IU.')

    def _get_read_only(self):
        return self._read_only
    read_only = property(
            fget=_get_read_only,
            doc='Flag indicating whether this IU is read only.')

    def _get_buffer(self):
        return self._buffer
    def _set_buffer(self, buffer):
        # An IU can be placed in a buffer only once.
        if self._buffer is not None:
            raise Exception('The IU is already in a buffer, cannot move it.')
        self._buffer = buffer
    buffer = property(
            fget=_get_buffer,
            fset=_set_buffer,
            doc='Buffer this IU is held in.')

    def _get_owner_name(self):
        return self._owner_name
    def _set_owner_name(self, owner_name):
        # The owner name can be assigned only once.
        if self._owner_name is not None:
            raise Exception('The IU already has an owner name, cannot change it.')
        self._owner_name = owner_name
    owner_name = property(
            fget=_get_owner_name,
            fset=_set_owner_name,
            doc="The IU's owner's name.")
#}}}
class IU(IUInterface):#{{{
    """A local IU."""

    def __init__(self, access_mode=IUAccessMode.PUSH, read_only=False, category='undef', _payload_type='MAP'):
        """Create a local IU; it starts with no uid, at revision 1 and with an empty payload.

        Keyword arguments:
        access_mode -- access mode of this IU
        read_only -- flag indicating whether this IU is read_only or not
        category -- category of this IU
        _payload_type -- type of the IU payload
        """
        super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only)
        self._revision = 1
        self._category = category
        self._payload_type = _payload_type
        self.revision_lock = threading.RLock()
        self._payload = Payload(iu=self)

    def _modify_links(self, links, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
        """Bump the revision for a link change and, if published, send the update.

        The local link set itself is changed by the caller afterwards.
        Raises IUCommittedError if the IU has been committed to.
        """
        if self.committed:
            raise IUCommittedError(self)
        with self.revision_lock:
            # modify links locally
            self._increase_revision_number()
            if self.is_published:
                # send update to remote holders
                self.buffer._send_iu_link_update(
                        self,
                        revision=self.revision,
                        is_delta=is_delta,
                        new_links=new_links,
                        links_to_remove=links_to_remove,
                        writer_name=self.owner_name if writer_name is None else writer_name)

    def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
        """Modify the payload: add or remove items from this payload locally and send update.

        Only the revision bump and the update message happen here; the
        calling Payload stores the items itself.
        Raises IUCommittedError if the IU has been committed to.
        """
        if self.committed:
            raise IUCommittedError(self)
        with self.revision_lock:
            # set item locally
            self._increase_revision_number()
            if self.is_published:
                # send update to remote holders
                self.buffer._send_iu_payload_update(
                        self,
                        revision=self.revision,
                        is_delta=is_delta,
                        new_items=new_items,
                        keys_to_remove=keys_to_remove,
                        writer_name=self.owner_name if writer_name is None else writer_name)

    def _increase_revision_number(self):
        self._revision += 1

    def _internal_commit(self, writer_name=None):
        """Mark the IU as committed and, if published, announce the commission.

        Raises IUCommittedError if the IU has already been committed to.
        """
        with self.revision_lock:
            # Check and set under the same lock so that two concurrent
            # commits cannot both pass the check.
            if self._committed:
                raise IUCommittedError(self)
            self._increase_revision_number()
            self._committed = True
            # An IU that is not in a buffer yet has nobody to notify.
            if self.is_published:
                self.buffer._send_iu_commission(self, writer_name=writer_name)

    def commit(self):
        """Commit to this IU."""
        return self._internal_commit()

    def _get_payload(self):
        return self._payload
    def _set_payload(self, new_pl, writer_name=None):
        """Replace the whole payload with the items of ``new_pl``.

        Raises IUCommittedError if the IU has been committed to.
        """
        if self.committed:
            raise IUCommittedError(self)
        with self.revision_lock:
            self._increase_revision_number()
            self._payload = Payload(
                    iu=self,
                    writer_name=None if self.buffer is None else (self.buffer.unique_name if writer_name is None else writer_name),
                    new_payload=new_pl)
    payload = property(
            fget=_get_payload,
            fset=_set_payload,
            doc='Payload dictionary of this IU.')

    def _get_is_published(self):
        return self.buffer is not None
    is_published = property(
            fget=_get_is_published,
            doc='Flag indicating whether this IU has been published or not.')

    def _set_buffer(self, buffer):
        # Besides storing the buffer, adopt its unique name as owner name.
        if self._buffer is not None:
            raise Exception('The IU is already in a buffer, cannot move it.')
        self._buffer = buffer
        self.owner_name = buffer.unique_name
        self._payload.owner_name = buffer.unique_name
    buffer = property(
            fget=IUInterface._get_buffer,
            fset=_set_buffer,
            doc='Buffer this IU is held in.')

    def _set_uid(self, uid):
        # The uid can be assigned only once.
        if self._uid is not None:
            raise AttributeError('The uid of IU ' + str(self.uid) + ' has already been set, cannot change it.')
        self._uid = uid
    uid = property(
            fget=IUInterface._get_uid,
            fset=_set_uid,
            doc='Unique ID of theIU.')
#}}}
class RemotePushIU(IUInterface):#{{{
    """A remote IU with access mode 'PUSH'."""

    def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload):
        """Create the local representation of an IU received from its owner.

        Keyword arguments:
        uid -- unique ID of the IU
        revision -- revision number of the IU
        read_only -- flag indicating whether this IU is read_only or not
        owner_name -- name of the IU's owner
        category -- category of the IU
        payload_type -- type of the IU payload
        committed -- flag indicating whether the IU has been committed to
        payload -- mapping with the payload items
        """
        super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
        self._revision = revision
        self._category = category
        self.owner_name = owner_name
        self._payload_type = payload_type
        self._committed = committed
        # NOTE Since the payload is an already-existant Payload which we didn't modify ourselves,
        #  don't try to invoke any modification checks or network updates ourselves either.
        #  We are just receiving it here and applying the new data.
        self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True)

    def _modify_links(self, links, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
        """Modify the links: request the change from the owner via the remote server.

        The returned revision is adopted; a returned revision of 0 is treated
        as failure. The local link set is changed by the caller afterwards.
        Raises IUCommittedError, IUReadOnlyError or IUUpdateFailedError.
        """
        if self.committed:
            raise IUCommittedError(self)
        if self.read_only:
            raise IUReadOnlyError(self)
        requested_update = IULinkUpdate(
                uid=self.uid,
                revision=self.revision,
                is_delta=is_delta,
                writer_name=self.buffer.unique_name,
                new_links=new_links,
                links_to_remove=links_to_remove)
        remote_server = self.buffer._get_remote_server(self)
        new_revision = remote_server.updateLinks(requested_update)
        if new_revision == 0:
            raise IUUpdateFailedError(self)
        self._revision = new_revision

    def _request_payload_update(self, is_delta, new_items, keys_to_remove):
        """Send a payload update request to the owner and adopt the new revision.

        Raises IUCommittedError, IUReadOnlyError or, if the remote server
        answers with revision 0, IUUpdateFailedError.
        """
        if self.committed:
            raise IUCommittedError(self)
        if self.read_only:
            raise IUReadOnlyError(self)
        requested_update = IUPayloadUpdate(
                uid=self.uid,
                revision=self.revision,
                is_delta=is_delta,
                writer_name=self.buffer.unique_name,
                new_items=new_items,
                keys_to_remove=keys_to_remove)
        remote_server = self.buffer._get_remote_server(self)
        new_revision = remote_server.updatePayload(requested_update)
        if new_revision == 0:
            raise IUUpdateFailedError(self)
        self._revision = new_revision

    def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
        """Modify the payload: add or remove item from this payload remotely and send update."""
        self._request_payload_update(is_delta=is_delta, new_items=new_items, keys_to_remove=keys_to_remove)

    def commit(self):
        """Commit to this IU.

        A commit on an already committed IU is silently ignored.
        Raises IUReadOnlyError or IUUpdateFailedError.
        """
        if self.read_only:
            raise IUReadOnlyError(self)
        if self._committed:
            # ignore commit requests when already committed
            return
        commission_request = ipaaca_pb2.IUCommission()
        commission_request.uid = self.uid
        commission_request.revision = self.revision
        commission_request.writer_name = self.buffer.unique_name
        remote_server = self.buffer._get_remote_server(self)
        new_revision = remote_server.commit(commission_request)
        if new_revision == 0:
            raise IUUpdateFailedError(self)
        self._revision = new_revision
        self._committed = True

    def _get_payload(self):
        return self._payload
    def _set_payload(self, new_pl):
        """Replace the whole payload remotely, then mirror it locally."""
        self._request_payload_update(is_delta=False, new_items=new_pl, keys_to_remove=[])
        # NOTE The remote side has accepted the new payload, so it is only
        #  mirrored here: no modification checks or network updates.
        self._payload = Payload(iu=self, new_payload=new_pl, omit_init_update_message=True)
    payload = property(
            fget=_get_payload,
            fset=_set_payload,
            doc='Payload dictionary of the IU.')

    def _apply_link_update(self, update):
        """Apply a IULinkUpdate to the IU."""
        self._revision = update.revision
        if update.is_delta:
            self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove)
        else:
            self._replace_links(links=update.new_links)

    def _apply_update(self, update):
        """Apply a IUPayloadUpdate to the IU."""
        self._revision = update.revision
        if update.is_delta:
            for k in update.keys_to_remove:
                # A key we never held needs no removal; do not let it abort
                # the rest of the update.
                if k in self.payload:
                    self.payload._remotely_enforced_delitem(k)
            for k, v in update.new_items.items():
                self.payload._remotely_enforced_setitem(k, v)
        else:
            # NOTE The payload comes from the remote side, so it is only
            #  mirrored here: no modification checks or network updates.
            self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True)

    def _apply_commission(self):
        """Apply commission to the IU"""
        self._committed = True
#}}}

Ramin Yaghoubzadeh
committed
class IntConverter(rsb.converter.Converter):#{{{
    """Convert Python int objects to Protobuf ints and vice versa."""

    def __init__(self, wireSchema="int", dataType=int):
        super(IntConverter, self).__init__(bytearray, dataType, wireSchema)

    def serialize(self, value):
        """Return ``value`` packed into an IntMessage as ``(bytearray, wire schema)``."""
        pbo = ipaaca_pb2.IntMessage()
        pbo.value = value
        return bytearray(pbo.SerializeToString()), self.wireSchema

    def deserialize(self, byte_stream, ws):
        """Return the int stored in the serialized IntMessage ``byte_stream``."""
        pbo = ipaaca_pb2.IntMessage()
        # bytes() is the same as str() on Python 2, but on Python 3 it yields
        # the raw data instead of the repr of the bytearray.
        pbo.ParseFromString( bytes(byte_stream) )
        return pbo.value
#}}}

Ramin Yaghoubzadeh
committed
class IUConverter(rsb.converter.Converter):#{{{
    '''
    Converter class for Full IU representations
    wire:bytearray <-> wire-schema:ipaaca-iu <-> class IU
    '''

    def __init__(self, wireSchema="ipaaca-iu", dataType=IU):
        super(IUConverter, self).__init__(bytearray, dataType, wireSchema)

    def serialize(self, iu):
        """Return ``iu`` as ``(bytearray, wire schema)``.

        The access mode is always written as PUSH; payload items are packed
        with pack_typed_payload_item.
        """
        pbo = ipaaca_pb2.IU()
        pbo.uid = iu._uid
        pbo.revision = iu._revision
        pbo.category = iu._category
        pbo.payload_type = iu._payload_type
        pbo.owner_name = iu._owner_name
        pbo.committed = iu._committed
        pbo.access_mode = ipaaca_pb2.IU.PUSH # TODO
        pbo.read_only = iu._read_only
        for k,v in iu._payload.items():
            entry = pbo.payload.add()
            pack_typed_payload_item(entry, k, v)
        return bytearray(pbo.SerializeToString()), self.wireSchema

    def deserialize(self, byte_stream, ws):
        """Return a RemotePushIU built from the serialized IU ``byte_stream``.

        Raises ValueError if this converter's data type is not IU, and
        Exception for IUs whose access mode is not PUSH.
        """
        data_type = self.getDataType()
        if data_type != IU:
            raise ValueError("Inacceptable dataType %s" % data_type)
        pbo = ipaaca_pb2.IU()
        # bytes() is the same as str() on Python 2, but on Python 3 it yields
        # the raw data instead of the repr of the bytearray.
        pbo.ParseFromString( bytes(byte_stream) )
        if pbo.access_mode != ipaaca_pb2.IU.PUSH:
            raise Exception("We can only handle IUs with access mode 'PUSH' for now!")
        _payload = {}
        for entry in pbo.payload:
            k, v = unpack_typed_payload_item(entry)
            _payload[k] = v
        remote_push_iu = RemotePushIU(
                uid=pbo.uid,
                revision=pbo.revision,
                read_only = pbo.read_only,
                owner_name = pbo.owner_name,
                category = pbo.category,
                payload_type = pbo.payload_type,
                committed = pbo.committed,
                payload=_payload
            )
        return remote_push_iu
#}}}

Ramin Yaghoubzadeh
committed
class IULinkUpdate(object):#{{{
    """Description of a change to the links of an IU."""

    def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None):
        """Create a link update.

        Keyword arguments:
        uid -- uid of the IU the update refers to
        revision -- revision number carried by the update
        is_delta -- True for an incremental change, False for a replacement
        writer_name -- name of the party that made the change
        new_links -- mapping link type -> targets to add (default: empty)
        links_to_remove -- mapping link type -> targets to remove (default: empty)
        """
        super(IULinkUpdate, self).__init__()
        self.uid = uid
        self.revision = revision
        self.writer_name = writer_name
        self.is_delta = is_delta
        # Both mappings are copied into defaultdicts, so unknown link types
        # read as an empty set.
        self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links)
        self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)

    def __str__(self):
        # str() on every field: the uid is not guaranteed to be a string.
        return ''.join([
            'LinkUpdate(', 'uid=', str(self.uid), ', ',
            'revision=', str(self.revision), ', ',
            'writer_name=', str(self.writer_name), ', ',
            'is_delta=', str(self.is_delta), ', ',
            'new_links = ', str(self.new_links), ', ',
            'links_to_remove = ', str(self.links_to_remove), ')',
        ])
#}}}
class IUPayloadUpdate(object):#{{{
    """Description of a change to the payload of an IU."""

    def __init__(self, uid, revision, is_delta, writer_name="undef", new_items=None, keys_to_remove=None):
        """Create a payload update.

        Keyword arguments:
        uid -- uid of the IU the update refers to
        revision -- revision number carried by the update
        is_delta -- True for an incremental change, False for a replacement
        writer_name -- name of the party that made the change
        new_items -- mapping with the items to set (default: empty dict)
        keys_to_remove -- list of keys to delete (default: empty list)
        """
        super(IUPayloadUpdate, self).__init__()
        self.uid = uid
        self.revision = revision
        self.writer_name = writer_name
        self.is_delta = is_delta
        self.new_items = {} if new_items is None else new_items
        self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove

    def __str__(self):
        # str() on every field: the uid is not guaranteed to be a string.
        return ''.join([
            'PayloadUpdate(', 'uid=', str(self.uid), ', ',
            'revision=', str(self.revision), ', ',
            'writer_name=', str(self.writer_name), ', ',
            'is_delta=', str(self.is_delta), ', ',
            'new_items = ', str(self.new_items), ', ',
            'keys_to_remove = ', str(self.keys_to_remove), ')',
        ])
#}}}

Ramin Yaghoubzadeh
committed
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
class IULinkUpdateConverter(rsb.converter.Converter):#{{{
    """Convert IULinkUpdate objects to their Protobuf form and vice versa."""

    def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate):
        super(IULinkUpdateConverter, self).__init__(bytearray, dataType, wireSchema)

    def serialize(self, iu_link_update):
        """Return ``iu_link_update`` as ``(bytearray, wire schema)``."""
        pbo = ipaaca_pb2.IULinkUpdate()
        pbo.uid = iu_link_update.uid
        pbo.writer_name = iu_link_update.writer_name
        pbo.revision = iu_link_update.revision
        for type_ in iu_link_update.new_links.keys():
            linkset = pbo.new_links.add()
            linkset.type = type_
            linkset.targets.extend(iu_link_update.new_links[type_])
        for type_ in iu_link_update.links_to_remove.keys():
            linkset = pbo.links_to_remove.add()
            linkset.type = type_
            linkset.targets.extend(iu_link_update.links_to_remove[type_])
        pbo.is_delta = iu_link_update.is_delta
        return bytearray(pbo.SerializeToString()), self.wireSchema

    def deserialize(self, byte_stream, ws):
        """Return the IULinkUpdate stored in ``byte_stream``.

        Raises ValueError if this converter's data type is not IULinkUpdate.
        """
        data_type = self.getDataType()
        if data_type != IULinkUpdate:
            raise ValueError("Inacceptable dataType %s" % data_type)
        pbo = ipaaca_pb2.IULinkUpdate()
        # bytes() is the same as str() on Python 2, but on Python 3 it yields
        # the raw data instead of the repr of the bytearray.
        pbo.ParseFromString( bytes(byte_stream) )
        logger.debug('received an IULinkUpdate for revision '+str(pbo.revision))
        iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
        for entry in pbo.new_links:
            iu_link_up.new_links[str(entry.type)] = set(entry.targets)
        for entry in pbo.links_to_remove:
            iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets)
        return iu_link_up
#}}}

Ramin Yaghoubzadeh
committed
class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{
    """Convert IUPayloadUpdate objects to their Protobuf form and vice versa."""

    def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate):
        super(IUPayloadUpdateConverter, self).__init__(bytearray, dataType, wireSchema)

    def serialize(self, iu_payload_update):
        """Return ``iu_payload_update`` as ``(bytearray, wire schema)``."""
        pbo = ipaaca_pb2.IUPayloadUpdate()
        pbo.uid = iu_payload_update.uid
        pbo.writer_name = iu_payload_update.writer_name
        pbo.revision = iu_payload_update.revision
        for k,v in iu_payload_update.new_items.items():
            entry = pbo.new_items.add()
            pack_typed_payload_item(entry, k, v)
        pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
        pbo.is_delta = iu_payload_update.is_delta
        return bytearray(pbo.SerializeToString()), self.wireSchema

    def deserialize(self, byte_stream, ws):
        """Return the IUPayloadUpdate stored in ``byte_stream``.

        Raises ValueError if this converter's data type is not IUPayloadUpdate.
        """
        data_type = self.getDataType()
        if data_type != IUPayloadUpdate:
            raise ValueError("Inacceptable dataType %s" % data_type)
        pbo = ipaaca_pb2.IUPayloadUpdate()
        # bytes() is the same as str() on Python 2, but on Python 3 it yields
        # the raw data instead of the repr of the bytearray.
        pbo.ParseFromString( bytes(byte_stream) )
        logger.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
        iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
        for entry in pbo.new_items:
            k, v = unpack_typed_payload_item(entry)
            iu_up.new_items[k] = v
        iu_up.keys_to_remove = pbo.keys_to_remove[:]
        return iu_up
#}}}
class IUStore(dict):
    """Dictionary in which IUs are kept; always created empty."""

    def __init__(self):
        dict.__init__(self)
class IUEventHandler(object):
    """Wrapper for IU event handling functions."""

    def __init__(self, handler_function, for_event_types=None, for_categories=None):
        """Create an IUEventHandler.

        Keyword arguments:
        handler_function -- the handler function with the signature
            (IU, event_type, local)
        for_event_types -- a list of event types or None if handler should
            be called for all event types
        for_categories -- a list of category names or None if handler should
            be called for all categories
        """
        super(IUEventHandler, self).__init__()
        self._handler_function = handler_function
        self._for_event_types = self._as_filter_list(for_event_types)
        self._for_categories = self._as_filter_list(for_categories)

    @staticmethod
    def _as_filter_list(values):
        """Return a private list of filter values, or None for 'no filter'.

        A single value -- including a lone string, which must not be matched
        by substring -- becomes a one-element list; any other iterable
        (list, tuple, set, ...) is copied into a list.
        """
        if values is None:
            return None
        if isinstance(values, (str, bytes, type(u''))) or not hasattr(values, '__iter__'):
            return [values]
        return list(values)

    def condition_met(self, event_type, category):
        """Check whether this IUEventHandler should be called.

        Keyword arguments:
        event_type -- type of the IU event
        category -- category of the IU which triggered the event
        """
        type_condition_met = (self._for_event_types is None or event_type in self._for_event_types)
        cat_condition_met = (self._for_categories is None or category in self._for_categories)
        return type_condition_met and cat_condition_met

    def call(self, buffer, iu_uid, local, event_type, category):
        """Call this IUEventHandler's function, if it applies.

        Keyword arguments:
        buffer -- the buffer in which the IU is stored
        iu_uid -- the uid of the IU
        local -- is the IU local or remote to this component? @RAMIN: Is this correct?
        event_type -- IU event type
        category -- category of the IU
        """
        if self.condition_met(event_type, category):
            iu = buffer._iu_store[iu_uid]
            self._handler_function(iu, event_type, local)
class Buffer(object):
    """Common base class of InputBuffer and OutputBuffer."""

    def __init__(self, owning_component_name, participant_config=None):
        '''Set up the state shared by all buffers.

        Keyword arguments:
        owning_compontent_name -- name of the entity that owns this Buffer
        participant_config -- RSB configuration
        '''
        super(Buffer, self).__init__()
        self._iu_event_handlers = []
        self._iu_store = IUStore()
        self._participant_config = participant_config #rsb.ParticipantConfig.fromDefaultSources() if participant_config is None else participant_config
        self._owning_component_name = owning_component_name
        short_id = str(uuid.uuid4())[:8]
        self._uuid = short_id
        # Temporary name; already unique thanks to the random id
        self._unique_name = "undef-" + short_id

    def register_handler(self, handler_function, for_event_types=None, for_categories=None):
        """Wrap a function in an IUEventHandler and add it to this buffer's handlers.

        Keyword arguments:
        handler_function -- a function with the signature (IU, event_type, local)
        for_event_types -- a list of event types or None if handler should
            be called for all event types
        for_categories -- a list of category names or None if handler should
            be called for all categories
        """
        self._iu_event_handlers.append(
            IUEventHandler(
                handler_function=handler_function,
                for_event_types=for_event_types,
                for_categories=for_categories))

    def call_iu_event_handlers(self, uid, local, event_type, category):
        """Offer the event to every registered handler; each decides whether it applies."""
        for event_handler in self._iu_event_handlers:
            event_handler.call(self, uid, local=local, event_type=event_type, category=category)

    def _get_owning_component_name(self):
        """Return the name of this Buffer's owning component"""
        return self._owning_component_name
    owning_component_name = property(fget=_get_owning_component_name)

    def _get_unique_name(self):
        """Return the Buffer's unique name."""
        return self._unique_name
    unique_name = property(fget=_get_unique_name)
class InputBuffer(Buffer):
    """An InputBuffer that holds remote IUs."""

    def __init__(self, owning_component_name, category_interests=None, participant_config=None):
        '''Create an InputBuffer.

        Keyword arguments:
        owning_compontent_name -- name of the entity that owns this InputBuffer
        category_interests -- list of IU categories this Buffer is interested in
        participant_config = RSB configuration
        '''
        super(InputBuffer, self).__init__(owning_component_name, participant_config)
        self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB'
        self._listener_store = {} # one per IU category
        self._remote_server_store = {} # one per remote-IU-owning Component
        self._category_interests = []
        if category_interests is not None:
            for cat in category_interests:
                self._create_category_listener_if_needed(cat)

    def _get_remote_server(self, iu):
        '''Return (or create, store and return) a remote server.'''
        if iu.owner_name in self._remote_server_store:
            return self._remote_server_store[iu.owner_name]
        # TODO remove the str() when unicode is supported (issue #490)
        remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name)))
        self._remote_server_store[iu.owner_name] = remote_server
        return remote_server

    def _create_category_listener_if_needed(self, iu_category):
        '''Return (or create, store and return) a category listener.'''
        if iu_category in self._listener_store:
            # Listeners are kept in _listener_store; this class has no
            # _informer_store.
            return self._listener_store[iu_category]
        cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
        cat_listener.addHandler(self._handle_iu_events)
        self._listener_store[iu_category] = cat_listener
        self._category_interests.append(iu_category)
        logger.info("Added category listener for "+str(iu_category))
        return cat_listener

    def _handle_iu_events(self, event):
        '''Dispatch incoming IU events.

        Adds incoming IU's to the store, applies payload, link and commit
        updates to the IU, calls IU event handlers.

        Keyword arguments:
        event -- a converted RSB event
        '''
        type_ = type(event.data)
        if type_ is RemotePushIU:
            # a new IU; one that is already in our store is ignored
            if event.data.uid not in self._iu_store:
                self._iu_store[ event.data.uid ] = event.data
                event.data.buffer = self
                self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category)
            return
        # an update to an existing IU
        if event.data.writer_name == self.unique_name:
            # Discard updates that originate from this buffer
            return
        if event.data.uid not in self._iu_store:
            # TODO: we should request the IU's owner to send us the IU
            logger.warning("Update message for IU which we did not fully receive before.")
            return
        if type_ is ipaaca_pb2.IUCommission:
            # IU commit
            iu = self._iu_store[event.data.uid]
            iu._apply_commission()
            iu._revision = event.data.revision
            self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
        elif type_ is IUPayloadUpdate:
            # IU payload update
            iu = self._iu_store[event.data.uid]
            iu._apply_update(event.data)
            self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
        elif type_ is IULinkUpdate:
            # IU link update
            iu = self._iu_store[event.data.uid]
            iu._apply_link_update(event.data)
            self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category)
        else:
            logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
class OutputBuffer(Buffer):
"""An OutputBuffer that holds local IUs."""
def __init__(self, owning_component_name, participant_config=None):
    '''Set up an OutputBuffer and its RSB server.

    The buffer's unique name is used as the scope of an RSB server that
    exposes the 'updateLinks', 'updatePayload' and 'commit' methods; each
    of them is registered with ``int`` as its reply type.

    Keyword arguments:
    owning_component_name -- name of the entity that owns this buffer
    participant_config -- RSB configuration
    '''
    super(OutputBuffer, self).__init__(owning_component_name, participant_config)
    component = str(owning_component_name)
    # NOTE(review): self._uuid is presumably provided by Buffer.__init__ -- confirm.
    self._unique_name = '/ipaaca/component/' + component + 'ID' + self._uuid + '/OB'
    self._server = rsb.createServer(rsb.Scope(self._unique_name))
    # (method name, callback, request type), registered in this order
    remote_methods = (
        ('updateLinks', self._remote_update_links, IULinkUpdate),
        ('updatePayload', self._remote_update_payload, IUPayloadUpdate),
        ('commit', self._remote_commit, ipaaca_pb2.IUCommission),
    )
    for method_name, callback, request_type in remote_methods:
        self._server.addMethod(method_name, callback, request_type, int)
    self._informer_store = {}
    # prefix shared by every IU uid handed out by _generate_iu_uid
    self._id_prefix = component + '-' + str(self._uuid) + '-IU-'
    self.__iu_id_counter_lock = threading.Lock()
    self.__iu_id_counter = 0

Ramin Yaghoubzadeh
committed
def _create_own_name_listener(self, iu_category):
# FIXME replace this
'''Create an own name listener.'''
#if iu_category in self._listener_store: return self._informer_store[iu_category]
#cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
#cat_listener.addHandler(self._handle_iu_events)
#self._listener_store[iu_category] = cat_listener
#self._category_interests.append(iu_category)
#logger.info("Added category listener for "+iu_category)
#return cat_listener
def _generate_iu_uid(self):
'''Generate a unique IU id of the form'''
with self.__iu_id_counter_lock:
self.__iu_id_counter += 1
number = self.__iu_id_counter
return self._id_prefix + str(number)
def _remote_update_links(self, update):
    '''Apply a remotely requested update to the links of a stored IU.

    Returns 0 when the IU is unknown to this buffer or when the request
    carries an outdated revision; otherwise returns the IU's revision
    after the update has been applied and the LINKSUPDATED handlers have
    been called.
    '''
    uid = update.uid
    if uid not in self._iu_store:
        logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(uid))
        return 0
    target = self._iu_store[uid]
    # NOTE(review): the handler call and the final return are placed inside
    # the revision lock; the original indentation was lost -- confirm.
    with target.revision_lock:
        # (0 means "do not pay attention to the revision number" -> "force update")
        if not (update.revision == 0 or update.revision == target.revision):
            logger.warning("Remote write operation failed because request was out of date; IU "+str(uid))
            return 0
        if not update.is_delta:
            # full replacement of the link set
            target.set_links(links=update.new_links, writer_name=update.writer_name)
        else:
            # incremental change: add some links, remove others
            target.modify_links(add=update.new_links, remove=update.links_to_remove, writer_name=update.writer_name)
        self.call_iu_event_handlers(uid, local=True, event_type=IUEventType.LINKSUPDATED, category=target.category)
        return target.revision
def _remote_update_payload(self, update):
    '''Apply a remotely requested update to the payload of a stored IU.

    Returns 0 when the IU is unknown to this buffer or when the request
    carries an outdated revision; otherwise returns the IU's revision
    after the update has been applied and the UPDATED handlers have been
    called.
    '''
    uid = update.uid
    if uid not in self._iu_store:
        logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(uid))
        return 0
    target = self._iu_store[uid]
    # NOTE(review): the handler call and the final return are placed inside
    # the revision lock; the original indentation was lost -- confirm.
    with target.revision_lock:
        # (0 means "do not pay attention to the revision number" -> "force update")
        if not (update.revision == 0 or update.revision == target.revision):
            logger.warning("Remote write operation failed because request was out of date; IU "+str(uid))
            return 0
        if not update.is_delta:
            # full replacement of the payload
            target._set_payload(update.new_items, writer_name=update.writer_name)
        else:
            # incremental change: removals first, then new/changed items
            for stale_key in update.keys_to_remove:
                target.payload.__delitem__(stale_key, writer_name=update.writer_name)
            for key, value in update.new_items.items():
                target.payload.__setitem__(key, value, writer_name=update.writer_name)
        # _set_payload etc. have also incremented the revision number
        self.call_iu_event_handlers(uid, local=True, event_type=IUEventType.UPDATED, category=target.category)
        return target.revision
def _remote_commit(self, iu_commission):
'''Apply a remotely requested commit to one of the stored IUs.'''
if iu_commission.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
return 0
iu = self._iu_store[iu_commission.uid]

Ramin Yaghoubzadeh
committed
with iu.revision_lock:
if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
return 0
if iu.committed: