diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..3cd962fdf860bfd8ff34792a5a6d2d9509cce483 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +**/.*.swp + diff --git a/proto/ipaaca.proto b/proto/ipaaca.proto old mode 100644 new mode 100755 index c53e7a199ea7c89ce5ab2df8f5274d7aa1b0ef05..09129d46dd61ad2f697bb1da452837193030e827 --- a/proto/ipaaca.proto +++ b/proto/ipaaca.proto @@ -21,7 +21,7 @@ message IU { required string uid = 1; required uint32 revision = 2; required string category = 3 [default = "undef"]; - optional string type = 4; + required string payload_type = 4 [default = "MAP"]; required string owner_name = 5; required bool committed = 6 [default = false]; required AccessMode access_mode = 7 [default = PUSH]; diff --git a/python/src/ipaaca.py b/python/src/ipaaca.py index e033f3ad3074a88b9940e7e7deb5ffce2777cac8..95e0d1b65f4de19fa7f32d794f66eb6edec1025c 100755 --- a/python/src/ipaaca.py +++ b/python/src/ipaaca.py @@ -41,7 +41,7 @@ def enum(*sequential, **named): def pack_typed_payload_item(protobuf_object, key, value): protobuf_object.key = str(key) protobuf_object.value = str(value) - protobuf_object.type = 'str' # TODO: more types + protobuf_object.payload_type = 'str' # TODO: more types def unpack_typed_payload_item(protobuf_object): @@ -109,159 +109,21 @@ class IUReadOnlyError(Exception): ## --- Generation Architecture ----------------------------------------------- -class LocalPayload(dict):#{{{ - - """Payload of IUs held locally, i.e., in an OutputBuffer.""" - +class Payload(dict): def __init__(self, iu, writer_name=None, new_payload=None): - """Create local payload object. - - Keyword arguments: - iu -- IU holding this payload - writer_name -- @RAMIN: What is the reason to specify it here? - new_payload -- a payload dictionary to initialise this object - with, or None for an empty payload - """ - super(LocalPayload, self).__init__() - self._iu = iu - if new_payload is not None: - for k, v in new_payload.items(): - dict.__setitem__(self, k, v) - if self._iu.is_published: - self._iu.buffer._send_iu_payload_update( - self._iu, - revision=iu.revision, - is_delta=False, - new_items=new_payload, - keys_to_remove=[], - writer_name = self._iu.owner_name if writer_name is None else writer_name) - - # @RAMIN: Not needed, right? - #def __contains__(self, k): - # return dict.__contains__(self, k) - - #def __getitem__(self, k): - # return dict.__getitem__(self, k) - + pl = {} if new_payload is None else new_payload + self.iu = iu + self.iu._set_payload(payload=self, is_delta=False, new_items=pl, keys_to_remove=[], writer_name=writer_name) + for k, v in pl.items(): + dict.__setitem__(self, k, v) def __setitem__(self, k, v, writer_name=None): - """Set an item from this payload locally and send update.""" - if self._iu.committed: - raise IUCommittedError(self._iu) - with self._iu.revision_lock: - # set item locally - result = dict.__setitem__(self, k, v) - self._iu._increase_revision_number() - if self._iu.is_published: - # send update to remote holders - self._iu.buffer._send_iu_payload_update( - self._iu, - revision=self._iu.revision, - is_delta=True, - new_items={k:v}, - keys_to_remove=[], - writer_name=self._iu.owner_name if writer_name is None else writer_name) - return result - + self.iu._set_payload(payload=self, is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name) + result = dict.__setitem__(self, k, v) def __delitem__(self, k, writer_name=None): - """Delete an item from this payload locally and send update.""" - if self._iu.committed: - raise IUCommittedError(self._iu) - with self._iu.revision_lock: - # delete item locally - result = dict.__delitem__(self, k) - self._iu._increase_revision_number() - if self._iu.is_published: - # send update to remote holders - self._iu.buffer._send_iu_payload_update( - self._iu, - revision=self._iu.revision, - is_delta=True, - new_items={}, - keys_to_remove=[k], - writer_name=self._iu.owner_name if writer_name is None else writer_name) - return result -#}}} + self.iu._set_payload(payload=self, is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name) + result = dict.__delitem__(self, k) -class RemotePushPayload(dict):#{{{ - - """Payload of an IU of type 'PUSH' hold remotely, i.e., in an InputBuffer.""" - - def __init__(self, remote_push_iu, new_payload): - """Create remote payload object. - - Keyword arguments: - remote_push_iu -- remote IU holding this payload - new_payload -- payload dict to initialise this remote payload with - """ - super(RemotePushPayload, self).__init__() - self._remote_push_iu = remote_push_iu - if new_payload is not None: - for k,v in new_payload.items(): - dict.__setitem__(self, k, v) - - def __setitem__(self, k, v): - """Set item in this payload. - - Requests item setting from the OutputBuffer holding the local version - of this IU. Returns when permission is granted and item is set; - otherwise raises an IUUpdateFailedError. - """ - if self._remote_push_iu.committed: - raise IUCommittedError(self._remote_push_iu) - if self._remote_push_iu.read_only: - raise IUReadOnlyError(self._remote_push_iu) - requested_update = IUPayloadUpdate( - uid=self._remote_push_iu.uid, - revision=self._remote_push_iu.revision, - is_delta=True, - writer_name=self._remote_push_iu.buffer.unique_name, - new_items={k:v}, - keys_to_remove=[]) - remote_server = self._remote_push_iu.buffer._get_remote_server(self._remote_push_iu) - new_revision = remote_server.updatePayload(requested_update) - if new_revision == 0: - raise IUUpdateFailedError(self._remote_push_iu) - else: - self._remote_push_iu._revision = new_revision - dict.__setitem__(self, k, v) - - def __delitem__(self, k): - """Delete item in this payload. - - Requests item deletion from the OutputBuffer holding the local version - of this IU. Returns when permission is granted and item is deleted; - otherwise raises an IUUpdateFailedError. - """ - if self._remote_push_iu.committed: - raise IUCommittedError(self._remote_push_iu) - if self._remote_push_iu.read_only: - raise IUReadOnlyError(self._remote_push_iu) - requested_update = IUPayloadUpdate( - uid=self._remote_push_iu.uid, - revision=self._remote_push_iu.revision, - is_delta=True, - writer_name=self._remote_push_iu.buffer.unique_name, - new_items={}, - keys_to_remove=[k]) - remote_server = self._remote_push_iu.buffer._get_remote_server(self._remote_push_iu) - new_revision = remote_server.updatePayload(requested_update) - if new_revision == 0: - raise IUUpdateFailedError(self._remote_push_iu) - else: - self._remote_push_iu._revision = new_revision - dict.__delitem__(self, k) - - def _remotely_enforced_setitem(self, k, v): - """Sets an item when requested remotely.""" - return dict.__setitem__(self, k, v) - - def _remotely_enforced_delitem(self, k): - """Deletes an item when requested remotely.""" - return dict.__delitem__(self, k) - -#}}} - class IUInterface(object): """Base class of all specialised IU classes.""" @@ -277,7 +139,7 @@ class IUInterface(object): self._uid = uid self._revision = None self._category = None - self._type = None + self._payload_type = None self._owner_name = None self._committed = False self._access_mode = access_mode @@ -293,9 +155,9 @@ class IUInterface(object): return self._category category = property(fget=_get_category, doc='Category of the IU.') - def _get_type(self): - return self._type - type = property(fget=_get_type, doc='Type of the IU') + def _get_payload_type(self): + return self._payload_type + payload_type = property(fget=_get_payload_type, doc='Type of the IU payload') def _get_committed(self): return self._committed @@ -344,14 +206,32 @@ class IU(IUInterface):#{{{ """A local IU.""" - def __init__(self, access_mode=IUAccessMode.PUSH, read_only=False, category='undef', _type='undef'): + def __init__(self, access_mode=IUAccessMode.PUSH, read_only=False, category='undef', _payload_type='MAP'): super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only) self._revision = 1 self._category = category - self._type = _type - self._payload = LocalPayload(iu=self) + self._payload_type = _payload_type + self._payload = Payload(iu=self) self.revision_lock = threading.Lock() + def _set_payload(payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): + """Set an item from this payload locally and send update.""" + if self.committed: + raise IUCommittedError(self) + with self.revision_lock: + # set item locally + self._increase_revision_number() + if self.is_published: + # send update to remote holders + self.buffer._send_iu_payload_update( + self, + revision=self.revision, + is_delta=True, + new_items={k:v}, + keys_to_remove=[], + writer_name=self.owner_name if writer_name is None else writer_name) + return result + def _increase_revision_number(self): self._revision += 1 @@ -387,7 +267,7 @@ class IU(IUInterface):#{{{ raise IUCommittedError(self) with self.revision_lock: self._increase_revision_number() - self._payload = LocalPayload( + self._payload = Payload( iu=self, writer_name=None if self.buffer is None else (self.buffer.unique_name if writer_name is None else writer_name), new_payload=new_pl) @@ -428,15 +308,34 @@ class RemotePushIU(IUInterface):#{{{ """A remote IU with access mode 'PUSH'.""" - def __init__(self, uid, revision, read_only, owner_name, category, type, committed, payload): + def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload): super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only) self._revision = revision self._category = category self.owner_name = owner_name - self._type = type + self._payload_type = payload_type self._committed = committed - self._payload = RemotePushPayload(remote_push_iu=self, new_payload=payload) + self._payload = Payload(iu=self, new_payload=payload) + def _set_payload(payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): + if self.committed: + raise IUCommittedError(self) + if self.read_only: + raise IUReadOnlyError(self) + requested_update = IUPayloadUpdate( + uid=self.uid, + revision=self.revision, + is_delta=is_delta, + writer_name=self.buffer.unique_name, + new_items=new_items, + keys_to_remove=keys_to_remove) + remote_server = self.buffer._get_remote_server(self) + new_revision = remote_server.updatePayload(requested_update) + if new_revision == 0: + raise IUUpdateFailedError(self) + else: + self._revision = new_revision + def __str__(self): s = "RemotePushIU{ " s += "uid="+self._uid+" " @@ -489,7 +388,7 @@ class RemotePushIU(IUInterface):#{{{ raise IUUpdateFailedError(self) else: self._revision = new_revision - self._payload = RemotePushPayload(remote_push_iu=self, new_payload=new_pl) + self._payload = Payload(iu=self, new_payload=new_pl) payload = property( fget=_get_payload, fset=_set_payload, @@ -503,7 +402,7 @@ class RemotePushIU(IUInterface):#{{{ for k, v in update.new_items.items(): self.payload._remotely_enforced_setitem(k, v) else: # using '_payload' to circumvent the local writing methods - self._payload = RemotePushPayload(remote_push_iu=self, new_payload=update.new_items) + self._payload = Payload(iu=self, new_payload=update.new_items) def _apply_commission(self): """Apply commission to the IU""" @@ -541,7 +440,7 @@ class IUConverter(rsb.transport.converter.Converter):#{{{ pbo.uid = iu._uid pbo.revision = iu._revision pbo.category = iu._category - pbo.type = iu._type + pbo.payload_type = iu._payload_type pbo.owner_name = iu._owner_name pbo.committed = iu._committed pbo.access_mode = ipaaca_pb2.IU.PUSH # TODO @@ -567,7 +466,7 @@ class IUConverter(rsb.transport.converter.Converter):#{{{ read_only = pbo.read_only, owner_name = pbo.owner_name, category = pbo.category, - type = pbo.type, + payload_type = pbo.payload_type, committed = pbo.committed, payload=_payload )