Skip to content
Snippets Groups Projects
Commit a34a09e5 authored by Hendrik Buschmeier's avatar Hendrik Buschmeier
Browse files

Added JSON payload de/-encoding.

parent 1d6764f5
No related branches found
No related tags found
No related merge requests found
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
# "Incremental Processing Architecture # "Incremental Processing Architecture
# for Artificial Conversational Agents". # for Artificial Conversational Agents".
# #
# Copyright (c) 2009-2013 Sociable Agents Group # Copyright (c) 2009-2014 Social Cognitive Systems Group
# CITEC, Bielefeld University # CITEC, Bielefeld University
# #
# http://opensource.cit-ec.de/projects/ipaaca/ # http://opensource.cit-ec.de/projects/ipaaca/
...@@ -40,6 +40,13 @@ import collections ...@@ -40,6 +40,13 @@ import collections
import copy import copy
import time import time
try:
import simplejson as json
except ImportError:
import json
print('INFO: Using json instead of simplejson.')
print(' Install simplejson for better performance.')
import rsb import rsb
import rsb.converter import rsb.converter
...@@ -84,17 +91,6 @@ def enum(*sequential, **named): ...@@ -84,17 +91,6 @@ def enum(*sequential, **named):
return type('Enum', (), enums) return type('Enum', (), enums)
def pack_typed_payload_item(protobuf_object, key, value):
protobuf_object.key = key
protobuf_object.value = value
protobuf_object.type = 'str' # TODO: more types
def unpack_typed_payload_item(protobuf_object):
# TODO: more types
return (protobuf_object.key, protobuf_object.value)
class IpaacaLoggingHandler(logging.Handler): class IpaacaLoggingHandler(logging.Handler):
def __init__(self, level=logging.DEBUG): def __init__(self, level=logging.DEBUG):
...@@ -135,7 +131,6 @@ class IUPublishedError(Exception): ...@@ -135,7 +131,6 @@ class IUPublishedError(Exception):
def __init__(self, iu): def __init__(self, iu):
super(IUPublishedError, self).__init__('IU ' + str(iu.uid) + ' is already present in the output buffer.') super(IUPublishedError, self).__init__('IU ' + str(iu.uid) + ' is already present in the output buffer.')
class IUUpdateFailedError(Exception): class IUUpdateFailedError(Exception):
"""Error indicating that a remote IU update failed.""" """Error indicating that a remote IU update failed."""
def __init__(self, iu): def __init__(self, iu):
...@@ -151,7 +146,6 @@ class IUCommittedError(Exception): ...@@ -151,7 +146,6 @@ class IUCommittedError(Exception):
def __init__(self, iu): def __init__(self, iu):
super(IUCommittedError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it has been committed to.') super(IUCommittedError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it has been committed to.')
class IUReadOnlyError(Exception): class IUReadOnlyError(Exception):
"""Error indicating that an IU is immutable because it is 'read only'.""" """Error indicating that an IU is immutable because it is 'read only'."""
def __init__(self, iu): def __init__(self, iu):
...@@ -832,6 +826,16 @@ class IntConverter(rsb.converter.Converter):#{{{ ...@@ -832,6 +826,16 @@ class IntConverter(rsb.converter.Converter):#{{{
#}}} #}}}
def pack_payload_entry(entry, key, value):
entry.key = key
entry.value = json.dumps(value)
entry.type = 'json'
def unpack_payload_entry(entry):
# We assume that the only transfer types are 'str' or 'json'. Both are transparently handled by json.loads
return json.loads(entry.value)
class IUConverter(rsb.converter.Converter):#{{{ class IUConverter(rsb.converter.Converter):#{{{
''' '''
Converter class for Full IU representations Converter class for Full IU representations
...@@ -856,7 +860,7 @@ class IUConverter(rsb.converter.Converter):#{{{ ...@@ -856,7 +860,7 @@ class IUConverter(rsb.converter.Converter):#{{{
pbo.read_only = iu._read_only pbo.read_only = iu._read_only
for k,v in iu._payload.items(): for k,v in iu._payload.items():
entry = pbo.payload.add() entry = pbo.payload.add()
pack_typed_payload_item(entry, k, v) pack_payload_entry(entry, k, v)
for type_ in iu._links.keys(): for type_ in iu._links.keys():
linkset = pbo.links.add() linkset = pbo.links.add()
linkset.type = type_ linkset.type = type_
...@@ -866,15 +870,13 @@ class IUConverter(rsb.converter.Converter):#{{{ ...@@ -866,15 +870,13 @@ class IUConverter(rsb.converter.Converter):#{{{
def deserialize(self, byte_stream, ws): def deserialize(self, byte_stream, ws):
type = self.getDataType() type = self.getDataType()
#print('IUConverter.deserialize got a '+str(type)+' over wireSchema '+ws)
if type == IU or type == Message: if type == IU or type == Message:
pbo = ipaaca_pb2.IU() pbo = ipaaca_pb2.IU()
pbo.ParseFromString( str(byte_stream) ) pbo.ParseFromString( str(byte_stream) )
if pbo.access_mode == ipaaca_pb2.IU.PUSH: if pbo.access_mode == ipaaca_pb2.IU.PUSH:
_payload = {} _payload = {}
for entry in pbo.payload: for entry in pbo.payload:
k, v = unpack_typed_payload_item(entry) _payload[entry.key] = unpack_payload_entry(entry)
_payload[k] = v
_links = collections.defaultdict(set) _links = collections.defaultdict(set)
for linkset in pbo.links: for linkset in pbo.links:
for target_uid in linkset.targets: for target_uid in linkset.targets:
...@@ -894,8 +896,7 @@ class IUConverter(rsb.converter.Converter):#{{{ ...@@ -894,8 +896,7 @@ class IUConverter(rsb.converter.Converter):#{{{
elif pbo.access_mode == ipaaca_pb2.IU.MESSAGE: elif pbo.access_mode == ipaaca_pb2.IU.MESSAGE:
_payload = {} _payload = {}
for entry in pbo.payload: for entry in pbo.payload:
k, v = unpack_typed_payload_item(entry) _payload[entry.key] = unpack_payload_entry(entry)
_payload[k] = v
_links = collections.defaultdict(set) _links = collections.defaultdict(set)
for linkset in pbo.links: for linkset in pbo.links:
for target_uid in linkset.targets: for target_uid in linkset.targets:
...@@ -942,7 +943,7 @@ class MessageConverter(rsb.converter.Converter):#{{{ ...@@ -942,7 +943,7 @@ class MessageConverter(rsb.converter.Converter):#{{{
pbo.read_only = iu._read_only pbo.read_only = iu._read_only
for k,v in iu._payload.items(): for k,v in iu._payload.items():
entry = pbo.payload.add() entry = pbo.payload.add()
pack_typed_payload_item(entry, k, v) pack_payload_entry(entry, k, v)
for type_ in iu._links.keys(): for type_ in iu._links.keys():
linkset = pbo.links.add() linkset = pbo.links.add()
linkset.type = type_ linkset.type = type_
...@@ -952,15 +953,13 @@ class MessageConverter(rsb.converter.Converter):#{{{ ...@@ -952,15 +953,13 @@ class MessageConverter(rsb.converter.Converter):#{{{
def deserialize(self, byte_stream, ws): def deserialize(self, byte_stream, ws):
type = self.getDataType() type = self.getDataType()
#print('MessageConverter.deserialize got a '+str(type)+' over wireSchema '+ws)
if type == IU or type == Message: if type == IU or type == Message:
pbo = ipaaca_pb2.IU() pbo = ipaaca_pb2.IU()
pbo.ParseFromString( str(byte_stream) ) pbo.ParseFromString( str(byte_stream) )
if pbo.access_mode == ipaaca_pb2.IU.PUSH: if pbo.access_mode == ipaaca_pb2.IU.PUSH:
_payload = {} _payload = {}
for entry in pbo.payload: for entry in pbo.payload:
k, v = unpack_typed_payload_item(entry) _payload[entry.key] = unpack_payload_entry(entry)
_payload[k] = v
_links = collections.defaultdict(set) _links = collections.defaultdict(set)
for linkset in pbo.links: for linkset in pbo.links:
for target_uid in linkset.targets: for target_uid in linkset.targets:
...@@ -980,8 +979,7 @@ class MessageConverter(rsb.converter.Converter):#{{{ ...@@ -980,8 +979,7 @@ class MessageConverter(rsb.converter.Converter):#{{{
elif pbo.access_mode == ipaaca_pb2.IU.MESSAGE: elif pbo.access_mode == ipaaca_pb2.IU.MESSAGE:
_payload = {} _payload = {}
for entry in pbo.payload: for entry in pbo.payload:
k, v = unpack_typed_payload_item(entry) _payload[entry.key] = unpack_payload_entry(entry)
_payload[k] = v
_links = collections.defaultdict(set) _links = collections.defaultdict(set)
for linkset in pbo.links: for linkset in pbo.links:
for target_uid in linkset.targets: for target_uid in linkset.targets:
...@@ -1094,7 +1092,7 @@ class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{ ...@@ -1094,7 +1092,7 @@ class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{
pbo.revision = iu_payload_update.revision pbo.revision = iu_payload_update.revision
for k,v in iu_payload_update.new_items.items(): for k,v in iu_payload_update.new_items.items():
entry = pbo.new_items.add() entry = pbo.new_items.add()
pack_typed_payload_item(entry, k, v) pack_payload_entry(entry, k, v)
pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove) pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
pbo.is_delta = iu_payload_update.is_delta pbo.is_delta = iu_payload_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema return bytearray(pbo.SerializeToString()), self.wireSchema
...@@ -1107,8 +1105,7 @@ class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{ ...@@ -1107,8 +1105,7 @@ class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{
logger.debug('received an IUPayloadUpdate for revision '+str(pbo.revision)) logger.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta) iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_items: for entry in pbo.new_items:
k, v = unpack_typed_payload_item(entry) iu_up.new_items[entry.key] = unpack_payload_entry(entry)
iu_up.new_items[k] = v
iu_up.keys_to_remove = pbo.keys_to_remove[:] iu_up.keys_to_remove = pbo.keys_to_remove[:]
return iu_up return iu_up
else: else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment