-
Ramin Yaghoubzadeh authored
ATTENTION: ipaaca.py disabled revision checks at the moment using an OMIT_... var (look on the first page of the file)
Ramin Yaghoubzadeh authoredATTENTION: ipaaca.py disabled revision checks at the moment using an OMIT_... var (look on the first page of the file)
ipaaca.py 42.35 KiB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, division
import logging
import sys
import threading
import uuid
import collections
import copy
import rsb
import rsb.converter
import ipaaca_pb2
OMIT_REVISION_CHECKS = True
# IDEAS
# We should think about relaying the update event (or at least the
# affected keys in the payload / links) to the event handlers!
# THOUGHTS
# Output buffers could generate UIDs for IUs on request, without
# publishing them at that time. Then UID could then be used
# for internal links etc. The IU may be published later through
# the same buffer that allocated the UID.
# WARNINGS
# category is now the FIRST argument for IU constructors
__all__ = [
'IUEventType',
'IUAccessMode',
'InputBuffer', 'OutputBuffer',
'IU',
'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError', 'IUNotFoundError',
'logger'
]
## --- Utilities -------------------------------------------------------------
def enum(*sequential, **named):
"""Create an enum type.
Based on suggestion of Alec Thomas on stackoverflow.com:
http://stackoverflow.com/questions/36932/
whats-the-best-way-to-implement-an-enum-in-python/1695250#1695250
"""
enums = dict(zip(sequential, range(len(sequential))), **named)
return type('Enum', (), enums)
def pack_typed_payload_item(protobuf_object, key, value):
protobuf_object.key = str(key)
protobuf_object.value = str(value)
protobuf_object.type = 'str' # TODO: more types
def unpack_typed_payload_item(protobuf_object):
# TODO: more types
return (protobuf_object.key, str(protobuf_object.value))
class IpaacaLoggingHandler(logging.Handler):
def __init__(self, level=logging.DEBUG):
logging.Handler.__init__(self, level)
def emit(self, record):
meta = '[ipaaca] (' + str(record.levelname) + ') '
msg = str(record.msg.format(record.args))
print(meta + msg)
## --- Global Definitions ----------------------------------------------------
IUEventType = enum(
ADDED = 'ADDED',
COMMITTED = 'COMMITTED',
DELETED = 'DELETED',
RETRACTED = 'RETRACTED',
UPDATED = 'UPDATED',
LINKSUPDATED = 'LINKSUPDATED'
)
IUAccessMode = enum(
"PUSH",
"REMOTE",
"MESSAGE"
)
## --- Errors and Exceptions -------------------------------------------------
class IUPublishedError(Exception):
"""Error publishing of an IU failed since it is already in the buffer."""
def __init__(self, iu):
super(IUPublishedError, self).__init__('IU ' + str(iu.uid) + ' is already present in the output buffer.')
class IUUpdateFailedError(Exception):
"""Error indicating that a remote IU update failed."""
def __init__(self, iu):
super(IUUpdateFailedError, self).__init__('Remote update failed for IU ' + str(iu.uid) + '.')
class IUCommittedError(Exception):
"""Error indicating that an IU is immutable because it has been committed to."""
def __init__(self, iu):
super(IUCommittedError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it has been committed to.')
class IUReadOnlyError(Exception):
"""Error indicating that an IU is immutable because it is 'read only'."""
def __init__(self, iu):
super(IUReadOnlyError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it is read-only.')
class IUNotFoundError(Exception):
"""Error indicating that an IU UID was unexpectedly not found in an internal store."""
def __init__(self, iu_uid):
super(IUNotFoundError, self).__init__('Lookup of IU ' + str(iu_uid) + ' failed.')
## --- Generation Architecture -----------------------------------------------
class Payload(dict):
def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False):
pl = {} if new_payload is None else new_payload
self.iu = iu
# NOTE omit_init_update_message is necessary to prevent checking for
# exceptions and sending updates in the case where we just receive
# a whole new payload from the remote side and overwrite it locally.
if (not omit_init_update_message) and (self.iu.buffer is not None):
self.iu._modify_payload(payload=self, is_delta=False, new_items=pl, keys_to_remove=[], writer_name=writer_name)
for k, v in pl.items():
dict.__setitem__(self, k, v)
def __setitem__(self, k, v, writer_name=None):
self.iu._modify_payload(payload=self, is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name)
result = dict.__setitem__(self, k, v)
def __delitem__(self, k, writer_name=None):
self.iu._modify_payload(payload=self, is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name)
result = dict.__delitem__(self, k)
def _remotely_enforced_setitem(self, k, v):
"""Sets an item when requested remotely."""
return dict.__setitem__(self, k, v)
def _remotely_enforced_delitem(self, k):
"""Deletes an item when requested remotely."""
return dict.__delitem__(self, k)
class IUInterface(object): #{{{
"""Base class of all specialised IU classes."""
def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False):
"""Creates an IU.
Keyword arguments:
uid -- unique ID of this IU
access_mode -- access mode of this IU
read_only -- flag indicating whether this IU is read_only or not
"""
self._uid = uid
self._revision = None
self._category = None
self._payload_type = None
self._owner_name = None
self._committed = False
self._access_mode = access_mode
self._read_only = read_only
self._buffer = None
# payload is not present here
self._links = collections.defaultdict(set)
def __str__(self):
s = str(self.__class__)+"{ "
s += "uid="+self._uid+" "
s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") "
s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " "
s += "payload={ "
for k,v in self.payload.items():
s += k+":'"+v+"', "
s += "} "
s += "links={ "
for t,ids in self.get_all_links().items():
s += t+":'"+str(ids)+"', "
s += "} "
s += "}"
return s
def _add_and_remove_links(self, add, remove):
'''Just add and remove the new links in our links set, do not send an update here'''
'''Note: Also used for remotely enforced links updates.'''
for type in remove.keys(): self._links[type] -= set(remove[type])
for type in add.keys(): self._links[type] |= set(add[type])
def _replace_links(self, links):
'''Just wipe and replace our links set, do not send an update here'''
'''Note: Also used for remotely enforced links updates.'''
self._links = collections.defaultdict(set)
for type in links.keys(): self._links[type] |= set(links[type])
def add_links(self, type, targets, writer_name=None):
'''Attempt to add links if the conditions are met
and send an update message. Then call the local setter.'''
if not hasattr(targets, '__iter__'): targets=[targets]
self._modify_links(links=self, is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
self._add_and_remove_links( add={type:targets}, remove={} )
def remove_links(self, type, targets, writer_name=None):
'''Attempt to remove links if the conditions are met
and send an update message. Then call the local setter.'''
if not hasattr(targets, '__iter__'): targets=[targets]
self._modify_links(links=self, is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
self._add_and_remove_links( add={}, remove={type:targets} )
def modify_links(self, add, remove, writer_name=None):
'''Attempt to modify links if the conditions are met
and send an update message. Then call the local setter.'''
self._modify_links(links=self, is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name)
self._add_and_remove_links( add=add, remove=remove )
def set_links(self, links, writer_name=None):
'''Attempt to set (replace) links if the conditions are met
and send an update message. Then call the local setter.'''
self._modify_links(links=self, is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name)
self._replace_links( links=links )
def get_links(self, type):
return set(self._links[type])
def get_all_links(self):
return copy.deepcopy(self._links)
def _get_revision(self):
return self._revision
revision = property(fget=_get_revision, doc='Revision number of the IU.')
def _get_category(self):
return self._category
category = property(fget=_get_category, doc='Category of the IU.')
def _get_payload_type(self):
return self._payload_type
payload_type = property(fget=_get_payload_type, doc='Type of the IU payload')
def _get_committed(self):
return self._committed
committed = property(
fget=_get_committed,
doc='Flag indicating whether this IU has been committed to.')
def _get_uid(self):
return self._uid
uid = property(fget=_get_uid, doc='Unique ID of the IU.')
def _get_access_mode(self):
return self._access_mode
access_mode = property(fget=_get_access_mode, doc='Access mode of the IU.')
def _get_read_only(self):
return self._read_only
read_only = property(
fget=_get_read_only,
doc='Flag indicating whether this IU is read only.')
def _get_buffer(self):
return self._buffer
def _set_buffer(self, buffer):
if self._buffer is not None:
raise Exception('The IU is already in a buffer, cannot move it.')
self._buffer = buffer
buffer = property(
fget=_get_buffer,
fset=_set_buffer,
doc='Buffer this IU is held in.')
def _get_owner_name(self):
return self._owner_name
def _set_owner_name(self, owner_name):
if self._owner_name is not None:
raise Exception('The IU already has an owner name, cannot change it.')
self._owner_name = owner_name
owner_name = property(
fget=_get_owner_name,
fset=_set_owner_name,
doc="The IU's owner's name.")
#}}}
class IU(IUInterface):#{{{
"""A local IU."""
def __init__(self, category='undef', access_mode=IUAccessMode.PUSH, read_only=False, _payload_type='MAP'):
super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only)
self._revision = 1
self._category = category
self._payload_type = _payload_type
self.revision_lock = threading.RLock()
self._payload = Payload(iu=self)
def _modify_links(self, links, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
if self.committed:
raise IUCommittedError(self)
with self.revision_lock:
# modify links locally
self._increase_revision_number()
if self.is_published:
# send update to remote holders
self.buffer._send_iu_link_update(
self,
revision=self.revision,
is_delta=is_delta,
new_links=new_links,
links_to_remove=links_to_remove,
writer_name=self.owner_name if writer_name is None else writer_name)
def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
"""Modify the payload: add or remove items from this payload locally and send update."""
if self.committed:
raise IUCommittedError(self)
with self.revision_lock:
# set item locally
# FIXME: Is it actually set locally?
self._increase_revision_number()
if self.is_published:
# send update to remote holders
self.buffer._send_iu_payload_update(
self,
revision=self.revision,
is_delta=is_delta,
new_items=new_items,
keys_to_remove=keys_to_remove,
writer_name=self.owner_name if writer_name is None else writer_name)
def _increase_revision_number(self):
self._revision += 1
def _internal_commit(self, writer_name=None):
if self.committed:
raise IUCommittedError(self)
with self.revision_lock:
if not self._committed:
self._increase_revision_number()
self._committed = True
if self.buffer is not None:
self.buffer._send_iu_commission(self, writer_name=writer_name)
def commit(self):
"""Commit to this IU."""
return self._internal_commit()
def _get_payload(self):
return self._payload
def _set_payload(self, new_pl, writer_name=None):
if self.committed:
raise IUCommittedError(self)
with self.revision_lock:
self._increase_revision_number()
self._payload = Payload(
iu=self,
writer_name=None if self.buffer is None else (self.buffer.unique_name if writer_name is None else writer_name),
new_payload=new_pl)
payload = property(
fget=_get_payload,
fset=_set_payload,
doc='Payload dictionary of this IU.')
def _get_is_published(self):
return self.buffer is not None
is_published = property(
fget=_get_is_published,
doc='Flag indicating whether this IU has been published or not.')
def _set_buffer(self, buffer):
if self._buffer is not None:
raise Exception('The IU is already in a buffer, cannot move it.')
self._buffer = buffer
self.owner_name = buffer.unique_name
self._payload.owner_name = buffer.unique_name
buffer = property(
fget=IUInterface._get_buffer,
fset=_set_buffer,
doc='Buffer this IU is held in.')
def _set_uid(self, uid):
if self._uid is not None:
raise AttributeError('The uid of IU ' + self.uid + ' has already been set, cannot change it.')
self._uid = uid
uid = property(
fget=IUInterface._get_uid,
fset=_set_uid,
doc='Unique ID of theIU.')
#}}}
class RemotePushIU(IUInterface):#{{{
"""A remote IU with access mode 'PUSH'."""
def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links):
super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
self._revision = revision
self._category = category
self.owner_name = owner_name
self._payload_type = payload_type
self._committed = committed
# NOTE Since the payload is an already-existant Payload which we didn't modify ourselves,
# don't try to invoke any modification checks or network updates ourselves either.
# We are just receiving it here and applying the new data.
self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True)
self._links = links
def _modify_links(self, links, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
"""Modify the links: add or remove item from this payload remotely and send update."""
if self.committed:
raise IUCommittedError(self)
if self.read_only:
raise IUReadOnlyError(self)
requested_update = IULinkUpdate(
uid=self.uid,
revision=self.revision,
is_delta=is_delta,
writer_name=self.buffer.unique_name,
new_links=new_links,
links_to_remove=links_to_remove)
remote_server = self.buffer._get_remote_server(self)
new_revision = remote_server.updateLinks(requested_update)
if new_revision == 0:
raise IUUpdateFailedError(self)
else:
self._revision = new_revision
def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
"""Modify the payload: add or remove item from this payload remotely and send update."""
if self.committed:
raise IUCommittedError(self)
if self.read_only:
raise IUReadOnlyError(self)
requested_update = IUPayloadUpdate(
uid=self.uid,
revision=self.revision,
is_delta=is_delta,
writer_name=self.buffer.unique_name,
new_items=new_items,
keys_to_remove=keys_to_remove)
remote_server = self.buffer._get_remote_server(self)
new_revision = remote_server.updatePayload(requested_update)
if new_revision == 0:
raise IUUpdateFailedError(self)
else:
self._revision = new_revision
def commit(self):
"""Commit to this IU."""
if self.read_only:
raise IUReadOnlyError(self)
if self._committed:
# ignore commit requests when already committed
return
else:
commission_request = ipaaca_pb2.IUCommission()
commission_request.uid = self.uid
commission_request.revision = self.revision
commission_request.writer_name = self.buffer.unique_name
remote_server = self.buffer._get_remote_server(self)
new_revision = remote_server.commit(commission_request)
if new_revision == 0:
raise IUUpdateFailedError(self)
else:
self._revision = new_revision
self._committed = True
def _get_payload(self):
return self._payload
def _set_payload(self, new_pl):
if self.committed:
raise IUCommittedError(self)
if self.read_only:
raise IUReadOnlyError(self)
requested_update = IUPayloadUpdate(
uid=self.uid,
revision=self.revision,
is_delta=False,
writer_name=self.buffer.unique_name,
new_items=new_pl,
keys_to_remove=[])
remote_server = self.buffer._get_remote_server(self)
new_revision = remote_server.updatePayload(requested_update)
if new_revision == 0:
raise IUUpdateFailedError(self)
else:
self._revision = new_revision
# NOTE Please read the comment in the constructor
self._payload = Payload(iu=self, new_payload=new_pl, omit_init_update_message=True)
payload = property(
fget=_get_payload,
fset=_set_payload,
doc='Payload dictionary of the IU.')
def _apply_link_update(self, update):
"""Apply a IULinkUpdate to the IU."""
self._revision = update.revision
if update.is_delta:
self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove)
else:
self._replace_links(links=update.new_links)
def _apply_update(self, update):
"""Apply a IUPayloadUpdate to the IU."""
self._revision = update.revision
if update.is_delta:
for k in update.keys_to_remove: self.payload._remotely_enforced_delitem(k)
for k, v in update.new_items.items(): self.payload._remotely_enforced_setitem(k, v)
else:
# NOTE Please read the comment in the constructor
self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True)
def _apply_commission(self):
"""Apply commission to the IU"""
self._committed = True
#}}}
class IntConverter(rsb.converter.Converter):#{{{
"""Convert Python int objects to Protobuf ints and vice versa."""
def __init__(self, wireSchema="int", dataType=int):
super(IntConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, value):
pbo = ipaaca_pb2.IntMessage()
pbo.value = value
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca_pb2.IntMessage()
pbo.ParseFromString( str(byte_stream) )
return pbo.value
#}}}
class IUConverter(rsb.converter.Converter):#{{{
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-iu", dataType=IU):
super(IUConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu):
pbo = ipaaca_pb2.IU()
pbo.uid = iu._uid
pbo.revision = iu._revision
pbo.category = iu._category
pbo.payload_type = iu._payload_type
pbo.owner_name = iu._owner_name
pbo.committed = iu._committed
pbo.access_mode = ipaaca_pb2.IU.PUSH # TODO
pbo.read_only = iu._read_only
for k,v in iu._payload.items():
entry = pbo.payload.add()
pack_typed_payload_item(entry, k, v)
for type_ in iu._links.keys():
linkset = pbo.links.add()
linkset.type = type_
linkset.targets.extend(iu._links[type_])
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IU:
pbo = ipaaca_pb2.IU()
pbo.ParseFromString( str(byte_stream) )
if pbo.access_mode == ipaaca_pb2.IU.PUSH:
_payload = {}
for entry in pbo.payload:
k, v = unpack_typed_payload_item(entry)
_payload[k] = v
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
remote_push_iu = RemotePushIU(
uid=pbo.uid,
revision=pbo.revision,
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links
)
return remote_push_iu
else:
raise Exception("We can only handle IUs with access mode 'PUSH' for now!")
else:
raise ValueError("Inacceptable dataType %s" % type)
#}}}
class IULinkUpdate(object):#{{{
def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None):
super(IULinkUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.writer_name = writer_name
self.is_delta = is_delta
self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links)
self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)
def __str__(self):
s = 'LinkUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_links = '+str(self.new_links)+', '
s += 'links_to_remove = '+str(self.links_to_remove)+')'
return s
#}}}
class IUPayloadUpdate(object):#{{{
def __init__(self, uid, revision, is_delta, writer_name="undef", new_items=None, keys_to_remove=None):
super(IUPayloadUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.writer_name = writer_name
self.is_delta = is_delta
self.new_items = {} if new_items is None else new_items
self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove
def __str__(self):
s = 'PayloadUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_items = '+str(self.new_items)+', '
s += 'keys_to_remove = '+str(self.keys_to_remove)+')'
return s
#}}}
class IULinkUpdateConverter(rsb.converter.Converter):#{{{
def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate):
super(IULinkUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu_link_update):
pbo = ipaaca_pb2.IULinkUpdate()
pbo.uid = iu_link_update.uid
pbo.writer_name = iu_link_update.writer_name
pbo.revision = iu_link_update.revision
for type_ in iu_link_update.new_links.keys():
linkset = pbo.new_links.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.new_links[type_])
for type_ in iu_link_update.links_to_remove.keys():
linkset = pbo.links_to_remove.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.links_to_remove[type_])
pbo.is_delta = iu_link_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IULinkUpdate:
pbo = ipaaca_pb2.IULinkUpdate()
pbo.ParseFromString( str(byte_stream) )
logger.debug('received an IULinkUpdate for revision '+str(pbo.revision))
iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_links:
iu_link_up.new_links[str(entry.type)] = set(entry.targets)
for entry in pbo.links_to_remove:
iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets)
return iu_link_up
else:
raise ValueError("Inacceptable dataType %s" % type)
#}}}
class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{
def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate):
super(IUPayloadUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu_payload_update):
pbo = ipaaca_pb2.IUPayloadUpdate()
pbo.uid = iu_payload_update.uid
pbo.writer_name = iu_payload_update.writer_name
pbo.revision = iu_payload_update.revision
for k,v in iu_payload_update.new_items.items():
entry = pbo.new_items.add()
pack_typed_payload_item(entry, k, v)
pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
pbo.is_delta = iu_payload_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IUPayloadUpdate:
pbo = ipaaca_pb2.IUPayloadUpdate()
pbo.ParseFromString( str(byte_stream) )
logger.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_items:
k, v = unpack_typed_payload_item(entry)
iu_up.new_items[k] = v
iu_up.keys_to_remove = pbo.keys_to_remove[:]
return iu_up
else:
raise ValueError("Inacceptable dataType %s" % type)
#}}}
class IUStore(dict):
"""A dictionary storing IUs."""
def __init__(self):
super(IUStore, self).__init__()
class FrozenIUStore(IUStore):
"""A read-only version of a dictionary storing IUs. (TODO: might be slow)"""
def __init__(self, original_iu_store):
super(FrozenIUStore, self).__init__()
map(lambda p: super(FrozenIUStore, self).__setitem__(p[0], p[1]), original_iu_store.items())
def __delitem__(self, k):
raise AttributeError()
def __setitem__(self, k, v):
raise AttributeError()
class IUEventHandler(object):
"""Wrapper for IU event handling functions."""
def __init__(self, handler_function, for_event_types=None, for_categories=None):
"""Create an IUEventHandler.
Keyword arguments:
handler_function -- the handler function with the signature
(IU, event_type, local)
for_event_types -- a list of event types or None if handler should
be called for all event types
for_categories -- a list of category names or None if handler should
be called for all categoires
"""
super(IUEventHandler, self).__init__()
self._handler_function = handler_function
self._for_event_types = (
None if for_event_types is None else
(for_event_types[:] if hasattr(for_event_types, '__iter__') else [for_event_types]))
self._for_categories = (
None if for_categories is None else
(for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories]))
def condition_met(self, event_type, category):
"""Check whether this IUEventHandler should be called.
Keyword arguments:
event_type -- type of the IU event
category -- category of the IU which triggered the event
"""
type_condition_met = (self._for_event_types is None or event_type in self._for_event_types)
cat_condition_met = (self._for_categories is None or category in self._for_categories)
return type_condition_met and cat_condition_met
def call(self, buffer, iu_uid, local, event_type, category):
"""Call this IUEventHandler's function, if it applies.
Keyword arguments:
buffer -- the buffer in which the IU is stored
iu_uid -- the uid of the IU
local -- is the IU local or remote to this component? @RAMIN: Is this correct?
event_type -- IU event type
category -- category of the IU
"""
if self.condition_met(event_type, category):
iu = buffer._iu_store[iu_uid]
self._handler_function(iu, event_type, local)
class Buffer(object):
"""Base class for InputBuffer and OutputBuffer."""
def __init__(self, owning_component_name, participant_config=None):
'''Create a Buffer.
Keyword arguments:
owning_compontent_name -- name of the entity that owns this Buffer
participant_config -- RSB configuration
'''
super(Buffer, self).__init__()
self._owning_component_name = owning_component_name
self._participant_config = participant_config #rsb.ParticipantConfig.fromDefaultSources() if participant_config is None else participant_config
self._uuid = str(uuid.uuid4())[0:8]
# Initialise with a temporary, but already unique, name
self._unique_name = "undef-"+self._uuid
self._iu_store = IUStore()
self._iu_event_handlers = []
def _get_frozen_iu_store(self):
return FrozenIUStore(original_iu_store = self._iu_store)
iu_store = property(fget=_get_frozen_iu_store, doc='Copy-on-read version of the internal IU store')
def register_handler(self, handler_function, for_event_types=None, for_categories=None):
"""Register a new IU event handler function.
Keyword arguments:
handler_function -- a function with the signature (IU, event_type, local)
for_event_types -- a list of event types or None if handler should
be called for all event types
for_categories -- a list of category names or None if handler should
be called for all categoires
"""
handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories)
self._iu_event_handlers.append(handler)
def call_iu_event_handlers(self, uid, local, event_type, category):
"""Call registered IU event handler functions registered for this event_type and category."""
for h in self._iu_event_handlers:
h.call(self, uid, local=local, event_type=event_type, category=category)
def _get_owning_component_name(self):
"""Return the name of this Buffer's owning component"""
return self._owning_component_name
owning_component_name = property(_get_owning_component_name)
def _get_unique_name(self):
"""Return the Buffer's unique name."""
return self._unique_name
unique_name = property(_get_unique_name)
class InputBuffer(Buffer):
"""An InputBuffer that holds remote IUs."""
def __init__(self, owning_component_name, category_interests=None, participant_config=None):
'''Create an InputBuffer.
Keyword arguments:
owning_compontent_name -- name of the entity that owns this InputBuffer
category_interests -- list of IU categories this Buffer is interested in
participant_config = RSB configuration
'''
super(InputBuffer, self).__init__(owning_component_name, participant_config)
self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB'
self._listener_store = {} # one per IU category
self._remote_server_store = {} # one per remote-IU-owning Component
self._category_interests = []
if category_interests is not None:
for cat in category_interests:
self._create_category_listener_if_needed(cat)
def _get_remote_server(self, iu):
'''Return (or create, store and return) a remote server.'''
if iu.owner_name in self._remote_server_store:
return self._remote_server_store[iu.owner_name]
# TODO remove the str() when unicode is supported (issue #490)
remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name)))
self._remote_server_store[iu.owner_name] = remote_server
return remote_server
def _create_category_listener_if_needed(self, iu_category):
'''Return (or create, store and return) a category listener.'''
if iu_category in self._listener_store: return self._informer_store[iu_category]
cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category)
logger.info("Added listener in scope "+"/ipaaca/category/"+iu_category)
return cat_listener
def _handle_iu_events(self, event):
'''Dispatch incoming IU events.
Adds incoming IU's to the store, applies payload and commit updates to
IU, calls IU event handlers.'
Keyword arguments:
event -- a converted RSB event
'''
type_ = type(event.data)
if type_ is RemotePushIU:
# a new IU
if event.data.uid in self._iu_store:
# already in our store
pass
else:
self._iu_store[ event.data.uid ] = event.data
event.data.buffer = self
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category)
else:
# an update to an existing IU
if event.data.writer_name == self.unique_name:
# Discard updates that originate from this buffer
return
if event.data.uid not in self._iu_store:
# TODO: we should request the IU's owner to send us the IU
logger.warning("Update message for IU which we did not fully receive before.")
return
if type_ is ipaaca_pb2.IUCommission:
# IU commit
iu = self._iu_store[event.data.uid]
iu._apply_commission()
iu._revision = event.data.revision
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
elif type_ is IUPayloadUpdate:
# IU payload update
iu = self._iu_store[event.data.uid]
iu._apply_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
elif type_ is IULinkUpdate:
# IU link update
iu = self._iu_store[event.data.uid]
iu._apply_link_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category)
else:
logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
class OutputBuffer(Buffer):
"""An OutputBuffer that holds local IUs."""
def __init__(self, owning_component_name, participant_config=None):
'''Create an Output Buffer.
Keyword arguments:
owning_component_name -- name of the entity that own this buffer
participant_config -- RSB configuration
'''
super(OutputBuffer, self).__init__(owning_component_name, participant_config)
self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB'
self._server = rsb.createServer(rsb.Scope(self._unique_name))
self._server.addMethod('updateLinks', self._remote_update_links, IULinkUpdate, int)
self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int)
self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int)
self._informer_store = {}
self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-'
self.__iu_id_counter_lock = threading.Lock()
self.__iu_id_counter = 0
def _create_own_name_listener(self, iu_category):
# FIXME replace this
'''Create an own name listener.'''
#if iu_category in self._listener_store: return self._informer_store[iu_category]
#cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
#cat_listener.addHandler(self._handle_iu_events)
#self._listener_store[iu_category] = cat_listener
#self._category_interests.append(iu_category)
#logger.info("Added category listener for "+iu_category)
#return cat_listener
def _generate_iu_uid(self):
'''Generate a unique IU id of the form'''
with self.__iu_id_counter_lock:
self.__iu_id_counter += 1
number = self.__iu_id_counter
return self._id_prefix + str(number)
def _remote_update_links(self, update):
'''Apply a remotely requested update to one of the stored IU's links.'''
if update.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if not OMIT_REVISION_CHECKS and (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
return 0
if update.is_delta:
iu.modify_links(add=update.new_links, remove=update.links_to_remove, writer_name=update.writer_name)
else:
iu.set_links(links=update.new_links, writer_name=update.writer_name)
self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.LINKSUPDATED, category=iu.category)
return iu.revision
def _remote_update_payload(self, update):
'''Apply a remotely requested update to one of the stored IU's payload.'''
if update.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if not OMIT_REVISION_CHECKS and (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
return 0
if update.is_delta:
for k in update.keys_to_remove:
iu.payload.__delitem__(k, writer_name=update.writer_name)
for k,v in update.new_items.items():
iu.payload.__setitem__(k, v, writer_name=update.writer_name)
else:
iu._set_payload(update.new_items, writer_name=update.writer_name)
# _set_payload etc. have also incremented the revision number
self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category)
return iu.revision
def _remote_commit(self, iu_commission):
'''Apply a remotely requested commit to one of the stored IUs.'''
if iu_commission.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
return 0
iu = self._iu_store[iu_commission.uid]
with iu.revision_lock:
if not OMIT_REVISION_CHECKS and (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
return 0
if iu.committed:
return 0
else:
iu._internal_commit(writer_name=iu_commission.writer_name)
self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category)
return iu.revision
def _get_informer(self, iu_category):
'''Return (or create, store and return) an informer object for IUs of the specified category.'''
if iu_category in self._informer_store:
logger.info("Returning informer on scope "+"/ipaaca/category/"+str(iu_category))
return self._informer_store[iu_category]
informer_iu = rsb.createInformer(
rsb.Scope("/ipaaca/category/"+str(iu_category)),
config=self._participant_config,
dataType=object)
self._informer_store[iu_category] = informer_iu #new_tuple
logger.info("Returning NEW informer on scope "+"/ipaaca/category/"+str(iu_category))
return informer_iu #return new_tuple
def add(self, iu):
'''Add an IU to the IU store, assign an ID and publish it.'''
if iu._uid is not None:
raise IUPublishedError(iu)
iu.uid = self._generate_iu_uid()
self._iu_store[iu._uid] = iu
iu.buffer = self
self._publish_iu(iu)
def remove(self, iu=None, iu_uid=None):
'''Remove the iu or an IU corresponding to iu_uid from the OutputBuffer, retracting it from the system.'''
if iu is None:
if iu_uid is None:
return None
else:
if iu_uid not in self. _iu_store:
raise IUNotFoundError(iu_uid)
iu = self._iu_store[iu_uid]
# unpublish the IU
self._retract_iu(iu)
del self._iu_store[iu.uid]
return iu
def _publish_iu(self, iu):
'''Publish an IU.'''
informer = self._get_informer(iu._category)
informer.publishData(iu)
def _retract_iu(self, iu):
'''Retract (unpublish) an IU.'''
iu_retraction = ipaaca_pb2.IURetraction()
iu_retraction.uid = iu.uid
iu_retraction.revision = iu.revision
informer = self._get_informer(iu._category)
informer.publishData(iu_retraction)
def _send_iu_commission(self, iu, writer_name):
'''Send IU commission.
Keyword arguments:
iu -- the IU that has been committed to
writer_name -- name of the Buffer that initiated this commit, necessary
to enable remote components to filter out updates that originated
from their own operations
'''
# a raw Protobuf object for IUCommission is produced
# (unlike updates, where we have an intermediate class)
iu_commission = ipaaca_pb2.IUCommission()
iu_commission.uid = iu.uid
iu_commission.revision = iu.revision
iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name
informer = self._get_informer(iu._category)
informer.publishData(iu_commission)
def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"):
'''Send an IU link update.
Keyword arguments:
iu -- the IU being updated
is_delta -- whether this is an incremental update or a replacement
the whole link dictionary
revision -- the new revision number
new_links -- a dictionary of new link sets
links_to_remove -- a dict of the link sets that shall be removed
writer_name -- name of the Buffer that initiated this update, necessary
to enable remote components to filter out updates that originate d
from their own operations
'''
if new_links is None:
new_links = {}
if links_to_remove is None:
links_to_remove = {}
link_update = IULinkUpdate(iu._uid, is_delta=is_delta, revision=revision)
link_update.new_links = new_links
if is_delta:
link_update.links_to_remove = links_to_remove
link_update.writer_name = writer_name
informer = self._get_informer(iu._category)
informer.publishData(link_update)
# FIXME send the notification to the target, if the target is not the writer_name
def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
'''Send an IU payload update.
Keyword arguments:
iu -- the IU being updated
is_delta -- whether this is an incremental update or a replacement
revision -- the new revision number
new_items -- a dictionary of new payload items
keys_to_remove -- a list of the keys that shall be removed from the
payload
writer_name -- name of the Buffer that initiated this update, necessary
to enable remote components to filter out updates that originate d
from their own operations
'''
if new_items is None:
new_items = {}
if keys_to_remove is None:
keys_to_remove = []
payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision)
payload_update.new_items = new_items
if is_delta:
payload_update.keys_to_remove = keys_to_remove
payload_update.writer_name = writer_name
informer = self._get_informer(iu._category)
informer.publishData(payload_update)
## --- RSB -------------------------------------------------------------------
def initialize_ipaaca_rsb():#{{{
rsb.converter.registerGlobalConverter(
IntConverter(wireSchema="int32", dataType=int))
rsb.converter.registerGlobalConverter(
IUConverter(wireSchema="ipaaca-iu", dataType=IU))
rsb.converter.registerGlobalConverter(
IULinkUpdateConverter(
wireSchema="ipaaca-iu-link-update",
dataType=IULinkUpdate))
rsb.converter.registerGlobalConverter(
IUPayloadUpdateConverter(
wireSchema="ipaaca-iu-payload-update",
dataType=IUPayloadUpdate))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IUCommission))
#rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
#t = rsb.ParticipantConfig.Transport('spread', {'enabled':'true'})
rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromFile('rsb.cfg')
#}}}
## --- Module initialisation -------------------------------------------------
# register our own RSB Converters
initialize_ipaaca_rsb()
# Create a global logger for this module
logger = logging.getLogger('ipaaca')
logger.addHandler(IpaacaLoggingHandler(level=logging.INFO))