From b2442f8b13811e18d6d276e64accbca672148820 Mon Sep 17 00:00:00 2001 From: Hendrik Buschmeier <hbuschme@uni-bielefeld.de> Date: Thu, 5 Feb 2015 16:59:59 +0100 Subject: [PATCH] ipaaca-python: It's now possible to set the payload type (JSON, STR) for an individual IU. --- ipaacalib/java/src/ipaaca/AbstractIU.java | 2 +- ipaacalib/java/src/ipaaca/IUConverter.java | 4 +-- ipaacalib/java/src/ipaaca/LocalIU.java | 4 +-- ipaacalib/java/src/ipaaca/Payload.java | 3 +- ipaacalib/java/src/ipaaca/RemotePushIU.java | 8 ++--- ipaacalib/python/src/ipaaca/buffer.py | 6 +++- ipaacalib/python/src/ipaaca/converter.py | 33 ++++++++++------- ipaacalib/python/src/ipaaca/defaults.py | 3 ++ ipaacalib/python/src/ipaaca/iu.py | 40 ++++++++++++++------- ipaacalib/python/src/ipaaca/misc.py | 16 +++++++-- 10 files changed, 80 insertions(+), 39 deletions(-) diff --git a/ipaacalib/java/src/ipaaca/AbstractIU.java b/ipaacalib/java/src/ipaaca/AbstractIU.java index e521b81..d79f162 100644 --- a/ipaacalib/java/src/ipaaca/AbstractIU.java +++ b/ipaacalib/java/src/ipaaca/AbstractIU.java @@ -231,7 +231,7 @@ public abstract class AbstractIU List<PayloadItem> items = new ArrayList<PayloadItem>(); for (Entry<String, String> entry : newPayload.entrySet()) { - PayloadItem item = PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("str") // TODO:default type? + PayloadItem item = PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("STR") .build(); items.add(item); } diff --git a/ipaacalib/java/src/ipaaca/IUConverter.java b/ipaacalib/java/src/ipaaca/IUConverter.java index a185992..d2138d6 100644 --- a/ipaacalib/java/src/ipaaca/IUConverter.java +++ b/ipaacalib/java/src/ipaaca/IUConverter.java @@ -79,7 +79,7 @@ public class IUConverter implements Converter<ByteBuffer> List<PayloadItem> payloadItems = new ArrayList<PayloadItem>(); for (Entry<String, String> entry : iua.getPayload().entrySet()) { - payloadItems.add(PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("str").build()); + payloadItems.add(PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("STR").build()); } List<LinkSet> links = new ArrayList<LinkSet>(); @@ -95,7 +95,7 @@ public class IUConverter implements Converter<ByteBuffer> } IU iu = IU.newBuilder().setUid(iua.getUid()).setRevision(iua.getRevision()).setCategory(iua.getCategory()) .setOwnerName(iua.getOwnerName()).setCommitted(iua.isCommitted()).setAccessMode(accessMode) - .setReadOnly(iua.isReadOnly()).setPayloadType("MAP").addAllPayload(payloadItems).addAllLinks(links).build(); + .setReadOnly(iua.isReadOnly()).setPayloadType("STR").addAllPayload(payloadItems).addAllLinks(links).build(); return new WireContents<ByteBuffer>(ByteBuffer.wrap(iu.toByteArray()), "ipaaca-iu"); } diff --git a/ipaacalib/java/src/ipaaca/LocalIU.java b/ipaacalib/java/src/ipaaca/LocalIU.java index 56de4f6..6e87a82 100644 --- a/ipaacalib/java/src/ipaaca/LocalIU.java +++ b/ipaacalib/java/src/ipaaca/LocalIU.java @@ -230,7 +230,7 @@ public class LocalIU extends AbstractIU if (isPublished()) { // send update to remote holders - PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("str") // TODO: fix this, default in .proto? + PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR") .build(); IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true) .setWriterName(writer == null ? getOwnerName() : writer).addNewItems(newItem).build(); @@ -256,7 +256,7 @@ public class LocalIU extends AbstractIU .setWriterName(writer == null ? getOwnerName() : writer); for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet()) { - PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("str") // TODO: fix this, default in .proto? + PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("STR") .build(); builder.addNewItems(newItem); diff --git a/ipaacalib/java/src/ipaaca/Payload.java b/ipaacalib/java/src/ipaaca/Payload.java index 74c840b..315d686 100644 --- a/ipaacalib/java/src/ipaaca/Payload.java +++ b/ipaacalib/java/src/ipaaca/Payload.java @@ -95,8 +95,7 @@ public class Payload implements Map<String, String> } public String pseudoConvertFromJSON(String value, String type) { - if (type.equals("json")) { - System.out.println("Received JSON IU"); + if (type.equals("JSON")) { if (value.startsWith("\"")) { return value.replaceAll("\\\"", ""); } else if (value.startsWith("{") || value.startsWith("[") || value.matches("true") || value.matches("false") || value.matches("-?[0-9]*[.,]?[0-9][0-9]*.*")) { diff --git a/ipaacalib/java/src/ipaaca/RemotePushIU.java b/ipaacalib/java/src/ipaaca/RemotePushIU.java index 5b5cc2f..bfb8060 100644 --- a/ipaacalib/java/src/ipaaca/RemotePushIU.java +++ b/ipaacalib/java/src/ipaaca/RemotePushIU.java @@ -99,7 +99,7 @@ public class RemotePushIU extends AbstractIU { throw new IUReadOnlyException(this); } - PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("str").build();// TODO use default type in .proto + PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR").build(); IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision(getRevision()) .setWriterName(getBuffer().getUniqueName()).addNewItems(newItem).build(); @@ -144,7 +144,7 @@ public class RemotePushIU extends AbstractIU .setWriterName(getBuffer().getUniqueName()); for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet()) { - PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("str") // TODO: fix this, default in .proto? + PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("STR") .build(); builder.addNewItems(newItem); @@ -301,9 +301,9 @@ public class RemotePushIU extends AbstractIU payload.enforcedRemoveItem(key); } for (PayloadItem item : update.getNewItemsList()) { - if (item.getType().equals("str")) { + if (item.getType().equals("STR")) { payload.enforcedSetItem(item.getKey(), item.getValue()); - } else if (item.getType().equals("json")) { + } else if (item.getType().equals("JSON")) { String value = item.getValue(); if (value.startsWith("\"")) { payload.enforcedSetItem(item.getKey(), value.replaceAll("\\\"", "")); diff --git a/ipaacalib/python/src/ipaaca/buffer.py b/ipaacalib/python/src/ipaaca/buffer.py index cf9d3a8..b1496f7 100644 --- a/ipaacalib/python/src/ipaaca/buffer.py +++ b/ipaacalib/python/src/ipaaca/buffer.py @@ -567,7 +567,11 @@ class OutputBuffer(Buffer): new_items = {} if keys_to_remove is None: keys_to_remove = [] - payload_update = ipaaca.converter.IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision) + payload_update = ipaaca.converter.IUPayloadUpdate( + uid=iu._uid, + revision=revision, + is_delta=is_delta, + payload_type=iu.payload_type) payload_update.new_items = new_items if is_delta: payload_update.keys_to_remove = keys_to_remove diff --git a/ipaacalib/python/src/ipaaca/converter.py b/ipaacalib/python/src/ipaaca/converter.py index 0bda7a3..2bd3e69 100644 --- a/ipaacalib/python/src/ipaaca/converter.py +++ b/ipaacalib/python/src/ipaaca/converter.py @@ -37,6 +37,8 @@ import collections import rsb.converter import ipaaca_pb2 +import ipaaca.defaults +import ipaaca.exception import ipaaca.iu import ipaaca.misc @@ -76,17 +78,22 @@ class IntConverter(rsb.converter.Converter): return pbo.value -def pack_payload_entry(entry, key, value): +def pack_payload_entry(entry, key, value, _type=ipaaca.iu.IUPayloadType.JSON): entry.key = key - entry.value = json.dumps(value) - entry.type = 'json' + if _type == ipaaca.iu.IUPayloadType.JSON: + entry.value = json.dumps(value) + elif _type == ipaaca.iu.IUPayloadType.STR or _type == 'MAP': + entry.value = str(value) + else: + raise ipaaca.exception.IpaacaException('Asked to send payload entry with unsupported type "' + _type + '".') + entry.type = _type def unpack_payload_entry(entry): - # We assume that the only transfer types are 'str' or 'json'. Both are transparently handled by json.loads - if entry.type == 'json': + # We assume that the only transfer types are 'STR' or 'JSON'. Both are transparently handled by json.loads + if entry.type == ipaaca.iu.IUPayloadType.JSON: return json.loads(entry.value) - elif entry.type == 'str': + elif entry.type == ipaaca.iu.IUPayloadType.STR or entry.type == 'str': return entry.value else: LOGGER.warn('Received payload entry with unsupported type "' + entry.type + '".') @@ -115,7 +122,7 @@ class IUConverter(rsb.converter.Converter): pbo.read_only = iu._read_only for k, v in iu._payload.iteritems(): entry = pbo.payload.add() - pack_payload_entry(entry, k, v) + pack_payload_entry(entry, k, v, iu.payload_type) for type_ in iu._links.keys(): linkset = pbo.links.add() linkset.type = type_ @@ -138,7 +145,7 @@ class IUConverter(rsb.converter.Converter): read_only = pbo.read_only, owner_name = pbo.owner_name, category = pbo.category, - payload_type = pbo.payload_type, + payload_type = 'str' if pbo.payload_type is 'MAP' else pbo.payload_type, committed = pbo.committed, payload=_payload, links=_links) @@ -215,10 +222,11 @@ class IULinkUpdateConverter(rsb.converter.Converter): class IUPayloadUpdate(object): - def __init__(self, uid, revision, is_delta, writer_name="undef", new_items=None, keys_to_remove=None): + def __init__(self, uid, revision, is_delta, payload_type, writer_name="undef", new_items=None, keys_to_remove=None): super(IUPayloadUpdate, self).__init__() self.uid = uid self.revision = revision + self.payload_type = payload_type self.writer_name = writer_name self.is_delta = is_delta self.new_items = {} if new_items is None else new_items @@ -228,6 +236,7 @@ class IUPayloadUpdate(object): s = 'PayloadUpdate(' + 'uid=' + self.uid + ', ' s += 'revision='+str(self.revision)+', ' s += 'writer_name='+str(self.writer_name)+', ' + s += 'payload_type='+str(self.payload_type)+', ' s += 'is_delta='+str(self.is_delta)+', ' s += 'new_items = '+str(self.new_items)+', ' s += 'keys_to_remove = '+str(self.keys_to_remove)+')' @@ -243,9 +252,9 @@ class IUPayloadUpdateConverter(rsb.converter.Converter): pbo.uid = iu_payload_update.uid pbo.writer_name = iu_payload_update.writer_name pbo.revision = iu_payload_update.revision - for k,v in iu_payload_update.new_items.items(): + for k, v in iu_payload_update.new_items.items(): entry = pbo.new_items.add() - pack_payload_entry(entry, k, v) + pack_payload_entry(entry, k, v, iu_payload_update.payload_type) pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove) pbo.is_delta = iu_payload_update.is_delta return bytearray(pbo.SerializeToString()), self.wireSchema @@ -256,7 +265,7 @@ class IUPayloadUpdateConverter(rsb.converter.Converter): pbo = ipaaca_pb2.IUPayloadUpdate() pbo.ParseFromString( str(byte_stream) ) LOGGER.debug('received an IUPayloadUpdate for revision '+str(pbo.revision)) - iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta) + iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, payload_type=None, writer_name=pbo.writer_name, is_delta=pbo.is_delta) for entry in pbo.new_items: iu_up.new_items[entry.key] = unpack_payload_entry(entry) iu_up.keys_to_remove = pbo.keys_to_remove[:] diff --git a/ipaacalib/python/src/ipaaca/defaults.py b/ipaacalib/python/src/ipaaca/defaults.py index bbe30c5..706c714 100644 --- a/ipaacalib/python/src/ipaaca/defaults.py +++ b/ipaacalib/python/src/ipaaca/defaults.py @@ -31,5 +31,8 @@ # Excellence Initiative. IPAACA_DEFAULT_CHANNEL = 'default' + IPAACA_LOGGER_NAME = 'ipaaca' IPAACA_DEFAULT_LOGGING_LEVEL = 'WARNING' + +IPAACA_DEFAULT_IU_PAYLOAD_TYPE = 'JSON' # one of ipaaca.iu.IUPayloadType diff --git a/ipaacalib/python/src/ipaaca/iu.py b/ipaacalib/python/src/ipaaca/iu.py index 3e3b1a4..71f2050 100644 --- a/ipaacalib/python/src/ipaaca/iu.py +++ b/ipaacalib/python/src/ipaaca/iu.py @@ -46,6 +46,7 @@ import ipaaca.payload __all__ = [ 'IUAccessMode', 'IUEventType', + 'IUPayloadType', 'IU', 'Message', ] @@ -70,11 +71,17 @@ IUEventType = ipaaca.misc.enum( ) +IUPayloadType = ipaaca.misc.enum( + JSON = 'JSON', + STR = 'STR' +) + + class IUInterface(object): """Base class of all specialised IU classes.""" - def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False): + def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False, payload_type=None): """Creates an IU. Keyword arguments: @@ -85,12 +92,12 @@ class IUInterface(object): self._uid = uid self._revision = None self._category = None - self._payload_type = None self._owner_name = None self._committed = False self._retracted = False self._access_mode = access_mode self._read_only = read_only + self._payload_type = payload_type if payload_type is not None else ipaaca.defaults.IPAACA_DEFAULT_IU_PAYLOAD_TYPE self._buffer = None # payload is not present here self._links = collections.defaultdict(set) @@ -161,7 +168,15 @@ class IUInterface(object): def _get_payload_type(self): return self._payload_type - payload_type = property(fget=_get_payload_type, doc='Type of the IU payload') + def _set_payload_type(self, type): + if self._buffer is None: + self._payload_type = type + else: + raise IpaacaException('The IU is already in a buffer, cannot change payload type anymore.') + payload_type = property( + fget=_get_payload_type, + fset=_set_payload_type, + doc='Type of the IU payload') def _get_committed(self): return self._committed @@ -193,7 +208,7 @@ class IUInterface(object): return self._buffer def _set_buffer(self, buffer): if self._buffer is not None: - raise Exception('The IU is already in a buffer, cannot move it.') + raise IpaacaException('The IU is already in a buffer, cannot move it.') self._buffer = buffer buffer = property( fget=_get_buffer, @@ -216,12 +231,11 @@ class IU(IUInterface): """A local IU.""" - def __init__(self, category='undef', access_mode=IUAccessMode.PUSH, read_only=False, _payload_type='MAP'): - super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only) + def __init__(self, category='undef', access_mode=IUAccessMode.PUSH, read_only=False, payload_type=None): + super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only, payload_type=payload_type) self._revision = 1 self.uid = str(uuid.uuid4()) self._category = str(category) - self._payload_type = _payload_type self.revision_lock = threading.RLock() self._payload = ipaaca.payload.Payload(iu=self) @@ -322,8 +336,8 @@ class IU(IUInterface): class Message(IU): """Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls.""" - def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, _payload_type='MAP'): - super(Message, self).__init__(category=str(category), access_mode=access_mode, read_only=read_only, _payload_type=_payload_type) + def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, payload_type=None): + super(Message, self).__init__(category=str(category), access_mode=access_mode, read_only=read_only, payload_type=payload_type) def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): if self.is_published: @@ -394,11 +408,10 @@ class RemoteMessage(IUInterface): """A remote IU with access mode 'MESSAGE'.""" def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links): - super(RemoteMessage, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only) + super(RemoteMessage, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only, payload_type=payload_type) self._revision = revision self._category = category self.owner_name = owner_name - self._payload_type = payload_type self._committed = committed self._retracted = False # NOTE Since the payload is an already-existant Payload which we didn't modify ourselves, @@ -462,11 +475,10 @@ class RemotePushIU(IUInterface): """A remote IU with access mode 'PUSH'.""" def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links): - super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only) + super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only, payload_type=payload_type) self._revision = revision self._category = category self.owner_name = owner_name - self._payload_type = payload_type self._committed = committed self._retracted = False # NOTE Since the payload is an already-existant Payload which we didn't modify ourselves, @@ -504,6 +516,7 @@ class RemotePushIU(IUInterface): requested_update = ipaaca.converter.IUPayloadUpdate( uid=self.uid, revision=self.revision, + payload_type=self.payload_type, is_delta=is_delta, writer_name=self.buffer.unique_name, new_items=new_items, @@ -545,6 +558,7 @@ class RemotePushIU(IUInterface): requested_update = ipaaca.converter.IUPayloadUpdate( uid=self.uid, revision=self.revision, + payload_type=self.payload_type, is_delta=False, writer_name=self.buffer.unique_name, new_items=new_pl, diff --git a/ipaacalib/python/src/ipaaca/misc.py b/ipaacalib/python/src/ipaaca/misc.py index a963a81..44932d5 100644 --- a/ipaacalib/python/src/ipaaca/misc.py +++ b/ipaacalib/python/src/ipaaca/misc.py @@ -53,8 +53,8 @@ def enum(*sequential, **named): whats-the-best-way-to-implement-an-enum-in-python/1695250#1695250 """ enums = dict(zip(sequential, range(len(sequential))), **named) - return type('Enum', (), enums) - + enums['_choices'] = enums.keys() + return type('Enum', (object,), enums) class IpaacaLoggingHandler(logging.Handler): @@ -106,6 +106,11 @@ class IpaacaArgumentParser(argparse.ArgumentParser): def __call__(self, parser, namespace, values, option_string=None): ipaaca.defaults.IPAACA_DEFAULT_CHANNEL = values + class IpaacaPayloadTypeAction(argparse.Action): + + def __call__(self, parser, namespace, values, option_string=None): + ipaaca.defaults.IPAACA_DEFAULT_IU_PAYLOAD_TYPE = values + class IpaacaLoggingLevelAction(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): @@ -132,6 +137,13 @@ class IpaacaArgumentParser(argparse.ArgumentParser): def _add_ipaaca_lib_arguments(self): ipaacalib_group = self.add_argument_group('IPAACA library arguments') + ipaacalib_group.add_argument( + '--ipaaca-payload-type', + action=self.IpaacaPayloadTypeAction, + choices=['JSON', 'STR'], # one of ipaaca.iu.IUPayloadTypes + dest='_ipaaca_payload_type_', + default='JSON', + help="specify payload type (default: 'JSON')") ipaacalib_group.add_argument( '--ipaaca-default-channel', action=self.IpaacaDefaultChannelAction, -- GitLab