Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Show changes
Showing
with 793 additions and 397 deletions
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -34,7 +34,7 @@ from __future__ import division, print_function
import collections
import rsb.converter
#import rsb.converter
import ipaaca.ipaaca_pb2
import ipaaca.defaults
......@@ -45,230 +45,364 @@ import ipaaca.misc
LOGGER = ipaaca.misc.get_library_logger()
try:
import simplejson as json
import simplejson as json
except ImportError:
import json
LOGGER.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
import json
LOGGER.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
__all__ = [
'IntConverter',
'IUConverter',
'IULinkUpdate',
'IULinkUpdateConverter',
'IUPayloadUpdate',
'IUPayloadUpdateConverter',
'MessageConverter',
]
class IntConverter(rsb.converter.Converter):
"""Convert Python int objects to Protobuf ints and vice versa."""
def __init__(self, wireSchema="int", dataType=int):
super(IntConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, value):
pbo = ipaaca.ipaaca_pb2.IntMessage()
pbo.value = value
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IntMessage()
pbo.ParseFromString( str(byte_stream) )
return pbo.value
def pack_payload_entry(entry, key, value, _type=ipaaca.iu.IUPayloadType.JSON):
entry.key = key
if _type == ipaaca.iu.IUPayloadType.JSON:
entry.value = json.dumps(value)
elif _type == ipaaca.iu.IUPayloadType.STR or _type == 'MAP':
entry.value = str(value)
else:
raise ipaaca.exception.IpaacaException('Asked to send payload entry with unsupported type "' + _type + '".')
entry.type = _type
'IntConverter',
'IUConverter',
'IULinkUpdate',
'IULinkUpdateConverter',
'IUPayloadUpdate',
'IUPayloadUpdateConverter',
'MessageConverter',
'register_global_converter',
]
_LOW_LEVEL_WIRE_SCHEMA_MAP = None
def LOW_LEVEL_WIRE_SCHEMA_FOR(abstractname):
'''Map the abstract wire schema name (was used in RSB) to a
transport-dependent magic to detect on the wire.
Here: a required protobuf field'''
global _LOW_LEVEL_WIRE_SCHEMA_MAP
if _LOW_LEVEL_WIRE_SCHEMA_MAP is None:
_LOW_LEVEL_WIRE_SCHEMA_MAP = {
int: ipaaca.ipaaca_pb2.WireTypeIntMessage,
ipaaca.iu.IU: ipaaca.ipaaca_pb2.WireTypeIU,
ipaaca.iu.Message: ipaaca.ipaaca_pb2.WireTypeMessageIU,
IUPayloadUpdate: ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdate,
IULinkUpdate: ipaaca.ipaaca_pb2.WireTypeIULinkUpdate,
'int': ipaaca.ipaaca_pb2.WireTypeIntMessage,
'ipaaca-iu': ipaaca.ipaaca_pb2.WireTypeIU,
'ipaaca-messageiu': ipaaca.ipaaca_pb2.WireTypeMessageIU,
'ipaaca-iu-payload-update': ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdate,
'ipaaca-iu-link-update': ipaaca.ipaaca_pb2.WireTypeIULinkUpdate,
}
return _LOW_LEVEL_WIRE_SCHEMA_MAP.get(abstractname)
def __fail_no_type_converter():
raise ipaaca.exception.BackendSerializationError()
class FailingDict(dict):
def __init__(self, error_class, *args, **kwargs):
super(FailingDict, self).__init__(*args, **kwargs)
self._error_class = error_class
def __getitem__(self, k):
if k in self:
return dict.__getitem__(self, k)
else:
raise self._error_class(k)
# global converter / [un]marshaller store
__converter_registry_by_type = FailingDict(ipaaca.exception.BackendSerializationError)
__converter_registry_by_wire_schema = FailingDict(ipaaca.exception.BackendDeserializationError)
def register_global_converter(converter):
global __converter_registry_by_type, __converter_registry_by_wire_schema
real_wire_schema = LOW_LEVEL_WIRE_SCHEMA_FOR(converter._wire_schema)
if real_wire_schema is None:
raise NotImplementedError('There is no entry in the _LOW_LEVEL_WIRE_SCHEMA_MAP for '+str(converter._wire_schema))
if real_wire_schema in __converter_registry_by_wire_schema:
raise ipaaca.exception.ConverterRegistrationError(real_wire_schema)
if converter._data_type in __converter_registry_by_type:
raise ipaaca.exception.ConverterRegistrationError(converter._data_type.__name__)
__converter_registry_by_type[converter._data_type] = converter
__converter_registry_by_wire_schema[real_wire_schema] = converter
def deserialize(lowlevel_message):
pbo_outer = ipaaca.ipaaca_pb2.TransportLevelWrapper()
pbo_outer.ParseFromString(lowlevel_message)
type_ = pbo_outer.transport_message_type
#print('Received wire message type', type_)
if type_ in __converter_registry_by_wire_schema:
return __converter_registry_by_wire_schema[type_].deserialize(pbo_outer.raw_message, None)
else:
pbo = None
if type_ == ipaaca.ipaaca_pb2.WireTypeRemoteRequestResult:
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIURetraction:
pbo = ipaaca.ipaaca_pb2.IURetraction()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUCommission:
pbo = ipaaca.ipaaca_pb2.IUCommission()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUResendRequest:
pbo = ipaaca.ipaaca_pb2.IUResendRequest()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdateRequest:
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdateRequest()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUCommissionRequest:
pbo = ipaaca.ipaaca_pb2.IUCommissionRequest()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIULinkUpdateRequest:
pbo = ipaaca.ipaaca_pb2.IULinkUpdateRequest()
if pbo is None:
raise ipaaca.exception.BackendDeserializationError(type_)
else:
pbo.ParseFromString(pbo_outer.raw_message)
return pbo
raise ipaaca.exception.BackendDeserializationError(type_)
def serialize(obj):
inner, type_ = None, None
if obj.__class__ in __converter_registry_by_type:
cls_ = obj.__class__
inner, wire = __converter_registry_by_type[obj.__class__].serialize(obj)
type_ = LOW_LEVEL_WIRE_SCHEMA_FOR(wire)
else:
cls_ = obj.__class__
if cls_ == ipaaca.ipaaca_pb2.RemoteRequestResult:
type_ = ipaaca.ipaaca_pb2.WireTypeRemoteRequestResult
elif cls_ == ipaaca.ipaaca_pb2.IURetraction:
type_ = ipaaca.ipaaca_pb2.WireTypeIURetraction
elif cls_ == ipaaca.ipaaca_pb2.IUCommission:
type_ = ipaaca.ipaaca_pb2.WireTypeIUCommission
elif cls_ == ipaaca.ipaaca_pb2.IUResendRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIUResendRequest
elif cls_ == ipaaca.ipaaca_pb2.IUPayloadUpdateRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdateRequest
elif cls_ == ipaaca.ipaaca_pb2.IUCommissionRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIUCommissionRequest
elif cls_ == ipaaca.ipaaca_pb2.IULinkUpdateRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIULinkUpdateRequest
if type_ is None:
raise ipaaca.exception.BackendSerializationError(cls_)
else:
inner = obj.SerializeToString()
pbo = ipaaca.ipaaca_pb2.TransportLevelWrapper()
pbo.transport_message_type = type_
pbo.raw_message = inner
return bytearray(pbo.SerializeToString())
class ConverterBase(object):
'''Base for converters (to serialize and unserialize
data automatically depending on its Python type).'''
def __init__(self, substrate, data_type, wire_schema):
self._substrate = substrate
self._wire_schema = wire_schema
self._data_type = data_type
self.wireSchema = wire_schema # added compat with RSB
#print('Made a ConverterBase with wire '+str(self._wire_schema)+' and data '+str(self._data_type))
def serialize(self, value):
raise NotImplementedError('NOT IMPLEMENTED for ' \
+ self.__class__.__name__+': serialize')
def deserialize(self, stream, _UNUSED_override_wire_schema):
raise NotImplementedError('NOT IMPLEMENTED for ' \
+ self.__class__.__name__+': deserialize')
class IntConverter(ConverterBase):
"""Convert Python int objects to Protobuf ints and vice versa."""
def __init__(self, wireSchema="int", dataType=None):
super(IntConverter, self).__init__(bytearray, int, wireSchema)
def serialize(self, value):
pbo = ipaaca.ipaaca_pb2.IntMessage()
pbo.value = value
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IntMessage()
pbo.ParseFromString(byte_stream)
return pbo.value
def pack_payload_entry(entry, key, value, _type=None):
#if _type is None: _type=ipaaca.iu.IUPayloadType.JSON
entry.key = key
if _type is None or _type == ipaaca.iu.IUPayloadType.JSON:
entry.value = json.dumps(value)
elif _type == ipaaca.iu.IUPayloadType.STR or _type == 'MAP':
entry.value = str(value)
else:
raise ipaaca.exception.IpaacaException('Asked to send payload entry with unsupported type "' + _type + '".')
entry.type = _type
def unpack_payload_entry(entry):
# We assume that the only transfer types are 'STR' or 'JSON'. Both are transparently handled by json.loads
if entry.type == ipaaca.iu.IUPayloadType.JSON:
return json.loads(entry.value)
elif entry.type == ipaaca.iu.IUPayloadType.STR or entry.type == 'str':
return entry.value
else:
LOGGER.warn('Received payload entry with unsupported type "' + entry.type + '".')
return entry.value
class IUConverter(rsb.converter.Converter):
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-iu", dataType=ipaaca.iu.IU):
super(IUConverter, self).__init__(bytearray, dataType, wireSchema)
self._access_mode = ipaaca.ipaaca_pb2.IU.PUSH
self._remote_data_type = ipaaca.iu.RemotePushIU
def serialize(self, iu):
pbo = ipaaca.ipaaca_pb2.IU()
pbo.access_mode = self._access_mode
pbo.uid = iu._uid
pbo.revision = iu._revision
pbo.category = iu._category
pbo.payload_type = iu._payload_type
pbo.owner_name = iu._owner_name
pbo.committed = iu._committed
pbo.read_only = iu._read_only
for k, v in iu._payload.iteritems():
entry = pbo.payload.add()
pack_payload_entry(entry, k, v, iu.payload_type)
for type_ in iu._links.keys():
linkset = pbo.links.add()
linkset.type = type_
linkset.targets.extend(iu._links[type_])
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IU()
pbo.ParseFromString(str(byte_stream))
_payload = {}
for entry in pbo.payload:
_payload[entry.key] = unpack_payload_entry(entry)
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
return self._remote_data_type(
uid=pbo.uid,
revision=pbo.revision,
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = 'str' if pbo.payload_type is 'MAP' else pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links)
# We assume that the only transfer types are 'STR' or 'JSON'. Both are transparently handled by json.loads
if entry.type == ipaaca.iu.IUPayloadType.JSON:
return json.loads(entry.value)
elif entry.type == ipaaca.iu.IUPayloadType.STR or entry.type == 'str':
return entry.value
else:
LOGGER.warn('Received payload entry with unsupported type "' + entry.type + '".')
return entry.value
class IUConverter(ConverterBase):
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-iu", dataType=None): #ipaaca.iu.IU):
super(IUConverter, self).__init__(bytearray, ipaaca.iu.IU if dataType is None else dataType, wireSchema)
self._access_mode = ipaaca.ipaaca_pb2.IU.PUSH
self._remote_data_type = ipaaca.iu.RemotePushIU
def serialize(self, iu):
pbo = ipaaca.ipaaca_pb2.IU()
pbo.access_mode = self._access_mode
pbo.uid = iu._uid
pbo.revision = iu._revision
pbo.category = iu._category
pbo.payload_type = iu._payload_type
pbo.owner_name = iu._owner_name
pbo.committed = iu._committed
pbo.read_only = iu._read_only
for k, v in iu._payload.items():
entry = pbo.payload.add()
pack_payload_entry(entry, k, v, iu.payload_type)
for type_ in iu._links.keys():
linkset = pbo.links.add()
linkset.type = type_
linkset.targets.extend(iu._links[type_])
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IU()
pbo.ParseFromString(byte_stream)
_payload = {}
for entry in pbo.payload:
_payload[entry.key] = unpack_payload_entry(entry)
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
return self._remote_data_type(
uid=pbo.uid,
revision=pbo.revision,
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = 'str' if pbo.payload_type == 'MAP' else pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links)
class MessageConverter(IUConverter):
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-messageiu", dataType=ipaaca.iu.Message):
super(MessageConverter, self).__init__(wireSchema, dataType)
self._access_mode = ipaaca.ipaaca_pb2.IU.MESSAGE
self._remote_data_type = ipaaca.iu.RemoteMessage
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-messageiu", dataType=None): #ipaaca.iu.Message):
super(MessageConverter, self).__init__(wireSchema, ipaaca.iu.Message)
self._access_mode = ipaaca.ipaaca_pb2.IU.MESSAGE
self._remote_data_type = ipaaca.iu.RemoteMessage
class IULinkUpdate(object):
def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None):
super(IULinkUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.writer_name = writer_name
self.is_delta = is_delta
self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links)
self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)
def __str__(self):
s = 'LinkUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_links = '+str(self.new_links)+', '
s += 'links_to_remove = '+str(self.links_to_remove)+')'
return s
class IULinkUpdateConverter(rsb.converter.Converter):
def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate):
super(IULinkUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu_link_update):
pbo = ipaaca.ipaaca_pb2.IULinkUpdate()
pbo.uid = iu_link_update.uid
pbo.writer_name = iu_link_update.writer_name
pbo.revision = iu_link_update.revision
for type_ in iu_link_update.new_links.keys():
linkset = pbo.new_links.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.new_links[type_])
for type_ in iu_link_update.links_to_remove.keys():
linkset = pbo.links_to_remove.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.links_to_remove[type_])
pbo.is_delta = iu_link_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IULinkUpdate:
pbo = ipaaca.ipaaca_pb2.IULinkUpdate()
pbo.ParseFromString( str(byte_stream) )
LOGGER.debug('received an IULinkUpdate for revision '+str(pbo.revision))
iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_links:
iu_link_up.new_links[str(entry.type)] = set(entry.targets)
for entry in pbo.links_to_remove:
iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets)
return iu_link_up
else:
raise ValueError("Inacceptable dataType %s" % type)
def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None, request_uid=None, request_endpoint=None):
super(IULinkUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.writer_name = writer_name
self.is_delta = is_delta
self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links)
self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)
self.request_uid = request_uid
self.request_endpoint = request_endpoint
def __str__(self):
s = 'LinkUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_links = '+str(self.new_links)+', '
s += 'links_to_remove = '+str(self.links_to_remove)+')'
return s
class IULinkUpdateConverter(ConverterBase):
def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=None): #=IULinkUpdate):
super(IULinkUpdateConverter, self).__init__(bytearray, IULinkUpdate, wireSchema)
def serialize(self, iu_link_update):
pbo = ipaaca.ipaaca_pb2.IULinkUpdate()
pbo.uid = iu_link_update.uid
pbo.writer_name = iu_link_update.writer_name
pbo.revision = iu_link_update.revision
if iu_link_update.request_uid:
pbo.request_uid = iu_link_update.request_uid
if iu_link_update.request_endpoint:
pbo.request_endpoint = iu_link_update.request_endpoint
for type_ in iu_link_update.new_links.keys():
linkset = pbo.new_links.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.new_links[type_])
for type_ in iu_link_update.links_to_remove.keys():
linkset = pbo.links_to_remove.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.links_to_remove[type_])
pbo.is_delta = iu_link_update.is_delta
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IULinkUpdate()
pbo.ParseFromString(byte_stream)
LOGGER.debug('received an IULinkUpdate for revision '+str(pbo.revision))
iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta, request_uid=pbo.request_uid, request_endpoint=pbo.request_endpoint)
for entry in pbo.new_links:
iu_link_up.new_links[str(entry.type)] = set(entry.targets)
for entry in pbo.links_to_remove:
iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets)
return iu_link_up
class IUPayloadUpdate(object):
def __init__(self, uid, revision, is_delta, payload_type, writer_name="undef", new_items=None, keys_to_remove=None):
super(IUPayloadUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.payload_type = payload_type
self.writer_name = writer_name
self.is_delta = is_delta
self.new_items = {} if new_items is None else new_items
self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove
def __str__(self):
s = 'PayloadUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'payload_type='+str(self.payload_type)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_items = '+str(self.new_items)+', '
s += 'keys_to_remove = '+str(self.keys_to_remove)+')'
return s
class IUPayloadUpdateConverter(rsb.converter.Converter):
def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate):
super(IUPayloadUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu_payload_update):
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdate()
pbo.uid = iu_payload_update.uid
pbo.writer_name = iu_payload_update.writer_name
pbo.revision = iu_payload_update.revision
for k, v in iu_payload_update.new_items.items():
entry = pbo.new_items.add()
pack_payload_entry(entry, k, v, iu_payload_update.payload_type)
pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
pbo.is_delta = iu_payload_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IUPayloadUpdate:
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdate()
pbo.ParseFromString( str(byte_stream) )
LOGGER.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, payload_type=None, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_items:
iu_up.new_items[entry.key] = unpack_payload_entry(entry)
iu_up.keys_to_remove = pbo.keys_to_remove[:]
return iu_up
else:
raise ValueError("Inacceptable dataType %s" % type)
def __init__(self, uid, revision, is_delta, payload_type, writer_name="undef", new_items=None, keys_to_remove=None, request_uid=None, request_endpoint=None):
super(IUPayloadUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.payload_type = payload_type
self.writer_name = writer_name
self.is_delta = is_delta
self.new_items = {} if new_items is None else new_items
self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove
self.request_uid = request_uid
self.request_endpoint = request_endpoint
def __str__(self):
s = 'PayloadUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'payload_type='+str(self.payload_type)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_items = '+str(self.new_items)+', '
s += 'keys_to_remove = '+str(self.keys_to_remove)+')'
return s
class IUPayloadUpdateConverter(ConverterBase):
def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=None):
super(IUPayloadUpdateConverter, self).__init__(bytearray, IUPayloadUpdate, wireSchema)
def serialize(self, iu_payload_update):
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdate()
pbo.uid = iu_payload_update.uid
pbo.writer_name = iu_payload_update.writer_name
pbo.revision = iu_payload_update.revision
if iu_payload_update.request_uid:
pbo.request_uid = iu_payload_update.request_uid
if iu_payload_update.request_endpoint:
pbo.request_endpoint = iu_payload_update.request_endpoint
for k, v in iu_payload_update.new_items.items():
entry = pbo.new_items.add()
pack_payload_entry(entry, k, v, iu_payload_update.payload_type)
pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
pbo.is_delta = iu_payload_update.is_delta
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdate()
pbo.ParseFromString(byte_stream)
LOGGER.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, payload_type=None, writer_name=pbo.writer_name, is_delta=pbo.is_delta, request_uid=pbo.request_uid, request_endpoint=pbo.request_endpoint)
for entry in pbo.new_items:
iu_up.new_items[entry.key] = unpack_payload_entry(entry)
iu_up.keys_to_remove = pbo.keys_to_remove[:]
return iu_up
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2016 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2015 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -48,6 +48,35 @@ __all__ = [
class IpaacaError(Exception): pass
class BackendInitializationError(IpaacaError):
"""Error indicating that type marshalling cannot proceed
because no matching converter is known."""
def __init__(self, name=''):
super(BackendInitializationError, self).__init__( \
'Failed to initialize selected backend '+str(name))
class BackendSerializationError(IpaacaError):
"""Error indicating that type marshalling cannot proceed
because no matching converter is known."""
def __init__(self, typ):
super(BackendSerializationError, self).__init__( \
'Could not serialize type ' + str(typ.__name__) \
+ ' - no converter registered.')
class BackendDeserializationError(IpaacaError):
"""Error indicating that type unmarshalling cannot proceed
because no matching converter is known."""
def __init__(self, wire_schema):
super(BackendDeserializationError, self).__init__( \
'Could not deserialize wire format "' + str(wire_schema) \
+ '" - no converter registered.')
class ConverterRegistrationError(IpaacaError):
'''Error indicating that a type or wire schema already had a registered converter.'''
def __init__(self, type_name_or_schema):
super(ConverterRegistrationError, self).__init__(
'Failed to register a converter: we already have one for ' \
+ str(type_name_or_schema))
class IUCommittedError(IpaacaError):
"""Error indicating that an IU is immutable because it has been committed to."""
......@@ -88,10 +117,15 @@ class IUReadOnlyError(IpaacaError):
class IUResendRequestFailedError(IpaacaError):
"""Error indicating that a remote IU resend failed."""
def __init__(self, iu):
def __init__(self, iu_uid):
super(IUResendRequestFailedError, self).__init__(
'Remote resend failed for IU ' + str(iu.uid) + '.')
'Remote resend failed for IU ' + str(iu_uid))
class IUResendRequestRemoteServerUnknownError(IpaacaError):
"""Error indicating that a remote IU resend failed."""
def __init__(self, iu_uid):
super(IUResendRequestRemoteServerUnknownError, self).__init__(
'Remote resend request: remote server unknown for IU ' + str(iu_uid))
class IURetractedError(IpaacaError):
"""Error indicating that an IU has been retracted."""
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2015 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -37,6 +37,8 @@ import copy
import threading
import uuid
import six
import ipaaca.ipaaca_pb2
import ipaaca.converter
import ipaaca.exception
......@@ -104,18 +106,18 @@ class IUInterface(object):
self._links = collections.defaultdict(set)
def __str__(self):
s = unicode(self.__class__)+"{ "
s = str(self.__class__)+"{ "
s += "category="+("<None>" if self._category is None else self._category)+" "
s += "uid="+self._uid+" "
s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") "
s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " "
s += "payload={ "
for k,v in self.payload.items():
s += k+":'"+unicode(v)+"', "
s += k+":'"+str(v)+"', "
s += "} "
s += "links={ "
for t, ids in self.get_all_links().items():
s += t+":'"+unicode(ids)+"', "
s += t+":'"+str(ids)+"', "
s += "} "
s += "}"
return s
......@@ -135,13 +137,13 @@ class IUInterface(object):
def add_links(self, type, targets, writer_name=None):
'''Attempt to add links if the conditions are met
and send an update message. Then call the local setter.'''
if not hasattr(targets, '__iter__'): targets=[targets]
if isinstance(targets, six.string_types) and hasattr(targets, '__iter__'): targets=[targets]
self._modify_links(is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
self._add_and_remove_links( add={type:targets}, remove={} )
def remove_links(self, type, targets, writer_name=None):
'''Attempt to remove links if the conditions are met
and send an update message. Then call the local setter.'''
if not hasattr(targets, '__iter__'): targets=[targets]
if isinstance(targets, six.string_types) and hasattr(targets, '__iter__'): targets=[targets]
self._modify_links(is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
self._add_and_remove_links( add={}, remove={type:targets} )
def modify_links(self, add, remove, writer_name=None):
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2016 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -35,10 +35,10 @@ from __future__ import division, print_function
import argparse
import logging
import sys
import ipaaca.defaults
__all__ = [
'enum',
'IpaacaArgumentParser',
......@@ -54,6 +54,7 @@ def enum(*sequential, **named):
"""
enums = dict(zip(sequential, range(len(sequential))), **named)
enums['_choices'] = enums.keys()
enums['_values'] = enums.values() # RY e.g. see if raw int is valid
return type('Enum', (object,), enums)
......@@ -69,6 +70,21 @@ class IpaacaLoggingHandler(logging.Handler):
msg = str(record.msg.format(record.args))
print(meta + msg)
class RSBLoggingHandler(logging.Handler):
'''A logging handler that prints to stdout, RSB version.'''
def __init__(self, prefix='IPAACA', level=logging.NOTSET):
logging.Handler.__init__(self, level)
self._prefix = prefix
def emit(self, record):
meta = '[%s: %s] ' % (self._prefix, str(record.levelname))
try:
msg = str(record.msg % record.args)
except:
msg = str(record.msg) + ' WITH ARGS: ' + str(record.args)
print(meta + msg)
class GenericNoLoggingHandler(logging.Handler):
'''A logging handler that produces no output'''
......@@ -80,21 +96,21 @@ def get_library_logger():
return logging.getLogger(ipaaca.defaults.IPAACA_LOGGER_NAME)
__IPAACA_LOGGING_HANDLER = IpaacaLoggingHandler('IPAACA')
__GENERIC_NO_LOG_HANDLER = GenericNoLoggingHandler()
_IPAACA_LOGGING_HANDLER = IpaacaLoggingHandler('IPAACA')
_GENERIC_NO_LOG_HANDLER = GenericNoLoggingHandler()
# By default, suppress library logging
# - for IPAACA
get_library_logger().addHandler(__GENERIC_NO_LOG_HANDLER)
get_library_logger().addHandler(_GENERIC_NO_LOG_HANDLER)
# - for RSB
logging.getLogger('rsb').addHandler(__GENERIC_NO_LOG_HANDLER)
logging.getLogger('rsb').addHandler(_GENERIC_NO_LOG_HANDLER)
def enable_logging(level=None):
'''Enable ipaaca's 'library-wide logging.'''
ipaaca_logger = get_library_logger()
ipaaca_logger.addHandler(__IPAACA_LOGGING_HANDLER)
ipaaca_logger.removeHandler(__GENERIC_NO_LOG_HANDLER)
ipaaca_logger.addHandler(_IPAACA_LOGGING_HANDLER)
ipaaca_logger.removeHandler(_GENERIC_NO_LOG_HANDLER)
ipaaca_logger.setLevel(level=level if level is not None else
ipaaca.defaults.IPAACA_DEFAULT_LOGGING_LEVEL)
......@@ -120,8 +136,8 @@ class IpaacaArgumentParser(argparse.ArgumentParser):
def __call__(self, parser, namespace, values, option_string=None):
rsb_logger = logging.getLogger('rsb')
rsb_logger.addHandler(IpaacaLoggingHandler('RSB'))
rsb_logger.removeHandler(__GENERIC_NO_LOG_HANDLER)
rsb_logger.addHandler(RSBLoggingHandler('RSB'))
rsb_logger.removeHandler(_GENERIC_NO_LOG_HANDLER)
rsb_logger.setLevel(level=values)
class IpaacaRSBHost(argparse.Action):
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -30,7 +30,7 @@
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import threading
import time
......@@ -53,12 +53,12 @@ class Payload(dict):
def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False, update_timeout=_DEFAULT_PAYLOAD_UPDATE_TIMEOUT):
self.iu = iu
_pl = {}
for k, v in ({} if new_payload is None else new_payload).iteritems():
_pl[unicode(k, 'utf8') if type(k) == str else k] = unicode(v, 'utf8') if type(v) == str else v
for k, v in ({} if new_payload is None else new_payload).items():
_pl[str(k) if type(k) == str else k] = str(v) if type(v) == str else v
# NOTE omit_init_update_message is necessary to prevent checking for
# exceptions and sending updates in the case where we just receive
# a whole new payload from the remote side and overwrite it locally.
for k, v in _pl.iteritems():
for k, v in _pl.items():
dict.__setitem__(self, k, v)
if (not omit_init_update_message) and (self.iu.buffer is not None):
self.iu._modify_payload(
......@@ -85,8 +85,8 @@ class Payload(dict):
def __setitem__(self, k, v, writer_name=None):
with self._batch_update_lock:
k = unicode(k, 'utf8') if type(k) == str else k
v = unicode(v, 'utf8') if type(v) == str else v
k = str(k) if type(k) == str else k
v = str(v) if type(v) == str else v
if self._update_on_every_change:
self.iu._modify_payload(
is_delta=True,
......@@ -102,7 +102,7 @@ class Payload(dict):
def __delitem__(self, k, writer_name=None):
with self._batch_update_lock:
k = unicode(k, 'utf8') if type(k) == str else k
k = str(k) if type(k) == str else k
if self._update_on_every_change:
self.iu._modify_payload(
is_delta=True,
......@@ -136,9 +136,9 @@ class Payload(dict):
def merge(self, payload, writer_name=None):
with self._batch_update_lock:
for k, v in payload.iteritems():
k = unicode(k, 'utf8') if type(k) == str else k
v = unicode(v, 'utf8') if type(v) == str else v
for k, v in payload.items():
k = str(k) if type(k) == str else k
v = str(v) if type(v) == str else v
self.iu._modify_payload(
is_delta=True,
new_items=payload,
......@@ -220,18 +220,22 @@ class PayloadItemDictProxy(PayloadItemProxy, dict):
value = self.content.get(key, default)
return self._create_proxy(value, key)
def items(self):
return [(key, value) for key, value in self.iteritems()]
# py3port: were these used at all?
# def items(self):
# return [(key, value) for key, value in self.items()]
def iteritems(self):
for key, value in self.content.iteritems():
# py3port: was iteritems
def items(self):
for key, value in self.content.items():
yield key, self._create_proxy(value, key)
def values(self):
return [value for value in self.itervalues()]
# py3port: were these used at all?
# def values(self):
# return [value for value in self.values()]
def itervalues(self):
for key, value in self.content.iteritems():
# py3port: was itervalues
def values(self):
for key, value in self.content.items():
yield self._create_proxy(value, key)
def pop(self, key, *args):
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -32,4 +32,4 @@
from __future__ import division, print_function
from notifier import ComponentNotifier
from .notifier import ComponentNotifier
......@@ -5,7 +5,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2015 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -42,12 +42,15 @@ import traceback
import uuid
import ipaaca
import ipaaca.misc
import six
__all__ = [
'logger_send_ipaaca_logs',
'logger_set_log_filename',
'logger_set_module_name',
'logger_set_log_level',
'LOG_DEBUG',
'LOG_INFO',
'LOG_WARN',
......@@ -55,6 +58,25 @@ __all__ = [
'LOG_ERROR',
]
LogLevel = ipaaca.misc.enum(
DEBUG = 0,
INFO = 1,
WARN = 2,
ERROR = 3,
SILENT = 4,
)
LOG_LEVEL_FROM_STRING_DICT = {
'DEBUG': LogLevel.DEBUG,
'INFO': LogLevel.INFO,
'WARN': LogLevel.WARN,
'WARNING': LogLevel.WARN,
'ERROR': LogLevel.ERROR,
'NONE': LogLevel.SILENT,
'SILENT': LogLevel.SILENT,
}
CURRENT_LOG_LEVEL = LogLevel.DEBUG
LOGGER_LOCK = threading.RLock()
MODULE_NAME = sys.argv[0]
......@@ -98,6 +120,15 @@ def logger_send_ipaaca_logs(flag=True):
with LOGGER_LOCK:
SEND_IPAACA_LOGS = flag
def logger_set_log_level(level=LogLevel.DEBUG):
global CURRENT_LOG_LEVEL
with LOGGER_LOCK:
if level in LogLevel._values:
CURRENT_LOG_LEVEL = level
elif isinstance(level, six.string_types) and level.upper() in LOG_LEVEL_FROM_STRING_DICT:
CURRENT_LOG_LEVEL = LOG_LEVEL_FROM_STRING_DICT[level.upper()]
else:
pass # leave previous setting untouched
def LOG_IPAACA(lvl, text, now=0.0, fn='???', thread='???'):
global OUTPUT_BUFFER
......@@ -116,24 +147,25 @@ def LOG_IPAACA(lvl, text, now=0.0, fn='???', thread='???'):
'text': text,}
try:
OUTPUT_BUFFER.add(msg)
except Exception, e:
except Exception as e:
LOG_ERROR('Caught an exception while logging via ipaaca. '
+ ' str(e); '
+ traceback.format_exc())
def LOG_CONSOLE(lvlstr, msg, fn_markup='', msg_markup='', now=0.0, fn='???', thread='???'):
if isinstance(msg, basestring):
if isinstance(msg, six.string_types):
lines = msg.split('\n')
else:
lines = [msg]
for line in lines:
text = lvlstr+' '+thread+' '+fn_markup+fn+''+' '+msg_markup+unicode(line)+''
text = lvlstr+' '+thread+' '+fn_markup+fn+'\033[m'+' '+msg_markup+str(line)+'\033[m'
print(text)
fn = ' '*len(fn)
def LOG_ERROR(msg, now=None):
if CURRENT_LOG_LEVEL > LogLevel.ERROR: return
now = time.time() if now is None else now
f = sys._getframe(1)
classprefix = (f.f_locals['self'].__class__.__name__+'.') if 'self' in f.f_locals else ''
......@@ -141,10 +173,11 @@ def LOG_ERROR(msg, now=None):
thread = threading.current_thread().getName()
with LOGGER_LOCK:
if SEND_IPAACA_LOGS: LOG_IPAACA('ERROR', msg, now=now, fn=fn, thread=thread)
LOG_CONSOLE('[ERROR]', msg, fn_markup='', msg_markup='', now=now, fn=fn, thread=thread)
LOG_CONSOLE('\033[38;5;9;1;4m[ERROR]\033[m', msg, fn_markup='\033[38;5;203m', msg_markup='\033[38;5;9;1;4m', now=now, fn=fn, thread=thread)
def LOG_WARN(msg, now=None):
if CURRENT_LOG_LEVEL > LogLevel.WARN: return
now = time.time() if now is None else now
f = sys._getframe(1)
classprefix = (f.f_locals['self'].__class__.__name__+'.') if 'self' in f.f_locals else ''
......@@ -152,13 +185,14 @@ def LOG_WARN(msg, now=None):
thread = threading.current_thread().getName()
with LOGGER_LOCK:
if SEND_IPAACA_LOGS: LOG_IPAACA('WARN', msg, now=now, fn=fn, thread=thread)
LOG_CONSOLE('[WARN] ', msg, fn_markup='', msg_markup='', now=now, fn=fn, thread=thread)
LOG_CONSOLE('\033[38;5;208;1m[WARN]\033[m ', msg, fn_markup='\033[38;5;214m', msg_markup='\033[38;5;208;1m', now=now, fn=fn, thread=thread)
LOG_WARNING = LOG_WARN
def LOG_INFO(msg, now=None):
if CURRENT_LOG_LEVEL > LogLevel.INFO: return
now = time.time() if now is None else now
f = sys._getframe(1)
classprefix = (f.f_locals['self'].__class__.__name__+'.') if 'self' in f.f_locals else ''
......@@ -170,6 +204,7 @@ def LOG_INFO(msg, now=None):
def LOG_DEBUG(msg, now=None):
if CURRENT_LOG_LEVEL > LogLevel.DEBUG: return
now = time.time() if now is None else now
f = sys._getframe(1)
classprefix = (f.f_locals['self'].__class__.__name__+'.') if 'self' in f.f_locals else ''
......@@ -177,7 +212,7 @@ def LOG_DEBUG(msg, now=None):
thread = threading.current_thread().getName()
with LOGGER_LOCK:
if SEND_IPAACA_LOGS: LOG_IPAACA('DEBUG', msg, now=now, fn=fn, thread=thread)
LOG_CONSOLE('[DEBUG]', msg, fn_markup='', msg_markup='', now=now, fn=fn, thread=thread)
LOG_CONSOLE('\033[2m[DEBUG]\033[m', msg, fn_markup='\033[38;5;144m', msg_markup='\033[38;5;248m', now=now, fn=fn, thread=thread)
class LoggerComponent(object):
......@@ -219,7 +254,7 @@ class LoggerComponent(object):
self.log_mode == 'timestamp')
new_logfile = open(filename, 'a' if append_if_exist else 'w')
if self.logfile is not None:
text = u'Will now continue logging in log file ' + unicode(filename)
text = u'Will now continue logging in log file ' + str(filename)
uid = str(uuid.uuid4())[0:8]
tim = time.time()
record = {
......@@ -231,7 +266,7 @@ class LoggerComponent(object):
'function': u'LoggerComponent.open_logfile',
'thread': '-',
'logreceivedtime': tim}
self.logfile.write(unicode(record)+'\n')
self.logfile.write(str(record)+'\n')
self.logfile.close()
self.logfile = new_logfile
print('Logging to console and {filename} ...'.format(filename=filename))
......@@ -241,8 +276,8 @@ class LoggerComponent(object):
def close_logfile(self):
if self.logfile is not None:
text = u'Closing of log file requested.'
uid = unicode(uuid.uuid4())[0:8]
tim = unicode(time.time())
uid = str(uuid.uuid4())[0:8]
tim = str(time.time())
record = {
'uuid': uid,
'time': tim,
......@@ -252,7 +287,7 @@ class LoggerComponent(object):
'function': u'LoggerComponent.close_logfile',
'thread': u'-',
'logreceivedtime': tim}
self.logfile.write(unicode(record)+'\n')
self.logfile.write(str(record)+'\n')
self.logfile.close()
print('Closed current log file.')
self.logfile = None
......@@ -290,24 +325,24 @@ class LoggerComponent(object):
'function': function,
'thread': thread,
'logreceivedtime': received_time}
self.logfile.write(unicode(record) + '\n')
self.logfile.write(str(record) + '\n')
except:
print('Failed to write to logfile!')
elif iu.category == 'logcontrol':
cmd = iu.payload['cmd']
cmd = iu.payload['cmd'] if 'cmd' in iu.payload else 'undef'
if cmd == 'open_log_file':
filename = iu.payload['filename'] if 'filename' in iu.payload else ''
if 'existing' in iu.payload:
log_mode_ = iu.payload['existing'].lower()
if log_mode_ not in LOG_MODES:
LOG_WARN(u'Value of "existing" should be "append", timestamp, or "overwrite", continuing with mode {mode}'.format(mode=self.log_mode))
LOG_WARN(u'Value of "existing" should be "append", "timestamp", or "overwrite", continuing with mode {mode}'.format(mode=self.log_mode))
else:
self.log_mode = log_mode_
self.open_logfile(filename)
elif cmd == 'close_log_file':
self.close_logfile()
else:
LOG_WARN(u'Received unknown logcontrol command: '+unicode(cmd))
except Exception, e:
LOG_WARN(u'Received unknown logcontrol command: '+str(cmd))
except Exception as e:
print('Exception while logging!') # TODO write to file as well?
print(u' '+unicode(traceback.format_exc()))
print(u' '+str(traceback.format_exc()))
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -33,9 +33,9 @@
from __future__ import division, print_function
import os
import threading
import ipaaca.buffer
import ipaaca.iu
import ipaaca.misc
......@@ -64,11 +64,16 @@ class ComponentError(Exception):
class ComponentNotifier(object):
NOTIFY_CATEGORY = "componentNotify"
CONTROL_CATEGORY = "componentControl"
SEND_CATEGORIES = "send_categories"
RECEIVE_CATEGORIES = "recv_categories"
CMD = "cmd"
STATE = "state"
NAME = "name"
WHO = "who" # list of names (or empty)
FUNCTION = "function"
PID = "pid"
CMD_REPORT = "report"
def __init__(self, component_name, component_function, send_categories, receive_categories, out_buffer=None, in_buffer=None):
self.component_name = component_name
......@@ -116,14 +121,23 @@ class ComponentNotifier(object):
self.out_buffer.add(notify_iu)
def _handle_iu_event(self, iu, event_type, local):
if iu.payload[ComponentNotifier.NAME] == self.component_name:
return
with self.notification_handler_lock:
for h in self.notification_handlers:
h(iu, event_type, local)
if iu.payload[ComponentNotifier.STATE] == "new":
#print("submitting")
self._submit_notify(False)
if iu.category == ComponentNotifier.NOTIFY_CATEGORY:
if iu.payload[ComponentNotifier.NAME] == self.component_name:
return
with self.notification_handler_lock:
for h in self.notification_handlers:
h(iu, event_type, local)
if iu.payload[ComponentNotifier.STATE] == "new":
#print("submitting")
self._submit_notify(False)
elif iu.category == ComponentNotifier.CONTROL_CATEGORY:
cmd = iu.payload[ComponentNotifier.CMD]
if cmd=='report':
# Request to report (by component controller)
who = iu.payload[ComponentNotifier.WHO]
# If we are named specifically or it's a broadcast
if len(who)==0 or self.component_name in who:
self._submit_notify(False)
def add_notification_handler(self, handler):
with self.notification_handler_lock:
......@@ -155,12 +169,14 @@ class ComponentNotifier(object):
if (not self.initialized):
self.timesync_slave = ipaaca.util.timesync.TimesyncSlave(component_name=self.component_name, timing_handler=self.launch_timesync_slave_handlers)
self.timesync_master = ipaaca.util.timesync.TimesyncMaster(component_name=self.component_name, timing_handler=self.launch_timesync_master_handlers)
self.in_buffer.register_handler(self._handle_iu_event, ipaaca.iu.IUEventType.MESSAGE, ComponentNotifier.NOTIFY_CATEGORY)
self.in_buffer.register_handler(self._handle_iu_event, ipaaca.iu.IUEventType.MESSAGE, [ComponentNotifier.NOTIFY_CATEGORY, ComponentNotifier.CONTROL_CATEGORY])
self._submit_notify(True)
self.initialized = True
def __enter__(self):
self.initialize()
return self
def __exit__(self, t, v, tb):
self.terminate()
return self
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -32,6 +32,7 @@
from __future__ import division, print_function
import threading
import time
import ipaaca.buffer
......@@ -45,7 +46,7 @@ class TimesyncMaster(object):
# component name to report (None => use buffer name)
self.component_name = component_name if component_name is not None else self.ob.unique_name
#
self.ob.register_handler(self.handle_timesync_master)
#self.ob.register_handler(self.handle_timesync_master)
self.ib.register_handler(self.handle_timesync_master)
# master_t1 is identical for all slaves
self.master_t1 = None
......@@ -63,7 +64,7 @@ class TimesyncMaster(object):
self.timing_handler = timing_handler
def send_master_timesync(self):
iu = ipaaca.iu.IU('timesyncRequest')
iu = ipaaca.iu.Message('timesyncRequest')
self.master_t1 = self.get_time()
iu.payload = {
'stage':'0',
......@@ -71,47 +72,48 @@ class TimesyncMaster(object):
'master':self.component_name,
}
self.ob.add(iu)
print('Sent a stage 0 timesync as master '+self.component_name)
def handle_timesync_master(self, iu, event_type, own):
master = iu.payload['master']
if not own and master == self.component_name:
if self.component_name == master:
# reply to our own initial IU
slave = iu.payload['slave']
stage = iu.payload['stage']
if stage=='1':
print('Received stage 1 from slave '+slave)
# initial reply by slave
t1 = iu.payload['slave_t1']
self.slave_t1s[slave] = float(t1)
t2 = self.master_t2s[slave] = self.get_time()
iu.payload.merge({'master_t2': str(t2), 'stage':'2'})
latency1 = t2 - self.master_t1
self.latencies[slave] = latency1
#print('Latency of round-trip 1: %.3f' % latency1)
elif stage=='3':
print('Received stage 3 from slave '+slave)
# second reply by slave
t2 = iu.payload['slave_t2']
self.slave_t2s[slave] = float(t2)
t_final = self.get_time()
latency1 = self.latencies[slave]
latency2 = t_final - self.master_t2s[slave]
latency = self.latencies[slave] = (latency1+latency2)/2.0
offset1 = (self.slave_t1s[slave]-self.master_t1)-latency/2.0
offset2 = (self.slave_t2s[slave]-self.master_t2s[slave])-latency/2.0
offset = (offset1+offset2)/2.0
iu.payload.merge({'stage':'4', 'latency': str(latency), 'offset':str(offset)})
if self.timing_handler is None:
print('Determined timing of timesync slave '+slave)
print(' Avg round-trip latency: %.3f s'%latency)
print(' Offset of their clock: %.3f s'%offset)
if event_type == ipaaca.IUEventType.ADDED or event_type == ipaaca.IUEventType.UPDATED:
if self.component_name == master:
# reply to our own initial IU
slave = iu.payload['slave']
stage = iu.payload['stage']
if stage=='1':
# initial reply by slave
t1 = iu.payload['slave_t1']
self.slave_t1s[slave] = float(t1)
t2 = self.master_t2s[slave] = self.get_time()
iu.payload.merge({'master_t2': str(t2), 'stage':'2'})
latency1 = t2 - self.master_t1
#print('Before stage 1 for '+slave+': '+str(self.latencies))
self.latencies[slave] = latency1
#print('After stage 1 for '+slave+': '+str(self.latencies))
#print('Latency of round-trip 1: %.3f' % latency1)
elif stage=='3':
#print('At stage 3 for '+slave+': '+str(self.latencies))
# second reply by slave
t2 = iu.payload['slave_t2']
self.slave_t2s[slave] = float(t2)
t_final = self.get_time()
latency1 = self.latencies[slave]
latency2 = t_final - self.master_t2s[slave]
latency = self.latencies[slave] = (latency1+latency2)/2.0
offset1 = (self.slave_t1s[slave]-self.master_t1)-latency/2.0
offset2 = (self.slave_t2s[slave]-self.master_t2s[slave])-latency/2.0
offset = (offset1+offset2)/2.0
iu.payload.merge({'stage':'4', 'latency': str(latency), 'offset':str(offset)})
if self.timing_handler is None:
print('Determined timing of timesync slave '+slave)
print(' Avg round-trip latency: %.3f s'%latency)
print(' Offset of their clock: %.3f s'%offset)
else:
self.timing_handler(self.component_name, slave, latency, offset)
else:
self.timing_handler(self.component_name, slave, latency, offset)
else:
# other stages are handled by time slave handler
pass
# other stages are handled by time slave handler
pass
def get_time(self):
return time.time() + self.debug_offset
......@@ -129,7 +131,6 @@ class TimesyncSlave(object):
#self.master_t2 = None
#self.master = None
self.latency = None
self.my_iu = None
#
self.debug_offset = debug_offset
#
......@@ -142,30 +143,23 @@ class TimesyncSlave(object):
master = iu.payload['master']
stage = iu.payload['stage']
if self.component_name != master:
if not own:
if not own and stage=='0':
# reply only to IUs from others
if stage=='0':
#print('Received stage 0 from master '+master)
# initial reply to master
self.my_iu = ipaaca.iu.IU('timesyncReply')
# TODO: add grounded_in link too?
t1 = self.get_time()
self.my_iu.payload = iu.payload
self.my_iu.payload['slave'] = self.component_name
self.my_iu.payload['slave_t1'] = str(t1)
self.my_iu.payload['stage'] = '1'
#self.my_iu.payload.merge({
# 'slave':self.component_name,
# 'slave_t1':str(t1),
# 'stage':'1',
# })
self.ob.add(self.my_iu)
else:
#print('Received stage 0 from master '+master)
# initial reply to master
myiu = ipaaca.iu.IU('timesyncReply')
# TODO: add grounded_in link too?
t1 = self.get_time()
myiu.payload = iu.payload
myiu.payload['slave'] = self.component_name
myiu.payload['slave_t1'] = str(t1)
myiu.payload['stage'] = '1'
self.ob.add(myiu)
elif iu.payload['slave'] == self.component_name:
if stage=='2':
#print('Received stage 2 from master '+master)
t2 = self.get_time()
self.my_iu.payload.merge({
iu.payload.merge({
'slave_t2':str(t2),
'stage':'3',
})
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -32,7 +32,6 @@
import time
import logging
import ipaaca
iu_to_write = None
......@@ -69,8 +68,8 @@ while True:
else:
iu.payload = {'a': 'reset'}
except ipaaca.IUUpdateFailedError, e:
ipaaca.logger.warning("Payload update failed (IU changed in the mean time)")
except ipaaca.IUUpdateFailedError as e:
print("Payload update failed (IU changed in the mean time)")
time.sleep(0.1)
exit(0)
......@@ -2,5 +2,5 @@
import ipaaca
print "{this is the IpaacaPython run.py doing nothing at all}"
print("{this is the IpaacaPython run.py doing nothing at all}")
......@@ -2,7 +2,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -2,7 +2,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -2,7 +2,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -2,7 +2,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -2,7 +2,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -36,30 +36,174 @@ from __future__ import division, print_function
import sys
import ipaaca
import traceback
import io
def pretty_printed_time(t):
if t<0:
t = -t
sign = '-'
else:
sign = ' '
s = float(t)
h = int(s/3600)
m = int(s/60) - h*60
s -= h*3600 + m*60
ms = int((s-int(s))*1000)
return sign+'%01d:%02d:%02d.%03d'%(h, m, int(s), ms)
REPLACEMENT_COLORS={
'30': 'black',
'31': 'red',
'32': 'green',
'33': 'cyan',
'34': 'blue',
'35': 'brown',
'36': 'magenta',
'37': 'grey',
'248': 'lightgray',
# should add more colors
}
def replace_ansi_html(s):
r = u''
in_esc = False
last_color = u''
#skip_one_double_wide_bar = False
for c in s:
if in_esc:
if c=='m':
in_esc = False
itms = last_color.replace('[','').replace(']','').split(';')
col = None
bold = False
if itms[0]=='':
r += '</code></font><code>'
else:
for i in itms:
if i=='1':
bold = True
elif i in REPLACEMENT_COLORS:
col = REPLACEMENT_COLORS[i]
if col is None:
if bold:
col = 'grey'
else:
col = 'black'
print('Warning: unknown colors '+str(itms))
r += '</code><font color="'+col+'"><code>'
else:
last_color += c
else:
#if c in u'▁▂▃▄▅▆▇█':
# if skip_one_double_wide_bar:
# skip_one_double_wide_bar = False
# else:
# r += c
# skip_one_double_wide_bar = True
#el
if c=='':
in_esc = True
last_color = ''
else:
r += c
return r
def print_header(formt, fields):
s = u''
if formt=='html':
s += '<html><head><meta charset="UTF-8"><title>flexdiam log view</title></head>\n<body>\n<table width="100%" border="1" bordercolor="lightgray" style="white-space:pre">\n<tr>'
for f in fields:
s+='<th>'+f+'</th>'
s += '</tr>'
return s
def print_footer(formt):
s = u''
if formt=='html':
s += '</tr>\n</body>\n</html>'
return s
def print_record(data, delimiter, formt):
if formt=='html':
s = u'<tr>'
for d in data:
d2 = d.replace('<', '&lt;').replace('>', '&gt;').replace('\n', '<br/>').replace('"', '&quot;')
d3 = replace_ansi_html(d2)
#s += u'<td><code>' + d2.replace('<', '&lt;').replace('>', '&gt;').replace('\n', '<br/>') + u'</code></td>'
s += u'<td><code>' + d3 + u'</code></td>'
s += u'</tr>'
return s
else:
return delimiter.join(data)
def modify(key, value, strip=False, pretty_printed_times=False, time_offset=0.0):
if key=='time':
f = float(value.strip()) - time_offset
return pretty_printed_time(f) if pretty_printed_times else str(f)
else:
return value.strip() if strip else value
if __name__=='__main__':
iap = ipaaca.IpaacaArgumentParser(
'ipaaca-logcat')
iap.add_argument(
'-s', '--strip-fields',
dest='strip', action='store_true',
default=False,
help='strip leading/trailing whitespace from all fields')
iap.add_argument(
'-d', '--delimiter',
dest='delimiter', nargs=1,
default=['\t'],
help='field delimiter, interpreted as python unicode string (default \'\\t\')')
iap.add_argument(
'-f', '--fields',
dest='fields', default=['time', 'text'], nargs='+',
help='fields to print (defaults: \'time\' \'text\')')
arguments = iap.parse_args()
delimiter = eval("u'"+arguments.delimiter[0]+"'", {"__builtins__":None}, {} )
#print(arguments); sys.exit(1)
modify = (lambda s: s.strip()) if arguments.strip else (lambda s: s)
#modify = lambda s: type(s).__name__
for line in sys.stdin:
record = eval(line.strip(), {"__builtins__":None}, {} )
print(delimiter.join([modify(unicode(record[f])) for f in arguments.fields]))
try:
iap = ipaaca.IpaacaArgumentParser('ipaaca-logcat')
iap.add_argument(
'-s', '--strip-fields',
dest='strip', action='store_true',
default=False,
help='Strip leading/trailing whitespace from all fields')
iap.add_argument(
'-p', '--pretty-printed-times',
dest='pretty_printed_times', action='store_true',
default=False,
help='Print human-readable times (hh:mm:ss.ms) instead of float seconds')
iap.add_argument(
'--format',
dest='format', nargs=1,
default=['plain'],
help='output format (plain, html) (default plain)')
iap.add_argument(
'-o', '--output-file',
dest='output_file', nargs=1,
default=['-'],
help='output file name (default \'-\' -> standard terminal output)')
iap.add_argument(
'-d', '--delimiter',
dest='delimiter', nargs=1,
default=['\t'],
help='field delimiter, interpreted as python unicode string (default \'\\t\')')
iap.add_argument(
'-t', '--align-times',
dest='align_times', nargs=2,
default=['0', '0'],
help='Calculate relative output timestamps (default: 0 0 => keep)\nFirst argument is a reference event timestamp from the log file\nSecond argument is the new output time for that same event')
iap.add_argument(
'-f', '--fields',
dest='fields', default=['time', 'text'], nargs='+',
help='fields to print (defaults: \'time\' \'text\')')
arguments = iap.parse_args()
delimiter = eval("u'"+arguments.delimiter[0]+"'", {"__builtins__":None}, {} )
ref_t, out_t = float(arguments.align_times[0]), float(arguments.align_times[1])
time_offset = ref_t - out_t
#print(arguments); sys.exit(1)
#modify = (lambda s: s.strip()) if arguments.strip else (lambda s: s)
#modify = lambda s: type(s).__name__
fil = sys.stdout
if arguments.output_file[0] != '-':
fil = io.open(arguments.output_file[0], 'w', encoding='utf8')
fil.write(print_header(arguments.format[0], arguments.fields)+'\n')
for line in sys.stdin:
record = eval(line.strip(), {"__builtins__":None}, {} )
data = [modify(f, unicode(record[f]), arguments.strip, arguments.pretty_printed_times, time_offset) for f in arguments.fields]
u = print_record(data, delimiter, arguments.format[0])
res = u'{}'.format(u) #.decode('utf-8')
fil.write(u''+res+'\n' )
fil.write(print_footer(arguments.format[0])+'\n')
if fil != sys.stdout: fil.close()
except (KeyboardInterrupt, SystemExit):
pass # ret below
except Exception, e:
print(u'Exception: '+unicode(traceback.format_exc()))
ipaaca.exit(1)
ipaaca.exit(0)
......@@ -31,7 +31,7 @@
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
#from __future__ import division, print_function
import itertools
import os
......@@ -44,7 +44,7 @@ import ipaaca
def iu_update_handler(iu, event_type, local):
try:
print(event_type + ': ' + unicode(iu))
print(event_type + ': ' + str(iu))
except:
print(u"An error occurred printing an IU for an event of type "+event_type)
......@@ -86,14 +86,16 @@ parser.add_argument(
if __name__ == '__main__':
arguments = parser.parse_args()
print('BackEnd is '+str(ipaaca.backend.get_default_backend().name))
ob = ipaaca.OutputBuffer('IpaacaIUInjector')
ob.register_handler(iu_update_handler)
iu = ipaaca.Message(arguments.category) if arguments.iu_type == 'Message' else ipaaca.IU(arguments.category)
if arguments.json_payload:
# Treat payload values as Python expressions
iu.payload = {k: eval(v) for (k, v) in itertools.izip_longest(arguments.payload[::2], arguments.payload[1::2])}
iu.payload = {k: eval(v) for (k, v) in itertools.zip_longest(arguments.payload[::2], arguments.payload[1::2])}
else:
iu.payload = {k: v for (k, v) in itertools.izip_longest(arguments.payload[::2], arguments.payload[1::2])}
iu.payload = {k: v for (k, v) in itertools.zip_longest(arguments.payload[::2], arguments.payload[1::2])}
ob.add(iu)
print(
......@@ -112,8 +114,7 @@ if __name__ == '__main__':
time.sleep(0.1)
except KeyboardInterrupt:
pass
if platform.system() == 'Windows':
os._exit(0)
else:
sys.exit(0)
except Exception as e:
print(u'Exception: '+str(traceback.format_exc()))
ipaaca.exit(1)
ipaaca.exit(0)
......@@ -31,7 +31,7 @@
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
#from __future__ import division, print_function
import logging
import os
......@@ -51,7 +51,7 @@ def event_type_color(typ):
'MESSAGE': '34;1',
'COMMITTED': '35;1',
'LINKSUPDATED': '36;1',
'RETRACTED': '37;1',
'RETRACTED': '37',
}
return colors.get(typ, '1')
......@@ -61,13 +61,13 @@ def highlight_if_color(s, c='1'):
def pretty_printed_dict(d):
s='{\n'
for k, v in d.items():
if isinstance(v, unicode) or isinstance(v, str):
v = "'"+unicode(v)+"'"
if isinstance(v, str) or isinstance(v, str):
v = "'"+str(v)+"'"
else:
v = unicode(v)
v = str(v)
v2 = v if len(v) <= arguments.max_size else v[:arguments.max_size] + '<excess length omitted>'
v2.replace('\\','\\\\').replace('\n', highlight_if_color('\\n'))
s += "\t '%s': %s,\n" % (highlight_if_color(unicode(k),'1'), unicode(v2))
s += "\t '%s': %s,\n" % (highlight_if_color(str(k),'1'), str(v2))
s+='}'
return s
......@@ -97,10 +97,11 @@ def iu_event_handler(iu, event_type, local):
print(pretty_printed_iu_event(iu, event_type, local), end='\n\n')
def exit(code):
if platform.system() == 'Windows':
os._exit(code)
else:
sys.exit(code)
ipaaca.exit(code)
#if platform.system() == 'Windows':
# os._exit(code)
#else:
# sys.exit(code)
parser = ipaaca.IpaacaArgumentParser(description='Ipaaca IU Sniffer -- Selectively listen to IPAACA traffic')
parser.add_argument(
......@@ -147,11 +148,24 @@ if __name__ == '__main__':
arguments = parser.parse_args()
buffers = {}
backend_name = str(ipaaca.backend.get_default_backend().name)
print('BackEnd is '+backend_name)
universal_listener_category = ''
if backend_name == 'mqtt':
universal_listener_category = '#'
if arguments.categories == ['']: arguments.categories = ['#']
elif backend_name == 'ros':
if arguments.categories == [''] or arguments.regex:
print('ATTENTION: listening to all categories not implemented for ROS back-end!')
print(' (By extension, the same goes for filtering all categories by regex.)')
print(' !! You will receive nothing, please provide fixed category names. !!')
# Create one input buffer for each channel we are listening on
for channel in arguments.channels:
buffers[channel] = ipaaca.InputBuffer(
'IpaacaIUSniffer',
category_interests=arguments.categories if not arguments.regex else [''],
category_interests=arguments.categories if not arguments.regex else [universal_listener_category],
channel=channel,
resend_active=True)
# Check whether the specified event_types are valid
......@@ -173,7 +187,7 @@ if __name__ == '__main__':
else:
print(', '.join([highlight_if_color(et.upper(), event_type_color(et.upper())) for et in arguments.event_types]))
print(' * for category/ies', end='')
if arguments.categories == ['']:
if arguments.categories == [universal_listener_category]:
print(': any')
else:
if arguments.regex:
......@@ -186,4 +200,7 @@ if __name__ == '__main__':
time.sleep(1)
except KeyboardInterrupt:
pass
exit(code=0)
except Exception as e:
print(u'Exception: '+str(traceback.format_exc()))
ipaaca.exit(1)
ipaaca.exit(0)