Compare revisions (projects: scs/ipaaca, ramin.yaghoubzadeh/ipaaca)
Showing 1507 additions and 442 deletions
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -35,24 +35,49 @@ from __future__ import division, print_function
import threading
import uuid
import traceback
import rsb
import six
import weakref
import atexit
import ipaaca_pb2
#import rsb
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.converter
import ipaaca.iu
import ipaaca.backend
import ipaaca.misc
__all__ = [
'InputBuffer',
'OutputBuffer',
]
LOGGER = ipaaca.misc.get_library_logger()
# set of objects to auto-clean on exit, assumes _teardown() method
TEARDOWN_OBJECTS = set()
def atexit_cleanup_function():
'''Function to call at program exit to auto-clean objects.'''
global TEARDOWN_OBJECTS
for obj_r in TEARDOWN_OBJECTS:
obj = obj_r()
if obj is not None: # if weakref still valid
obj._teardown()
ipaaca.backend.get_default_backend().teardown()
atexit.register(atexit_cleanup_function)
def auto_teardown_instances(fn):
'''Decorator function for object constructors, to add
new instances to the object set to auto-clean at exit.'''
def auto_teardown_instances_wrapper(instance, *args, **kwargs):
global TEARDOWN_OBJECTS
fn(instance, *args, **kwargs)
TEARDOWN_OBJECTS.add(weakref.ref(instance))
return auto_teardown_instances_wrapper
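# Illustrative sketch (not part of this changeset): how the atexit/teardown
# hook above is meant to be used. The class Demo and its _teardown body are
# hypothetical; only auto_teardown_instances and TEARDOWN_OBJECTS come from
# this module.
def _example_auto_teardown():
    class Demo(object):
        @auto_teardown_instances
        def __init__(self, name):
            self.name = name
        def _teardown(self):
            print('cleaning up ' + self.name)
    d = Demo('example')  # a weakref to d is now stored in TEARDOWN_OBJECTS
    return d  # if d is still alive at exit, atexit_cleanup_function() calls d._teardown()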
class IUStore(dict):
"""A dictionary storing IUs."""
def __init__(self):
......@@ -89,10 +114,10 @@ class IUEventHandler(object):
self._handler_function = handler_function
self._for_event_types = (
None if for_event_types is None else
(for_event_types[:] if hasattr(for_event_types, '__iter__') else [for_event_types]))
(for_event_types[:] if not isinstance(for_event_types, six.string_types) and hasattr(for_event_types, '__iter__') else [for_event_types]))
self._for_categories = (
None if for_categories is None else
(for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories]))
(for_categories[:] if not isinstance(for_categories, six.string_types) and hasattr(for_categories, '__iter__') else [for_categories]))
def condition_met(self, event_type, category):
"""Check whether this IUEventHandler should be called.
......@@ -132,9 +157,10 @@ class Buffer(object):
participant_config -- RSB configuration
'''
super(Buffer, self).__init__()
ipaaca.initialize_ipaaca_rsb_if_needed()
self._owning_component_name = owning_component_name
self._channel = channel if channel is not None else ipaaca.defaults.IPAACA_DEFAULT_CHANNEL
self._participant_config = rsb.ParticipantConfig.fromDefaultSources() if participant_config is None else participant_config
self._participant_config = participant_config
self._uuid = str(uuid.uuid4())[0:8]
# Initialise with a temporary, but already unique, name
self._unique_name = "undef-"+self._uuid
......@@ -145,6 +171,12 @@ class Buffer(object):
return FrozenIUStore(original_iu_store = self._iu_store)
iu_store = property(fget=_get_frozen_iu_store, doc='Copy-on-read version of the internal IU store')
def _get_channel(self):
return self._channel
channel = property(
fget=_get_channel,
doc='The IPAACA channel the buffer is connected to.')
def register_handler(self, handler_function, for_event_types=None, for_categories=None):
"""Register a new IU event handler function.
......@@ -168,8 +200,9 @@ class Buffer(object):
h.call(self, uid, local=local, event_type=event_type, category=category)
except Exception as e:
if local:
LOGGER.error('Local IU handler raised an exception upon remote write.' + unicode(e))
LOGGER.error('Local IU handler raised an exception upon remote write.' + str(e))
else:
print(str(traceback.format_exc()))
raise e
def _get_owning_component_name(self):
......@@ -187,6 +220,7 @@ class InputBuffer(Buffer):
"""An InputBuffer that holds remote IUs."""
@auto_teardown_instances
def __init__(self, owning_component_name, category_interests=None, channel=None, participant_config=None, resend_active=False):
'''Create an InputBuffer.
......@@ -205,26 +239,40 @@ class InputBuffer(Buffer):
self._add_category_listener(str(self._uuid))
if category_interests is not None:
self.add_category_interests(category_interests)
def _get_remote_server(self, iu):
def _get_remote_server(self, event_or_iu):
'''Return (or create, store and return) a remote server.'''
_owner = None
if hasattr(iu,'owner_name'):
_owner = iu.owner_name
elif hasattr(iu,'writer_name'):
_owner = iu.writer_name
if _owner is not None:
if _owner in self._remote_server_store:
return self._remote_server_store[_owner]
# TODO remove the str() when unicode is supported (issue #490)
remote_server = rsb.createRemoteServer(rsb.Scope(str(_owner)))
self._remote_server_store[_owner] = remote_server
return remote_server
_owner = self._get_owner(event_or_iu)
_remote_server_name = (_owner + '/Server') if _owner else None
if _remote_server_name:
try:
return self._remote_server_store[_remote_server_name]
except KeyError:
be = ipaaca.backend.get_default_backend()
remote_server = be.createRemoteServer(be.Scope(str(_remote_server_name)), config=self._participant_config)
self._remote_server_store[_remote_server_name] = remote_server
return remote_server
else:
return None
def _get_owner(self, event_or_iu):
if hasattr(event_or_iu, 'data'):
# is RSB event
data = event_or_iu.data
if hasattr(data, 'owner_name'):
return data.owner_name
elif hasattr(data, 'writer_name'):
return data.writer_name
else:
return None
else:
# is IU
return event_or_iu.owner_name
def _add_category_listener(self, iu_category):
'''Create and store a listener on a specific category.'''
if iu_category not in self._listener_store:
cat_listener = rsb.createListener(rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), config=self._participant_config)
be = ipaaca.backend.get_default_backend()
cat_listener = be.createListener(be.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), config=self._participant_config)
cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category)
......@@ -237,6 +285,24 @@ class InputBuffer(Buffer):
self._category_interests.remove(iu_category)
LOGGER.info("Removed listener in scope /ipaaca/channel/" + str(self._channel) + "/category/ " + iu_category)
def _teardown(self):
'''An InputBuffer deactivates its category listeners on teardown.'''
self._deactivate_all_internal()
def __del__(self):
'''Perform teardown as soon as Buffer is lost.'''
self._deactivate_all_internal()
def _deactivate_all_internal(self):
'''Deactivate all participants.'''
for listener in self._listener_store.values():
try:
listener.deactivate()
except RuntimeError:
# Is raised if an already deactivated participant is
# deactivated again
pass
def _handle_iu_events(self, event):
'''Dispatch incoming IU events.
......@@ -247,7 +313,7 @@ class InputBuffer(Buffer):
event -- a converted RSB event
'''
type_ = type(event.data)
if type_ is ipaaca.iu.RemotePushIU:
if type_ == ipaaca.iu.RemotePushIU:
# a new IU
if event.data.uid not in self._iu_store:
self._iu_store[event.data.uid] = event.data
......@@ -260,7 +326,7 @@ class InputBuffer(Buffer):
# done via the resend request mechanism).
self._iu_store[event.data.uid] = event.data
event.data.buffer = self
elif type_ is ipaaca.iu.RemoteMessage:
elif type_ == ipaaca.iu.RemoteMessage:
# a new Message, an ephemeral IU that is removed after calling handlers
self._iu_store[ event.data.uid ] = event.data
event.data.buffer = self
......@@ -268,40 +334,41 @@ class InputBuffer(Buffer):
del self._iu_store[ event.data.uid ]
else:
if event.data.uid not in self._iu_store:
if self._resend_active:
# send resend request to remote server
self._request_remote_resend(event.data)
if (self._resend_active and
not type_ == ipaaca.ipaaca_pb2.IURetraction):
# send resend request to remote server, IURetraction is ignored
try:
self._request_remote_resend(event)
except ipaaca.exception.IUResendRequestFailedError:
LOGGER.warning('Requesting resend for IU {} failed.'.
format(event.data.uid))
else:
LOGGER.warning("Received an update for an IU which we did not receive before.")
return
# an update to an existing IU
if type_ is ipaaca_pb2.IURetraction:
if type_ == ipaaca.ipaaca_pb2.IURetraction:
# IU retraction (cannot be triggered remotely)
iu = self._iu_store[event.data.uid]
iu._revision = event.data.revision
iu._apply_retraction() # for now - just sets the _retracted flag.
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.RETRACTED, category=iu.category)
# SPECIAL CASE: allow the handlers (which will need to find the IU
# in the buffer) to operate on the IU - then delete it afterwards!
# FIXME: for now: retracted == deleted! Think about this later
del(self._iu_store[iu.uid])
else:
if event.data.writer_name == self.unique_name:
# Notify only for remotely triggered events;
# Discard updates that originate from this buffer
return
if type_ is ipaaca_pb2.IUCommission:
if type_ == ipaaca.ipaaca_pb2.IUCommission:
# IU commit
iu = self._iu_store[event.data.uid]
iu._apply_commission()
iu._revision = event.data.revision
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.COMMITTED, category=iu.category)
elif type_ is ipaaca.converter.IUPayloadUpdate:
elif type_ == ipaaca.converter.IUPayloadUpdate:
# IU payload update
iu = self._iu_store[event.data.uid]
iu._apply_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.UPDATED, category=iu.category)
elif type_ is ipaaca.converter.IULinkUpdate:
elif type_ == ipaaca.converter.IULinkUpdate:
# IU link update
iu = self._iu_store[event.data.uid]
iu._apply_link_update(event.data)
......@@ -310,27 +377,31 @@ class InputBuffer(Buffer):
LOGGER.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
def add_category_interests(self, category_interests):
if hasattr(category_interests, '__iter__'):
if not isinstance(category_interests, six.string_types) and hasattr(category_interests, '__iter__'):
for interest in category_interests:
self._add_category_listener(interest)
else:
self._add_category_listener(category_interests)
def remove_category_interests(self, category_interests):
if hasattr(category_interests, '__iter__'):
if not isinstance(category_interests, six.string_types) and hasattr(category_interests, '__iter__'):
for interest in category_interests:
self._remove_category_listener(interest)
else:
self._remove_category_listener(category_interests)
def _request_remote_resend(self, iu):
remote_server = self._get_remote_server(iu)
resend_request = ipaaca_pb2.IUResendRequest()
resend_request.uid = iu.uid # target iu
resend_request.hidden_scope_name = str(self._uuid) # hidden category name
remote_revision = remote_server.requestResend(resend_request)
if remote_revision == 0:
raise ipaaca.exception.IUResendRequestFailedError()
def _request_remote_resend(self, event):
remote_server = self._get_remote_server(event)
if remote_server:
resend_request = ipaaca.ipaaca_pb2.IUResendRequest()
resend_request.uid = event.data.uid # target iu
resend_request.hidden_scope_name = str(self._uuid) # hidden category name
remote_revision = remote_server.resendRequest(resend_request)
if remote_revision == 0:
raise ipaaca.exception.IUResendRequestFailedError(event.data.uid)
else:
# Remote server is not known
raise ipaaca.exception.IUResendRequestRemoteServerUnknownError(event.data.uid)
def register_handler(self, handler_function, for_event_types=None, for_categories=None):
"""Register a new IU event handler function.
......@@ -362,8 +433,9 @@ class OutputBuffer(Buffer):
"""An OutputBuffer that holds local IUs."""
@auto_teardown_instances
def __init__(self, owning_component_name, channel=None, participant_config=None):
'''Create an Output Buffer.
'''Create an OutputBuffer.
Keyword arguments:
owning_component_name -- name of the entity that owns this buffer
......@@ -371,15 +443,25 @@ class OutputBuffer(Buffer):
'''
super(OutputBuffer, self).__init__(owning_component_name, channel, participant_config)
self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB'
self._server = rsb.createLocalServer(rsb.Scope(self._unique_name))
self._server.addMethod('updateLinks', self._remote_update_links, ipaaca.converter.IULinkUpdate, int)
self._server.addMethod('updatePayload', self._remote_update_payload, ipaaca.converter.IUPayloadUpdate, int)
self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int)
self._server.addMethod('requestResend', self._remote_request_resend, ipaaca_pb2.IUResendRequest, int)
be = ipaaca.backend.get_default_backend()
self._server = be.createLocalServer(self, be.Scope(self._unique_name + '/Server'), config=self._participant_config)
self._informer_store = {}
self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-'
self.__iu_id_counter_lock = threading.Lock()
def _teardown(self):
'''OutputBuffer retracts remaining live IUs on teardown'''
self._retract_all_internal()
self._deactivate_all_internal()
def __del__(self):
'''Perform teardown (IU retractions) as soon as Buffer is lost.
Note that at program exit the teardown might be called
twice for live objects (atexit, then del), but the
_retract_all_internal method prevents double retractions.'''
self._retract_all_internal()
self._deactivate_all_internal()
def _remote_update_links(self, update):
'''Apply a remotely requested update to one of the stored IU's links.'''
if update.uid not in self._iu_store:
......@@ -407,10 +489,10 @@ class OutputBuffer(Buffer):
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
LOGGER.warning(u"Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
LOGGER.warning(u" Writer was: "+update.writer_name)
LOGGER.warning(u" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
LOGGER.warning(u" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
LOGGER.warning("Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
LOGGER.warning(" Writer was: "+update.writer_name)
LOGGER.warning(" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
LOGGER.warning(" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
return 0
if update.is_delta:
#print('Writing delta update by '+str(update.writer_name))
......@@ -433,7 +515,7 @@ class OutputBuffer(Buffer):
return 0
iu = self._iu_store[iu_resend_request_pack.uid]
with iu.revision_lock:
if iu_resend_request_pack.hidden_scope_name is not None and iu_resend_request_pack.hidden_scope_name is not '':
if iu_resend_request_pack.hidden_scope_name is not None and iu_resend_request_pack.hidden_scope_name != '':
informer = self._get_informer(iu_resend_request_pack.hidden_scope_name)
informer.publishData(iu)
return iu.revision
......@@ -463,8 +545,9 @@ class OutputBuffer(Buffer):
if iu_category in self._informer_store:
LOGGER.info("Returning informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
return self._informer_store[iu_category]
informer_iu = rsb.createInformer(
rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)),
be = ipaaca.backend.get_default_backend()
informer_iu = be.createInformer(
be.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)),
config=self._participant_config,
dataType=object)
self._informer_store[iu_category] = informer_iu #new_tuple
......@@ -477,6 +560,8 @@ class OutputBuffer(Buffer):
raise ipaaca.exception.IUPublishedError(iu)
if iu.buffer is not None:
raise ipaaca.exception.IUPublishedError(iu)
if iu.retracted:
raise ipaaca.exception.IURetractedError(iu)
if iu.access_mode != ipaaca.iu.IUAccessMode.MESSAGE:
# Messages are not really stored in the OutputBuffer
self._iu_store[iu.uid] = iu
......@@ -484,12 +569,12 @@ class OutputBuffer(Buffer):
self._publish_iu(iu)
def remove(self, iu=None, iu_uid=None):
'''Remove the iu or an IU corresponding to iu_uid from the OutputBuffer, retracting it from the system.'''
'''Retracts an IU and removes it from the OutputBuffer.'''
if iu is None:
if iu_uid is None:
return None
else:
if iu_uid not in self. _iu_store:
if iu_uid not in self._iu_store:
raise ipaaca.exception.IUNotFoundError(iu_uid)
iu = self._iu_store[iu_uid]
# unpublish the IU
......@@ -503,12 +588,35 @@ class OutputBuffer(Buffer):
informer.publishData(iu)
def _retract_iu(self, iu):
'''Retract (unpublish) an IU.'''
iu_retraction = ipaaca_pb2.IURetraction()
'''Retract an IU.'''
iu._retracted = True
iu_retraction = ipaaca.ipaaca_pb2.IURetraction()
iu_retraction.uid = iu.uid
iu_retraction.revision = iu.revision
informer = self._get_informer(iu._category)
informer.publishData(iu_retraction)
def _retract_all_internal(self):
'''Retract all IUs without removal (for Buffer teardown).'''
for iu in self._iu_store.values():
if not iu._retracted:
self._retract_iu(iu)
def _deactivate_all_internal(self):
'''Deactivate all participants.'''
try:
self._server.deactivate()
except RuntimeError:
# Is raised if an already deactivated participant is
# deactivated again
pass
for informer in self._informer_store.values():
try:
informer.deactivate()
except RuntimeError:
# Is raised if an already deactivated participant is
# deactivated again
pass
def _send_iu_commission(self, iu, writer_name):
'''Send IU commission.
......@@ -521,7 +629,7 @@ class OutputBuffer(Buffer):
'''
# a raw Protobuf object for IUCommission is produced
# (unlike updates, where we have an intermediate class)
iu_commission = ipaaca_pb2.IUCommission()
iu_commission = ipaaca.ipaaca_pb2.IUCommission()
iu_commission.uid = iu.uid
iu_commission.revision = iu.revision
iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name
......
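# Illustrative sketch (not part of this changeset): typical use of the
# InputBuffer/OutputBuffer classes from the buffer module above. The component
# and category names are made up; the handler signature (iu, event_type, local),
# the IU constructor's category keyword and OutputBuffer.add() are assumed from
# the wider ipaaca API and are not shown in this diff.
def _example_buffer_roundtrip():
    ob = OutputBuffer('examplePublisher')
    ib = InputBuffer('exampleReceiver', category_interests=['exampleCategory'])
    def on_iu(iu, event_type, local):
        # called for ADDED/UPDATED/COMMITTED/RETRACTED events on matching categories
        print(event_type, iu.category, iu.payload)
    ib.register_handler(on_iu, for_categories='exampleCategory')
    iu = ipaaca.iu.IU(category='exampleCategory')
    iu.payload['greeting'] = 'hello'
    ob.add(iu)     # publish: remote InputBuffers receive an ADDED event
    ob.remove(iu)  # retract: remote InputBuffers receive a RETRACTED event
    return ob, ib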
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import os
import re
try:
import configparser
except:
import ConfigParser as configparser
LOGGER = ipaaca.misc.get_library_logger()
__global_config = None
class Config(object):
def __init__(self):
self._store = {}
def get_with_default(self, key, default_value, warn=False):
if key in self._store:
return self._store[key]
else:
notif = LOGGER.warning if warn else LOGGER.debug
notif('Config key '+str(key)+' not found, returning default of '+str(default_value))
return default_value
def populate_from_global_sources(self):
self._store = {}
self.populate_from_any_conf_files()
self.populate_from_environment()
#self.populate_from_argv_overrides() # TODO IMPLEMENT_ME
def populate_from_any_conf_files(self):
globalconf = os.getenv('HOME', '')+'/.config/ipaaca.conf'
for filename in ['ipaaca.conf', globalconf]:
try:
f = open(filename, 'r')
c = configparser.ConfigParser()
c.readfp(f)
f.close()
LOGGER.info('Including configuration from '+filename)
for k,v in c.items('ipaaca'):
self._store[k] = v
return
except:
pass
LOGGER.info('Could not load ipaaca.conf either here or in ~/.config')
def populate_from_environment(self):
for k, v in os.environ.items():
if k.startswith('IPAACA_'):
if re.match(r'^[A-Za-z0-9_]*$', k) is None:
LOGGER.warning('Ignoring malformed environment key')
else:
if len(v)>1023:
LOGGER.warning('Ignoring long environment value')
else:
# remove initial IPAACA_ and transform key to dotted lowercase
trans_key = k[7:].lower().replace('_', '.')
self._store[trans_key] = v
LOGGER.debug('Configured from environment: '+str(trans_key)+'="'+str(v)+'"')
def get_global_config():
global __global_config
if __global_config is None:
__global_config = Config()
__global_config.populate_from_global_sources()
return __global_config
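# Illustrative sketch (not part of this changeset): environment variables with
# the IPAACA_ prefix are mapped to dotted lower-case config keys; the variable
# and key names below are made up.
def _example_config_from_environment():
    os.environ['IPAACA_TRANSPORT_HOST'] = 'localhost'  # becomes key 'transport.host'
    config = Config()
    config.populate_from_environment()
    host = config.get_with_default('transport.host', '127.0.0.1')
    # unknown keys fall back to the given default (optionally logging a warning)
    port = config.get_with_default('transport.port', 12345, warn=True)
    return host, port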
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -34,9 +34,9 @@ from __future__ import division, print_function
import collections
import rsb.converter
#import rsb.converter
import ipaaca_pb2
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
......@@ -45,230 +45,364 @@ import ipaaca.misc
LOGGER = ipaaca.misc.get_library_logger()
try:
import simplejson as json
import simplejson as json
except ImportError:
import json
LOGGER.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
import json
LOGGER.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
__all__ = [
'IntConverter',
'IUConverter',
'IULinkUpdate',
'IULinkUpdateConverter',
'IUPayloadUpdate',
'IUPayloadUpdateConverter',
'MessageConverter',
]
class IntConverter(rsb.converter.Converter):
"""Convert Python int objects to Protobuf ints and vice versa."""
def __init__(self, wireSchema="int", dataType=int):
super(IntConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, value):
pbo = ipaaca_pb2.IntMessage()
pbo.value = value
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca_pb2.IntMessage()
pbo.ParseFromString( str(byte_stream) )
return pbo.value
def pack_payload_entry(entry, key, value, _type=ipaaca.iu.IUPayloadType.JSON):
entry.key = key
if _type == ipaaca.iu.IUPayloadType.JSON:
entry.value = json.dumps(value)
elif _type == ipaaca.iu.IUPayloadType.STR or _type == 'MAP':
entry.value = str(value)
else:
raise ipaaca.exception.IpaacaException('Asked to send payload entry with unsupported type "' + _type + '".')
entry.type = _type
'IntConverter',
'IUConverter',
'IULinkUpdate',
'IULinkUpdateConverter',
'IUPayloadUpdate',
'IUPayloadUpdateConverter',
'MessageConverter',
'register_global_converter',
]
_LOW_LEVEL_WIRE_SCHEMA_MAP = None
def LOW_LEVEL_WIRE_SCHEMA_FOR(abstractname):
'''Map the abstract wire schema name (as used in RSB) to a
transport-dependent magic value to detect on the wire.
Here: a required protobuf field.'''
global _LOW_LEVEL_WIRE_SCHEMA_MAP
if _LOW_LEVEL_WIRE_SCHEMA_MAP is None:
_LOW_LEVEL_WIRE_SCHEMA_MAP = {
int: ipaaca.ipaaca_pb2.WireTypeIntMessage,
ipaaca.iu.IU: ipaaca.ipaaca_pb2.WireTypeIU,
ipaaca.iu.Message: ipaaca.ipaaca_pb2.WireTypeMessageIU,
IUPayloadUpdate: ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdate,
IULinkUpdate: ipaaca.ipaaca_pb2.WireTypeIULinkUpdate,
'int': ipaaca.ipaaca_pb2.WireTypeIntMessage,
'ipaaca-iu': ipaaca.ipaaca_pb2.WireTypeIU,
'ipaaca-messageiu': ipaaca.ipaaca_pb2.WireTypeMessageIU,
'ipaaca-iu-payload-update': ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdate,
'ipaaca-iu-link-update': ipaaca.ipaaca_pb2.WireTypeIULinkUpdate,
}
return _LOW_LEVEL_WIRE_SCHEMA_MAP.get(abstractname)
def __fail_no_type_converter():
raise ipaaca.exception.BackendSerializationError()
class FailingDict(dict):
def __init__(self, error_class, *args, **kwargs):
super(FailingDict, self).__init__(*args, **kwargs)
self._error_class = error_class
def __getitem__(self, k):
if k in self:
return dict.__getitem__(self, k)
else:
raise self._error_class(k)
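# Illustrative sketch (not part of this changeset): FailingDict behaves like a
# normal dict but raises the configured error class for unknown keys (a plain
# KeyError is used here purely for illustration).
def _example_failing_dict():
    registry = FailingDict(KeyError, {'known': 1})
    assert registry['known'] == 1
    try:
        registry['unknown']  # raises KeyError('unknown')
    except KeyError:
        pass
    return registry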
# global converter / [un]marshaller store
__converter_registry_by_type = FailingDict(ipaaca.exception.BackendSerializationError)
__converter_registry_by_wire_schema = FailingDict(ipaaca.exception.BackendDeserializationError)
def register_global_converter(converter):
global __converter_registry_by_type, __converter_registry_by_wire_schema
real_wire_schema = LOW_LEVEL_WIRE_SCHEMA_FOR(converter._wire_schema)
if real_wire_schema is None:
raise NotImplementedError('There is no entry in the _LOW_LEVEL_WIRE_SCHEMA_MAP for '+str(converter._wire_schema))
if real_wire_schema in __converter_registry_by_wire_schema:
raise ipaaca.exception.ConverterRegistrationError(real_wire_schema)
if converter._data_type in __converter_registry_by_type:
raise ipaaca.exception.ConverterRegistrationError(converter._data_type.__name__)
__converter_registry_by_type[converter._data_type] = converter
__converter_registry_by_wire_schema[real_wire_schema] = converter
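# Illustrative sketch (not part of this changeset): how the registry above
# could be populated with the converters defined further below in this module.
# Each data type and wire schema may only be registered once (see
# ConverterRegistrationError); where the library actually performs this
# registration is not shown in this diff.
def _example_register_converters():
    register_global_converter(IntConverter())
    register_global_converter(IUConverter())
    register_global_converter(MessageConverter())
    register_global_converter(IUPayloadUpdateConverter())
    register_global_converter(IULinkUpdateConverter())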
def deserialize(lowlevel_message):
pbo_outer = ipaaca.ipaaca_pb2.TransportLevelWrapper()
pbo_outer.ParseFromString(lowlevel_message)
type_ = pbo_outer.transport_message_type
#print('Received wire message type', type_)
if type_ in __converter_registry_by_wire_schema:
return __converter_registry_by_wire_schema[type_].deserialize(pbo_outer.raw_message, None)
else:
pbo = None
if type_ == ipaaca.ipaaca_pb2.WireTypeRemoteRequestResult:
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIURetraction:
pbo = ipaaca.ipaaca_pb2.IURetraction()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUCommission:
pbo = ipaaca.ipaaca_pb2.IUCommission()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUResendRequest:
pbo = ipaaca.ipaaca_pb2.IUResendRequest()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdateRequest:
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdateRequest()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUCommissionRequest:
pbo = ipaaca.ipaaca_pb2.IUCommissionRequest()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIULinkUpdateRequest:
pbo = ipaaca.ipaaca_pb2.IULinkUpdateRequest()
if pbo is None:
raise ipaaca.exception.BackendDeserializationError(type_)
else:
pbo.ParseFromString(pbo_outer.raw_message)
return pbo
raise ipaaca.exception.BackendDeserializationError(type_)
def serialize(obj):
inner, type_ = None, None
if obj.__class__ in __converter_registry_by_type:
cls_ = obj.__class__
inner, wire = __converter_registry_by_type[obj.__class__].serialize(obj)
type_ = LOW_LEVEL_WIRE_SCHEMA_FOR(wire)
else:
cls_ = obj.__class__
if cls_ == ipaaca.ipaaca_pb2.RemoteRequestResult:
type_ = ipaaca.ipaaca_pb2.WireTypeRemoteRequestResult
elif cls_ == ipaaca.ipaaca_pb2.IURetraction:
type_ = ipaaca.ipaaca_pb2.WireTypeIURetraction
elif cls_ == ipaaca.ipaaca_pb2.IUCommission:
type_ = ipaaca.ipaaca_pb2.WireTypeIUCommission
elif cls_ == ipaaca.ipaaca_pb2.IUResendRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIUResendRequest
elif cls_ == ipaaca.ipaaca_pb2.IUPayloadUpdateRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdateRequest
elif cls_ == ipaaca.ipaaca_pb2.IUCommissionRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIUCommissionRequest
elif cls_ == ipaaca.ipaaca_pb2.IULinkUpdateRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIULinkUpdateRequest
if type_ is None:
raise ipaaca.exception.BackendSerializationError(cls_)
else:
inner = obj.SerializeToString()
pbo = ipaaca.ipaaca_pb2.TransportLevelWrapper()
pbo.transport_message_type = type_
pbo.raw_message = inner
return bytearray(pbo.SerializeToString())
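# Illustrative sketch (not part of this changeset): raw protobuf control
# messages such as IURetraction are wrapped in a TransportLevelWrapper by
# serialize() and recovered by deserialize() without a registered converter.
# The UID and revision below are made up.
def _example_transport_roundtrip():
    retraction = ipaaca.ipaaca_pb2.IURetraction()
    retraction.uid = 'iu-1234'
    retraction.revision = 3
    wire = serialize(retraction)         # bytearray containing the wrapper
    restored = deserialize(bytes(wire))  # an IURetraction instance again
    return restored.uid, restored.revision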
class ConverterBase(object):
'''Base for converters (to serialize and unserialize
data automatically depending on its Python type).'''
def __init__(self, substrate, data_type, wire_schema):
self._substrate = substrate
self._wire_schema = wire_schema
self._data_type = data_type
self.wireSchema = wire_schema # added compat with RSB
#print('Made a ConverterBase with wire '+str(self._wire_schema)+' and data '+str(self._data_type))
def serialize(self, value):
raise NotImplementedError('NOT IMPLEMENTED for ' \
+ self.__class__.__name__+': serialize')
def deserialize(self, stream, _UNUSED_override_wire_schema):
raise NotImplementedError('NOT IMPLEMENTED for ' \
+ self.__class__.__name__+': deserialize')
class IntConverter(ConverterBase):
"""Convert Python int objects to Protobuf ints and vice versa."""
def __init__(self, wireSchema="int", dataType=None):
super(IntConverter, self).__init__(bytearray, int, wireSchema)
def serialize(self, value):
pbo = ipaaca.ipaaca_pb2.IntMessage()
pbo.value = value
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IntMessage()
pbo.ParseFromString(byte_stream)
return pbo.value
def pack_payload_entry(entry, key, value, _type=None):
#if _type is None: _type=ipaaca.iu.IUPayloadType.JSON
entry.key = key
if _type is None or _type == ipaaca.iu.IUPayloadType.JSON:
entry.value = json.dumps(value)
elif _type == ipaaca.iu.IUPayloadType.STR or _type == 'MAP':
entry.value = str(value)
else:
raise ipaaca.exception.IpaacaException('Asked to send payload entry with unsupported type "' + _type + '".')
entry.type = _type if _type is not None else ipaaca.iu.IUPayloadType.JSON
def unpack_payload_entry(entry):
# We assume that the only transfer types are 'STR' or 'JSON'. Both are transparently handled by json.loads
if entry.type == ipaaca.iu.IUPayloadType.JSON:
return json.loads(entry.value)
elif entry.type == ipaaca.iu.IUPayloadType.STR or entry.type == 'str':
return entry.value
else:
LOGGER.warn('Received payload entry with unsupported type "' + entry.type + '".')
return entry.value
class IUConverter(rsb.converter.Converter):
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-iu", dataType=ipaaca.iu.IU):
super(IUConverter, self).__init__(bytearray, dataType, wireSchema)
self._access_mode = ipaaca_pb2.IU.PUSH
self._remote_data_type = ipaaca.iu.RemotePushIU
def serialize(self, iu):
pbo = ipaaca_pb2.IU()
pbo.access_mode = self._access_mode
pbo.uid = iu._uid
pbo.revision = iu._revision
pbo.category = iu._category
pbo.payload_type = iu._payload_type
pbo.owner_name = iu._owner_name
pbo.committed = iu._committed
pbo.read_only = iu._read_only
for k, v in iu._payload.iteritems():
entry = pbo.payload.add()
pack_payload_entry(entry, k, v, iu.payload_type)
for type_ in iu._links.keys():
linkset = pbo.links.add()
linkset.type = type_
linkset.targets.extend(iu._links[type_])
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca_pb2.IU()
pbo.ParseFromString(str(byte_stream))
_payload = {}
for entry in pbo.payload:
_payload[entry.key] = unpack_payload_entry(entry)
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
return self._remote_data_type(
uid=pbo.uid,
revision=pbo.revision,
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = 'str' if pbo.payload_type is 'MAP' else pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links)
# We assume that the only transfer types are 'STR' or 'JSON'. Both are transparently handled by json.loads
if entry.type == ipaaca.iu.IUPayloadType.JSON:
return json.loads(entry.value)
elif entry.type == ipaaca.iu.IUPayloadType.STR or entry.type == 'str':
return entry.value
else:
LOGGER.warn('Received payload entry with unsupported type "' + entry.type + '".')
return entry.value
class IUConverter(ConverterBase):
'''
Converter class for full IU representations
wire:bytearray <-> wire-schema:ipaaca-iu <-> class ipaaca.iu.IU
'''
def __init__(self, wireSchema="ipaaca-iu", dataType=None): #ipaaca.iu.IU):
super(IUConverter, self).__init__(bytearray, ipaaca.iu.IU if dataType is None else dataType, wireSchema)
self._access_mode = ipaaca.ipaaca_pb2.IU.PUSH
self._remote_data_type = ipaaca.iu.RemotePushIU
def serialize(self, iu):
pbo = ipaaca.ipaaca_pb2.IU()
pbo.access_mode = self._access_mode
pbo.uid = iu._uid
pbo.revision = iu._revision
pbo.category = iu._category
pbo.payload_type = iu._payload_type
pbo.owner_name = iu._owner_name
pbo.committed = iu._committed
pbo.read_only = iu._read_only
for k, v in iu._payload.items():
entry = pbo.payload.add()
pack_payload_entry(entry, k, v, iu.payload_type)
for type_ in iu._links.keys():
linkset = pbo.links.add()
linkset.type = type_
linkset.targets.extend(iu._links[type_])
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IU()
pbo.ParseFromString(byte_stream)
_payload = {}
for entry in pbo.payload:
_payload[entry.key] = unpack_payload_entry(entry)
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
return self._remote_data_type(
uid=pbo.uid,
revision=pbo.revision,
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = 'str' if pbo.payload_type == 'MAP' else pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links)
class MessageConverter(IUConverter):
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-messageiu", dataType=ipaaca.iu.Message):
super(MessageConverter, self).__init__(wireSchema, dataType)
self._access_mode = ipaaca_pb2.IU.MESSAGE
self._remote_data_type = ipaaca.iu.RemoteMessage
'''
Converter class for Message IU representations
wire:bytearray <-> wire-schema:ipaaca-messageiu <-> class ipaaca.iu.Message
'''
def __init__(self, wireSchema="ipaaca-messageiu", dataType=None): #ipaaca.iu.Message):
super(MessageConverter, self).__init__(wireSchema, ipaaca.iu.Message)
self._access_mode = ipaaca.ipaaca_pb2.IU.MESSAGE
self._remote_data_type = ipaaca.iu.RemoteMessage
class IULinkUpdate(object):
def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None):
super(IULinkUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.writer_name = writer_name
self.is_delta = is_delta
self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links)
self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)
def __str__(self):
s = 'LinkUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_links = '+str(self.new_links)+', '
s += 'links_to_remove = '+str(self.links_to_remove)+')'
return s
class IULinkUpdateConverter(rsb.converter.Converter):
def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate):
super(IULinkUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu_link_update):
pbo = ipaaca_pb2.IULinkUpdate()
pbo.uid = iu_link_update.uid
pbo.writer_name = iu_link_update.writer_name
pbo.revision = iu_link_update.revision
for type_ in iu_link_update.new_links.keys():
linkset = pbo.new_links.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.new_links[type_])
for type_ in iu_link_update.links_to_remove.keys():
linkset = pbo.links_to_remove.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.links_to_remove[type_])
pbo.is_delta = iu_link_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IULinkUpdate:
pbo = ipaaca_pb2.IULinkUpdate()
pbo.ParseFromString( str(byte_stream) )
LOGGER.debug('received an IULinkUpdate for revision '+str(pbo.revision))
iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_links:
iu_link_up.new_links[str(entry.type)] = set(entry.targets)
for entry in pbo.links_to_remove:
iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets)
return iu_link_up
else:
raise ValueError("Inacceptable dataType %s" % type)
def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None, request_uid=None, request_endpoint=None):
super(IULinkUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.writer_name = writer_name
self.is_delta = is_delta
self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links)
self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)
self.request_uid = request_uid
self.request_endpoint = request_endpoint
def __str__(self):
s = 'LinkUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_links = '+str(self.new_links)+', '
s += 'links_to_remove = '+str(self.links_to_remove)+')'
return s
class IULinkUpdateConverter(ConverterBase):
def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=None): #=IULinkUpdate):
super(IULinkUpdateConverter, self).__init__(bytearray, IULinkUpdate, wireSchema)
def serialize(self, iu_link_update):
pbo = ipaaca.ipaaca_pb2.IULinkUpdate()
pbo.uid = iu_link_update.uid
pbo.writer_name = iu_link_update.writer_name
pbo.revision = iu_link_update.revision
if iu_link_update.request_uid:
pbo.request_uid = iu_link_update.request_uid
if iu_link_update.request_endpoint:
pbo.request_endpoint = iu_link_update.request_endpoint
for type_ in iu_link_update.new_links.keys():
linkset = pbo.new_links.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.new_links[type_])
for type_ in iu_link_update.links_to_remove.keys():
linkset = pbo.links_to_remove.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.links_to_remove[type_])
pbo.is_delta = iu_link_update.is_delta
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IULinkUpdate()
pbo.ParseFromString(byte_stream)
LOGGER.debug('received an IULinkUpdate for revision '+str(pbo.revision))
iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta, request_uid=pbo.request_uid, request_endpoint=pbo.request_endpoint)
for entry in pbo.new_links:
iu_link_up.new_links[str(entry.type)] = set(entry.targets)
for entry in pbo.links_to_remove:
iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets)
return iu_link_up
class IUPayloadUpdate(object):
def __init__(self, uid, revision, is_delta, payload_type, writer_name="undef", new_items=None, keys_to_remove=None):
super(IUPayloadUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.payload_type = payload_type
self.writer_name = writer_name
self.is_delta = is_delta
self.new_items = {} if new_items is None else new_items
self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove
def __str__(self):
s = 'PayloadUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'payload_type='+str(self.payload_type)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_items = '+str(self.new_items)+', '
s += 'keys_to_remove = '+str(self.keys_to_remove)+')'
return s
class IUPayloadUpdateConverter(rsb.converter.Converter):
def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate):
super(IUPayloadUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu_payload_update):
pbo = ipaaca_pb2.IUPayloadUpdate()
pbo.uid = iu_payload_update.uid
pbo.writer_name = iu_payload_update.writer_name
pbo.revision = iu_payload_update.revision
for k, v in iu_payload_update.new_items.items():
entry = pbo.new_items.add()
pack_payload_entry(entry, k, v, iu_payload_update.payload_type)
pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
pbo.is_delta = iu_payload_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IUPayloadUpdate:
pbo = ipaaca_pb2.IUPayloadUpdate()
pbo.ParseFromString( str(byte_stream) )
LOGGER.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, payload_type=None, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_items:
iu_up.new_items[entry.key] = unpack_payload_entry(entry)
iu_up.keys_to_remove = pbo.keys_to_remove[:]
return iu_up
else:
raise ValueError("Inacceptable dataType %s" % type)
def __init__(self, uid, revision, is_delta, payload_type, writer_name="undef", new_items=None, keys_to_remove=None, request_uid=None, request_endpoint=None):
super(IUPayloadUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.payload_type = payload_type
self.writer_name = writer_name
self.is_delta = is_delta
self.new_items = {} if new_items is None else new_items
self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove
self.request_uid = request_uid
self.request_endpoint = request_endpoint
def __str__(self):
s = 'PayloadUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'payload_type='+str(self.payload_type)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_items = '+str(self.new_items)+', '
s += 'keys_to_remove = '+str(self.keys_to_remove)+')'
return s
class IUPayloadUpdateConverter(ConverterBase):
def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=None):
super(IUPayloadUpdateConverter, self).__init__(bytearray, IUPayloadUpdate, wireSchema)
def serialize(self, iu_payload_update):
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdate()
pbo.uid = iu_payload_update.uid
pbo.writer_name = iu_payload_update.writer_name
pbo.revision = iu_payload_update.revision
if iu_payload_update.request_uid:
pbo.request_uid = iu_payload_update.request_uid
if iu_payload_update.request_endpoint:
pbo.request_endpoint = iu_payload_update.request_endpoint
for k, v in iu_payload_update.new_items.items():
entry = pbo.new_items.add()
pack_payload_entry(entry, k, v, iu_payload_update.payload_type)
pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
pbo.is_delta = iu_payload_update.is_delta
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdate()
pbo.ParseFromString(byte_stream)
LOGGER.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, payload_type=None, writer_name=pbo.writer_name, is_delta=pbo.is_delta, request_uid=pbo.request_uid, request_endpoint=pbo.request_endpoint)
for entry in pbo.new_items:
iu_up.new_items[entry.key] = unpack_payload_entry(entry)
iu_up.keys_to_remove = pbo.keys_to_remove[:]
return iu_up
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -33,6 +33,15 @@
IPAACA_DEFAULT_CHANNEL = 'default'
IPAACA_LOGGER_NAME = 'ipaaca'
IPAACA_DEFAULT_LOGGING_LEVEL = 'WARNING'
IPAACA_DEFAULT_IU_PAYLOAD_TYPE = 'JSON' # one of ipaaca.iu.IUPayloadType
IPAACA_DEFAULT_RSB_HOST = None
IPAACA_DEFAULT_RSB_PORT = None
IPAACA_DEFAULT_RSB_TRANSPORT = None
IPAACA_DEFAULT_RSB_SOCKET_SERVER = None
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -48,6 +48,35 @@ __all__ = [
class IpaacaError(Exception): pass
class BackendInitializationError(IpaacaError):
"""Error indicating that type marshalling cannot proceed
because no matching converter is known."""
def __init__(self, name=''):
super(BackendInitializationError, self).__init__( \
'Failed to initialize selected backend '+str(name))
class BackendSerializationError(IpaacaError):
"""Error indicating that type marshalling cannot proceed
because no matching converter is known."""
def __init__(self, typ):
super(BackendSerializationError, self).__init__( \
'Could not serialize type ' + str(typ.__name__) \
+ ' - no converter registered.')
class BackendDeserializationError(IpaacaError):
"""Error indicating that type unmarshalling cannot proceed
because no matching converter is known."""
def __init__(self, wire_schema):
super(BackendDeserializationError, self).__init__( \
'Could not deserialize wire format "' + str(wire_schema) \
+ '" - no converter registered.')
class ConverterRegistrationError(IpaacaError):
'''Error indicating that a type or wire schema already had a registered converter.'''
def __init__(self, type_name_or_schema):
super(ConverterRegistrationError, self).__init__(
'Failed to register a converter: we already have one for ' \
+ str(type_name_or_schema))
class IUCommittedError(IpaacaError):
"""Error indicating that an IU is immutable because it has been committed to."""
......@@ -82,13 +111,26 @@ class IUPublishedError(IpaacaError):
class IUReadOnlyError(IpaacaError):
"""Error indicating that an IU is immutable because it is 'read only'."""
def __init__(self, iu):
super(IUReadOnlyError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it is read-only.')
super(IUReadOnlyError, self).__init__(
'Writing to IU ' + str(iu.uid) + ' failed -- it is read-only.')
class IUResendRequestFailedError(IpaacaError):
"""Error indicating that a remote IU resend failed."""
def __init__(self, iu_uid):
super(IUResendRequestFailedError, self).__init__(
'Remote resend failed for IU ' + str(iu_uid))
class IUResendRequestRemoteServerUnknownError(IpaacaError):
"""Error indicating that a remote IU resend failed."""
def __init__(self, iu_uid):
super(IUResendRequestRemoteServerUnknownError, self).__init__(
'Remote resend request: remote server unknown for IU ' + str(iu_uid))
class IURetractedError(IpaacaError):
"""Error indicating that an IU has been retracted."""
def __init__(self, iu):
super(IUResendFailedError, self).__init__('Remote resend failed for IU ' + str(iu.uid) + '.')
super(IURetractedError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it has been retracted.')
class IUUpdateFailedError(IpaacaError):
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -37,6 +37,9 @@ import copy
import threading
import uuid
import six
import ipaaca.ipaaca_pb2
import ipaaca.converter
import ipaaca.exception
import ipaaca.misc
......@@ -103,18 +106,18 @@ class IUInterface(object):
self._links = collections.defaultdict(set)
def __str__(self):
s = unicode(self.__class__)+"{ "
s = str(self.__class__)+"{ "
s += "category="+("<None>" if self._category is None else self._category)+" "
s += "uid="+self._uid+" "
s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") "
s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " "
s += "payload={ "
for k,v in self.payload.items():
s += k+":'"+unicode(v)+"', "
s += k+":'"+str(v)+"', "
s += "} "
s += "links={ "
for t,ids in self.get_all_links().items():
s += t+":'"+unicode(ids)+"', "
for t, ids in self.get_all_links().items():
s += t+":'"+str(ids)+"', "
s += "} "
s += "}"
return s
......@@ -134,13 +137,13 @@ class IUInterface(object):
def add_links(self, type, targets, writer_name=None):
'''Attempt to add links if the conditions are met
and send an update message. Then call the local setter.'''
if not hasattr(targets, '__iter__'): targets=[targets]
if isinstance(targets, six.string_types) or not hasattr(targets, '__iter__'): targets=[targets]
self._modify_links(is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
self._add_and_remove_links( add={type:targets}, remove={} )
def remove_links(self, type, targets, writer_name=None):
'''Attempt to remove links if the conditions are met
and send an update message. Then call the local setter.'''
if not hasattr(targets, '__iter__'): targets=[targets]
if isinstance(targets, six.string_types) or not hasattr(targets, '__iter__'): targets=[targets]
self._modify_links(is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
self._add_and_remove_links( add={}, remove={type:targets} )
def modify_links(self, add, remove, writer_name=None):
......@@ -172,9 +175,9 @@ class IUInterface(object):
if self._buffer is None:
self._payload_type = type
else:
raise IpaacaException('The IU is already in a buffer, cannot change payload type anymore.')
raise ipaaca.exception.IpaacaError('The IU is already in a buffer, cannot change payload type anymore.')
payload_type = property(
fget=_get_payload_type,
fget=_get_payload_type,
fset=_set_payload_type,
doc='Type of the IU payload')
......@@ -208,7 +211,7 @@ class IUInterface(object):
return self._buffer
def _set_buffer(self, buffer):
if self._buffer is not None:
raise IpaacaException('The IU is already in a buffer, cannot move it.')
raise ipaaca.exception.IpaacaError('The IU is already in a buffer, cannot move it.')
self._buffer = buffer
buffer = property(
fget=_get_buffer,
......@@ -219,7 +222,7 @@ class IUInterface(object):
return self._owner_name
def _set_owner_name(self, owner_name):
if self._owner_name is not None:
raise Exception('The IU already has an owner name, cannot change it.')
raise ipaaca.exception.IpaacaError('The IU already has an owner name, cannot change it.')
self._owner_name = owner_name
owner_name = property(
fget=_get_owner_name,
......@@ -240,7 +243,9 @@ class IU(IUInterface):
self._payload = ipaaca.payload.Payload(iu=self)
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
if self.committed:
if self._retracted:
raise ipaaca.exception.IURetractedError(self)
if self._committed:
raise ipaaca.exception.IUCommittedError(self)
with self.revision_lock:
# modify links locally
......@@ -257,7 +262,9 @@ class IU(IUInterface):
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
"""Modify the payload: add or remove items from this payload locally and send update."""
if self.committed:
if self._retracted:
raise ipaaca.exception.IURetractedError(self)
if self._committed:
raise ipaaca.exception.IUCommittedError(self)
with self.revision_lock:
# set item locally
......@@ -278,23 +285,33 @@ class IU(IUInterface):
self._revision += 1
def _internal_commit(self, writer_name=None):
if self.committed:
raise ipaaca.exception.IUCommittedError(self)
if self._committed:
return
if self._retracted:
raise ipaaca.exception.IURetractedError(self)
with self.revision_lock:
if not self._committed:
self._increase_revision_number()
self._committed = True
if self.buffer is not None:
self.buffer._send_iu_commission(self, writer_name=writer_name)
self._increase_revision_number()
self._committed = True
if self.buffer is not None:
self.buffer._send_iu_commission(self, writer_name=writer_name)
def commit(self):
"""Commit to this IU."""
return self._internal_commit()
def retract(self):
if self._buffer:
self._buffer.remove(self)
self._buffer = None
else:
self._retracted = True
def _get_payload(self):
return self._payload
def _set_payload(self, new_pl, writer_name=None):
if self.committed:
if self._retracted:
raise ipaaca.exception.IURetractedError(self)
if self._committed:
raise ipaaca.exception.IUCommittedError(self)
with self.revision_lock:
self._increase_revision_number()
......@@ -315,7 +332,7 @@ class IU(IUInterface):
def _set_buffer(self, buffer):
if self._buffer is not None:
raise Exception('The IU is already in a buffer, cannot move it.')
raise ipaaca.exception.IpaacaError('The IU is already in a buffer, cannot move it.')
self._buffer = buffer
self.owner_name = buffer.unique_name
self._payload.owner_name = buffer.unique_name
......@@ -363,7 +380,9 @@ class Message(IU):
if self.is_published:
LOGGER.info('Info: modifying a Message after sending has no global effects')
else:
if self.committed:
if self._retracted:
raise ipaaca.exception.IURetractedError(self)
if self._committed:
raise ipaaca.exception.IUCommittedError(self)
with self.revision_lock:
self._increase_revision_number()
......@@ -384,7 +403,7 @@ class Message(IU):
def _set_buffer(self, buffer):
if self._buffer is not None:
raise Exception('The IU is already in a buffer, cannot move it.')
raise ipaaca.exception.IpaacaError('The IU is already in a buffer, cannot move it.')
self._buffer = buffer
self.owner_name = buffer.unique_name
self._payload.owner_name = buffer.unique_name
......@@ -413,7 +432,6 @@ class RemoteMessage(IUInterface):
self._category = category
self.owner_name = owner_name
self._committed = committed
self._retracted = False
# NOTE Since the payload is an already-existent Payload which we didn't modify ourselves,
# don't try to invoke any modification checks or network updates ourselves either.
# We are just receiving it here and applying the new data.
......@@ -480,7 +498,6 @@ class RemotePushIU(IUInterface):
self._category = category
self.owner_name = owner_name
self._committed = committed
self._retracted = False
# NOTE Since the payload is an already-existent Payload which we didn't modify ourselves,
# don't try to invoke any modification checks or network updates ourselves either.
# We are just receiving it here and applying the new data.
......@@ -489,9 +506,11 @@ class RemotePushIU(IUInterface):
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
"""Modify the links: add or remove item from this payload remotely and send update."""
if self.committed:
if self._retracted:
raise ipaaca.exception.IURetractedError(self)
if self._committed:
raise ipaaca.exception.IUCommittedError(self)
if self.read_only:
if self._read_only:
raise ipaaca.exception.IUReadOnlyError(self)
requested_update = ipaaca.converter.IULinkUpdate(
uid=self.uid,
......@@ -509,9 +528,11 @@ class RemotePushIU(IUInterface):
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
"""Modify the payload: add or remove item from this payload remotely and send update."""
if self.committed:
if self._retracted:
raise ipaaca.exception.IURetractedError(self)
if self._committed:
raise ipaaca.exception.IUCommittedError(self)
if self.read_only:
if self._read_only:
raise ipaaca.exception.IUReadOnlyError(self)
requested_update = ipaaca.converter.IUPayloadUpdate(
uid=self.uid,
......@@ -530,30 +551,32 @@ class RemotePushIU(IUInterface):
def commit(self):
"""Commit to this IU."""
if self.read_only:
raise ipaaca.exception.IUReadOnlyError(self)
if self._committed:
# ignore commit requests when already committed
return
if self._retracted:
raise ipaaca.exception.IURetractedError(self)
if self._read_only:
raise ipaaca.exception.IUReadOnlyError(self)
commission_request = ipaaca.ipaaca_pb2.IUCommission()
commission_request.uid = self.uid
commission_request.revision = self.revision
commission_request.writer_name = self.buffer.unique_name
remote_server = self.buffer._get_remote_server(self)
new_revision = remote_server.commit(commission_request)
if new_revision == 0:
raise ipaaca.exception.IUUpdateFailedError(self)
else:
commission_request = ipaaca_pb2.IUCommission()
commission_request.uid = self.uid
commission_request.revision = self.revision
commission_request.writer_name = self.buffer.unique_name
remote_server = self.buffer._get_remote_server(self)
new_revision = remote_server.commit(commission_request)
if new_revision == 0:
raise ipaaca.exception.IUUpdateFailedError(self)
else:
self._revision = new_revision
self._committed = True
self._revision = new_revision
self._committed = True
def _get_payload(self):
return self._payload
def _set_payload(self, new_pl):
if self.committed:
if self._retracted:
raise ipaaca.exception.IURetractedError(self)
if self._committed:
raise ipaaca.exception.IUCommittedError(self)
if self.read_only:
if self._read_only:
raise ipaaca.exception.IUReadOnlyError(self)
requested_update = ipaaca.converter.IUPayloadUpdate(
uid=self.uid,
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -35,10 +35,10 @@ from __future__ import division, print_function
import argparse
import logging
import sys
import ipaaca.defaults
__all__ = [
'enum',
'IpaacaArgumentParser',
......@@ -54,6 +54,7 @@ def enum(*sequential, **named):
"""
enums = dict(zip(sequential, range(len(sequential))), **named)
enums['_choices'] = enums.keys()
enums['_values'] = enums.values() # RY e.g. see if raw int is valid
return type('Enum', (object,), enums)
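# Illustrative example (names made up): the helper builds a simple enumeration
# type; '_choices' lists the member names and the new '_values' entry allows
# checking whether a raw int is a valid member:
#
#     Color = enum('RED', 'GREEN', BLUE=5)
#     Color.RED == 0              # positional members count up from 0
#     'GREEN' in Color._choices   # True: valid member name
#     5 in Color._values          # True: valid raw value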
......@@ -69,6 +70,21 @@ class IpaacaLoggingHandler(logging.Handler):
msg = str(record.msg.format(record.args))
print(meta + msg)
class RSBLoggingHandler(logging.Handler):
'''A logging handler that prints to stdout, RSB version.'''
def __init__(self, prefix='IPAACA', level=logging.NOTSET):
logging.Handler.__init__(self, level)
self._prefix = prefix
def emit(self, record):
meta = '[%s: %s] ' % (self._prefix, str(record.levelname))
try:
msg = str(record.msg % record.args)
except:
msg = str(record.msg) + ' WITH ARGS: ' + str(record.args)
print(meta + msg)
class GenericNoLoggingHandler(logging.Handler):
'''A logging handler that produces no output'''
......@@ -80,21 +96,21 @@ def get_library_logger():
return logging.getLogger(ipaaca.defaults.IPAACA_LOGGER_NAME)
__IPAACA_LOGGING_HANDLER = IpaacaLoggingHandler('IPAACA')
__GENERIC_NO_LOG_HANDLER = GenericNoLoggingHandler()
_IPAACA_LOGGING_HANDLER = IpaacaLoggingHandler('IPAACA')
_GENERIC_NO_LOG_HANDLER = GenericNoLoggingHandler()
# By default, suppress library logging
# - for IPAACA
get_library_logger().addHandler(__GENERIC_NO_LOG_HANDLER)
get_library_logger().addHandler(_GENERIC_NO_LOG_HANDLER)
# - for RSB
logging.getLogger('rsb').addHandler(__GENERIC_NO_LOG_HANDLER)
logging.getLogger('rsb').addHandler(_GENERIC_NO_LOG_HANDLER)
def enable_logging(level=None):
'''Enable ipaaca's library-wide logging.'''
ipaaca_logger = get_library_logger()
ipaaca_logger.addHandler(__IPAACA_LOGGING_HANDLER)
ipaaca_logger.removeHandler(__GENERIC_NO_LOG_HANDLER)
ipaaca_logger.addHandler(_IPAACA_LOGGING_HANDLER)
ipaaca_logger.removeHandler(_GENERIC_NO_LOG_HANDLER)
ipaaca_logger.setLevel(level=level if level is not None else
ipaaca.defaults.IPAACA_DEFAULT_LOGGING_LEVEL)
......@@ -120,10 +136,30 @@ class IpaacaArgumentParser(argparse.ArgumentParser):
def __call__(self, parser, namespace, values, option_string=None):
rsb_logger = logging.getLogger('rsb')
rsb_logger.addHandler(IpaacaLoggingHandler('RSB'))
rsb_logger.removeHandler(__GENERIC_NO_LOG_HANDLER)
rsb_logger.addHandler(RSBLoggingHandler('RSB'))
rsb_logger.removeHandler(_GENERIC_NO_LOG_HANDLER)
rsb_logger.setLevel(level=values)
class IpaacaRSBHost(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST = values
class IpaacaRSBPort(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT = values
class IpaacaRSBTransport(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT = values
class IpaacaRSBSocketServer(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER = values
def __init__(self, prog=None, usage=None, description=None, epilog=None,
parents=[], formatter_class=argparse.HelpFormatter,
prefix_chars='-', fromfile_prefix_chars=None,
......@@ -136,6 +172,7 @@ class IpaacaArgumentParser(argparse.ArgumentParser):
conflict_handler=conflict_handler, add_help=add_help)
def _add_ipaaca_lib_arguments(self):
# CMD-arguments for ipaaca
ipaacalib_group = self.add_argument_group('IPAACA library arguments')
ipaacalib_group.add_argument(
'--ipaaca-payload-type',
......@@ -157,6 +194,7 @@ class IpaacaArgumentParser(argparse.ArgumentParser):
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'],
dest='_ipaaca_logging_level_',
help="enable IPAACA logging with threshold")
# CMD-arguments for rsb
rsblib_group = self.add_argument_group('RSB library arguments')
rsblib_group.add_argument(
'--rsb-enable-logging',
......@@ -164,6 +202,36 @@ class IpaacaArgumentParser(argparse.ArgumentParser):
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'],
dest='_ipaaca_rsb_enable_logging_',
help="enable RSB logging with threshold")
rsblib_group.add_argument(
'--rsb-host',
action=self.IpaacaRSBHost,
default=None,
dest='_ipaaca_rsb_set_host_',
metavar='HOST',
help="set RSB host")
rsblib_group.add_argument(
'--rsb-port',
action=self.IpaacaRSBPort,
default=None,
dest='_ipaaca_rsb_set_port_',
metavar='PORT',
help="set RSB port")
rsblib_group.add_argument(
'--rsb-transport',
action=self.IpaacaRSBTransport,
default=None,
dest='_ipaaca_rsb_set_transport_',
choices=['spread', 'socket'],
metavar='TRANSPORT',
help="set RSB transport")
rsblib_group.add_argument(
'--rsb-socket-server',
action=self.IpaacaRSBSocketServer,
default=None,
dest='_ipaaca_rsb_set_socket_server_',
choices=['0', '1', 'auto'],
metavar='SERVER',
help="act as server (only when --rsb-transport=socket)")
def parse_args(self, args=None, namespace=None):
self._add_ipaaca_lib_arguments() # Add ipaaca-args just before parsing
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -30,7 +30,7 @@
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import threading
import time
......@@ -53,12 +53,12 @@ class Payload(dict):
def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False, update_timeout=_DEFAULT_PAYLOAD_UPDATE_TIMEOUT):
self.iu = iu
_pl = {}
for k, v in ({} if new_payload is None else new_payload).iteritems():
_pl[unicode(k, 'utf8') if type(k) == str else k] = unicode(v, 'utf8') if type(v) == str else v
for k, v in ({} if new_payload is None else new_payload).items():
_pl[str(k) if type(k) == str else k] = str(v) if type(v) == str else v
# NOTE omit_init_update_message is necessary to prevent checking for
# exceptions and sending updates in the case where we just receive
# a whole new payload from the remote side and overwrite it locally.
for k, v in _pl.iteritems():
for k, v in _pl.items():
dict.__setitem__(self, k, v)
if (not omit_init_update_message) and (self.iu.buffer is not None):
self.iu._modify_payload(
......@@ -85,8 +85,8 @@ class Payload(dict):
def __setitem__(self, k, v, writer_name=None):
with self._batch_update_lock:
k = unicode(k, 'utf8') if type(k) == str else k
v = unicode(v, 'utf8') if type(v) == str else v
k = str(k) if type(k) == str else k
v = str(v) if type(v) == str else v
if self._update_on_every_change:
self.iu._modify_payload(
is_delta=True,
......@@ -96,11 +96,13 @@ class Payload(dict):
else: # Collect additions/modifications
self._batch_update_writer_name = writer_name
self._collected_modifications[k] = v
# revoke deletion of item since a new version has been added
self._collected_removals = [i for i in self._collected_removals if i!=k]
return dict.__setitem__(self, k, v)
def __delitem__(self, k, writer_name=None):
with self._batch_update_lock:
k = unicode(k, 'utf8') if type(k) == str else k
k = str(k) if type(k) == str else k
if self._update_on_every_change:
self.iu._modify_payload(
is_delta=True,
......@@ -110,6 +112,8 @@ class Payload(dict):
else: # Collect additions/modifications
self._batch_update_writer_name = writer_name
self._collected_removals.append(k)
# revoke older update of item since more recent changes take precedence
if k in self._collected_modifications: del self._collected_modifications[k]
return dict.__delitem__(self, k)
# Context-manager based batch updates, not thread-safe (on remote updates)
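# Illustrative sketch (hypothetical IU and keys, assuming the Payload
# __enter__/__exit__ referred to by the comment above): several changes can be
# collected and sent as one remote update; a later write to a key revokes an
# earlier queued deletion of that key, and vice versa:
#
#     with iu.payload:                  # switch to batch mode
#         iu.payload['word'] = 'hello'
#         del iu.payload['obsolete_key']
#         iu.payload['count'] = '2'
#     # leaving the block sends a single combined payload update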
......@@ -132,9 +136,9 @@ class Payload(dict):
def merge(self, payload, writer_name=None):
with self._batch_update_lock:
for k, v in payload.iteritems():
k = unicode(k, 'utf8') if type(k) == str else k
v = unicode(v, 'utf8') if type(v) == str else v
for k, v in payload.items():
k = str(k) if type(k) == str else k
v = str(v) if type(v) == str else v
self.iu._modify_payload(
is_delta=True,
new_items=payload,
......@@ -144,11 +148,11 @@ class Payload(dict):
def _remotely_enforced_setitem(self, k, v):
"""Sets an item when requested remotely."""
return dict.__setitem__(self, k, v)
dict.__setitem__(self, k, v)
def _remotely_enforced_delitem(self, k):
"""Deletes an item when requested remotely."""
return dict.__delitem__(self, k)
if k in self: dict.__delitem__(self, k)
def _wait_batch_update_lock(self, timeout):
# wait lock with time-out http://stackoverflow.com/a/8393033
......@@ -216,18 +220,22 @@ class PayloadItemDictProxy(PayloadItemProxy, dict):
value = self.content.get(key, default)
return self._create_proxy(value, key)
def items(self):
return [(key, value) for key, value in self.iteritems()]
# py3port: were these used at all?
# def items(self):
# return [(key, value) for key, value in self.items()]
def iteritems(self):
for key, value in self.content.iteritems():
# py3port: was iteritems
def items(self):
for key, value in self.content.items():
yield key, self._create_proxy(value, key)
def values(self):
return [value for value in self.itervalues()]
# py3port: were these used at all?
# def values(self):
# return [value for value in self.values()]
def itervalues(self):
for key, value in self.content.iteritems():
# py3port: was itervalues
def values(self):
for key, value in self.content.items():
yield self._create_proxy(value, key)
def pop(self, key, *args):
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -32,4 +32,4 @@
from __future__ import division, print_function
from notifier import ComponentNotifier
from .notifier import ComponentNotifier
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import datetime
import subprocess
import sys
import threading
import time
import traceback
import uuid
import ipaaca
import ipaaca.misc
import six
__all__ = [
'logger_send_ipaaca_logs',
'logger_set_log_filename',
'logger_set_module_name',
'logger_set_log_level',
'LOG_DEBUG',
'LOG_INFO',
'LOG_WARN',
'LOG_WARNING',
'LOG_ERROR',
]
LogLevel = ipaaca.misc.enum(
DEBUG = 0,
INFO = 1,
WARN = 2,
ERROR = 3,
SILENT = 4,
)
LOG_LEVEL_FROM_STRING_DICT = {
'DEBUG': LogLevel.DEBUG,
'INFO': LogLevel.INFO,
'WARN': LogLevel.WARN,
'WARNING': LogLevel.WARN,
'ERROR': LogLevel.ERROR,
'NONE': LogLevel.SILENT,
'SILENT': LogLevel.SILENT,
}
CURRENT_LOG_LEVEL = LogLevel.DEBUG
LOGGER_LOCK = threading.RLock()
MODULE_NAME = sys.argv[0]
SEND_IPAACA_LOGS = True
OUTPUT_BUFFER = None
STANDARD_LOG_FILE_EXTENSION = '.log'
LOG_MODES = ['append', 'timestamp', 'overwrite']
def logger_set_log_filename(filename, existing=None):
global OUTPUT_BUFFER
if existing is not None and existing not in LOG_MODES:
raise Exception('Invalid log mode {mode} given. '
'Valid options are {options}.'.format(
mode=existing,
options=', '.join(LOG_MODES)))
with LOGGER_LOCK:
if OUTPUT_BUFFER is None:
OUTPUT_BUFFER = ipaaca.OutputBuffer('LogSender')
msg = ipaaca.Message('logcontrol')
msg.payload = {
'cmd': 'open_log_file',
'filename': filename}
if existing is not None:
msg.payload['existing'] = existing
OUTPUT_BUFFER.add(msg)
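# Illustrative usage sketch (file name made up): ask a running LoggerComponent
# to switch its output file, timestamping instead of appending to an existing one:
#
#     logger_set_log_filename('/tmp/my_dialog.log', existing='timestamp')
#
# This publishes a 'logcontrol' message; see LoggerComponent below for the
# receiving side.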
def logger_set_module_name(name):
global MODULE_NAME
with LOGGER_LOCK:
MODULE_NAME = name
def logger_send_ipaaca_logs(flag=True):
global SEND_IPAACA_LOGS
with LOGGER_LOCK:
SEND_IPAACA_LOGS = flag
def logger_set_log_level(level=LogLevel.DEBUG):
global CURRENT_LOG_LEVEL
with LOGGER_LOCK:
if level in LogLevel._values:
CURRENT_LOG_LEVEL = level
elif isinstance(level, six.string_types) and level.upper() in LOG_LEVEL_FROM_STRING_DICT:
CURRENT_LOG_LEVEL = LOG_LEVEL_FROM_STRING_DICT[level.upper()]
else:
pass # leave previous setting untouched
def LOG_IPAACA(lvl, text, now=0.0, fn='???', thread='???'):
global OUTPUT_BUFFER
uid = str(uuid.uuid4())[0:8]
with LOGGER_LOCK:
if OUTPUT_BUFFER is None:
OUTPUT_BUFFER = ipaaca.OutputBuffer('LogSender')
msg = ipaaca.Message('log')
msg.payload = {
'module': MODULE_NAME,
'function': fn,
'level': lvl,
'time': '%.3f' % now,
'thread': thread,
'uuid': uid,
'text': text,}
try:
OUTPUT_BUFFER.add(msg)
except Exception as e:
LOG_ERROR('Caught an exception while logging via ipaaca. '
+ str(e) + '; '
+ traceback.format_exc())
def LOG_CONSOLE(lvlstr, msg, fn_markup='', msg_markup='', now=0.0, fn='???', thread='???'):
if isinstance(msg, six.string_types):
lines = msg.split('\n')
else:
lines = [msg]
for line in lines:
text = lvlstr+' '+thread+' '+fn_markup+fn+'\033[m'+' '+msg_markup+str(line)+'\033[m'
print(text)
fn = ' '*len(fn)
def LOG_ERROR(msg, now=None):
if CURRENT_LOG_LEVEL > LogLevel.ERROR: return
now = time.time() if now is None else now
f = sys._getframe(1)
classprefix = (f.f_locals['self'].__class__.__name__+'.') if 'self' in f.f_locals else ''
fn = classprefix + f.f_code.co_name
thread = threading.current_thread().getName()
with LOGGER_LOCK:
if SEND_IPAACA_LOGS: LOG_IPAACA('ERROR', msg, now=now, fn=fn, thread=thread)
LOG_CONSOLE('\033[38;5;9;1;4m[ERROR]\033[m', msg, fn_markup='\033[38;5;203m', msg_markup='\033[38;5;9;1;4m', now=now, fn=fn, thread=thread)
def LOG_WARN(msg, now=None):
if CURRENT_LOG_LEVEL > LogLevel.WARN: return
now = time.time() if now is None else now
f = sys._getframe(1)
classprefix = (f.f_locals['self'].__class__.__name__+'.') if 'self' in f.f_locals else ''
fn = classprefix + f.f_code.co_name
thread = threading.current_thread().getName()
with LOGGER_LOCK:
if SEND_IPAACA_LOGS: LOG_IPAACA('WARN', msg, now=now, fn=fn, thread=thread)
LOG_CONSOLE('\033[38;5;208;1m[WARN]\033[m ', msg, fn_markup='\033[38;5;214m', msg_markup='\033[38;5;208;1m', now=now, fn=fn, thread=thread)
LOG_WARNING = LOG_WARN
def LOG_INFO(msg, now=None):
if CURRENT_LOG_LEVEL > LogLevel.INFO: return
now = time.time() if now is None else now
f = sys._getframe(1)
classprefix = (f.f_locals['self'].__class__.__name__+'.') if 'self' in f.f_locals else ''
fn = classprefix + f.f_code.co_name
thread = threading.current_thread().getName()
with LOGGER_LOCK:
if SEND_IPAACA_LOGS: LOG_IPAACA('INFO', msg, now=now, fn=fn, thread=thread)
LOG_CONSOLE('[INFO] ', msg, now=now, fn=fn, thread=thread)
def LOG_DEBUG(msg, now=None):
if CURRENT_LOG_LEVEL > LogLevel.DEBUG: return
now = time.time() if now is None else now
f = sys._getframe(1)
classprefix = (f.f_locals['self'].__class__.__name__+'.') if 'self' in f.f_locals else ''
fn = classprefix + f.f_code.co_name
thread = threading.current_thread().getName()
with LOGGER_LOCK:
if SEND_IPAACA_LOGS: LOG_IPAACA('DEBUG', msg, now=now, fn=fn, thread=thread)
LOG_CONSOLE('\033[2m[DEBUG]\033[m', msg, fn_markup='\033[38;5;144m', msg_markup='\033[38;5;248m', now=now, fn=fn, thread=thread)
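# Illustrative usage sketch (module name made up): typical use of the logging
# helpers defined above from inside a component:
#
#     logger_set_module_name('my_component')
#     logger_set_log_level('INFO')
#     LOG_INFO('component started')
#     LOG_WARN('running without a configured log file')
#
# Each call prints to the console and, while SEND_IPAACA_LOGS is True, also
# publishes a 'log' message that a LoggerComponent can write to its log file.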
class LoggerComponent(object):
def __init__(self, filename, log_mode='append'):
self.ib = ipaaca.InputBuffer('Logger', ['log', 'logcontrol'])
self.ib.register_handler(self._logger_handle_iu_event)
self.logfile = None
self.log_mode = log_mode
self.open_logfile(filename)
if self.logfile is None:
print('Logging to console only ...')
print('Press Ctrl-C at any time to terminate the logger.')
def open_logfile(self, filename):
with LOGGER_LOCK:
if filename is None or filename.strip() == '':
print('No log file name specified, not opening one.')
self.close_logfile()
else:
new_logfile = None
try:
# create dir first
ret = subprocess.call(
'mkdir -p `dirname ' + filename + '`',
shell=True)
# proceed with dir+file
if self.log_mode == 'timestamp':
t = datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S')
if filename.endswith(STANDARD_LOG_FILE_EXTENSION):
# insert timestamp between filename and extension
# (only for standard extension)
filename = filename.replace(
STANDARD_LOG_FILE_EXTENSION,
'.' + t + STANDARD_LOG_FILE_EXTENSION)
else: # prepend timestamp
filename = t + '_' + filename
append_if_exist = not (self.log_mode == 'overwrite' or
self.log_mode == 'timestamp')
new_logfile = open(filename, 'a' if append_if_exist else 'w')
if self.logfile is not None:
text = u'Will now continue logging in log file ' + str(filename)
uid = str(uuid.uuid4())[0:8]
tim = time.time()
record = {
'uuid': uid,
'time': tim,
'level': u'INFO',
'text': text,
'module': u'logger',
'function': u'LoggerComponent.open_logfile',
'thread': '-',
'logreceivedtime': tim}
self.logfile.write(str(record)+'\n')
self.logfile.close()
self.logfile = new_logfile
print('Logging to console and {filename} ...'.format(filename=filename))
except Exception as e:
print('Failed to open logfile {filename} for writing! Keeping previous configuration'.format(filename=filename))
def close_logfile(self):
if self.logfile is not None:
text = u'Closing of log file requested.'
uid = str(uuid.uuid4())[0:8]
tim = str(time.time())
record = {
'uuid': uid,
'time': tim,
'level': u'INFO',
'text': text,
'module': u'logger',
'function': u'LoggerComponent.close_logfile',
'thread': u'-',
'logreceivedtime': tim}
self.logfile.write(str(record)+'\n')
self.logfile.close()
print('Closed current log file.')
self.logfile = None
def _logger_handle_iu_event(self, iu, event_type, own):
received_time = "%.3f" % time.time()
with LOGGER_LOCK:
try:
if iu.category == 'log':
pl = iu.payload
message = pl['text'] if 'text' in pl else '(No message.)'
uid = '????????' if 'uuid' not in pl else pl['uuid']
tim = '???' if 'time' not in pl else pl['time']
module = '???' if 'module' not in pl else pl['module']
function = '???' if 'function' not in pl else pl['function']
thread = '???' if 'thread' not in pl else pl['thread']
level = 'INFO' if 'level' not in pl else pl['level']
# dump to console
if level=='WARN':
level='WARNING'
if level not in ['DEBUG','INFO','WARNING','ERROR']:
level = 'INFO'
try:
print('%s %-8s {%s} {%s} {%s} %s'%(tim, ('['+level+']'), thread, module, function, message))
except:
print('Failed to format a string for printing!')
if self.logfile is not None:
try:
record = {
'uuid': uid,
'time': tim,
'level': level,
'text': message,
'module': module,
'function': function,
'thread': thread,
'logreceivedtime': received_time}
self.logfile.write(str(record) + '\n')
except:
print('Failed to write to logfile!')
elif iu.category == 'logcontrol':
cmd = iu.payload['cmd'] if 'cmd' in iu.payload else 'undef'
if cmd == 'open_log_file':
filename = iu.payload['filename'] if 'filename' in iu.payload else ''
if 'existing' in iu.payload:
log_mode_ = iu.payload['existing'].lower()
if log_mode_ not in LOG_MODES:
LOG_WARN(u'Value of "existing" should be "append", "timestamp", or "overwrite", continuing with mode {mode}'.format(mode=self.log_mode))
else:
self.log_mode = log_mode_
self.open_logfile(filename)
elif cmd == 'close_log_file':
self.close_logfile()
else:
LOG_WARN(u'Received unknown logcontrol command: '+str(cmd))
except Exception as e:
print('Exception while logging!') # TODO write to file as well?
print(u' '+str(traceback.format_exc()))
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -33,9 +33,9 @@
from __future__ import division, print_function
import os
import threading
import ipaaca.buffer
import ipaaca.iu
import ipaaca.misc
......@@ -64,11 +64,16 @@ class ComponentError(Exception):
class ComponentNotifier(object):
NOTIFY_CATEGORY = "componentNotify"
CONTROL_CATEGORY = "componentControl"
SEND_CATEGORIES = "send_categories"
RECEIVE_CATEGORIES = "recv_categories"
CMD = "cmd"
STATE = "state"
NAME = "name"
WHO = "who" # list of names (or empty)
FUNCTION = "function"
PID = "pid"
CMD_REPORT = "report"
def __init__(self, component_name, component_function, send_categories, receive_categories, out_buffer=None, in_buffer=None):
self.component_name = component_name
......@@ -116,14 +121,23 @@ class ComponentNotifier(object):
self.out_buffer.add(notify_iu)
def _handle_iu_event(self, iu, event_type, local):
if iu.payload[ComponentNotifier.NAME] == self.component_name:
return
with self.notification_handler_lock:
for h in self.notification_handlers:
h(iu, event_type, local)
if iu.payload[ComponentNotifier.STATE] == "new":
#print("submitting")
self._submit_notify(False)
if iu.category == ComponentNotifier.NOTIFY_CATEGORY:
if iu.payload[ComponentNotifier.NAME] == self.component_name:
return
with self.notification_handler_lock:
for h in self.notification_handlers:
h(iu, event_type, local)
if iu.payload[ComponentNotifier.STATE] == "new":
#print("submitting")
self._submit_notify(False)
elif iu.category == ComponentNotifier.CONTROL_CATEGORY:
cmd = iu.payload[ComponentNotifier.CMD]
if cmd=='report':
# Request to report (by component controller)
who = iu.payload[ComponentNotifier.WHO]
# If we are named specifically or it's a broadcast
if len(who)==0 or self.component_name in who:
self._submit_notify(False)
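# Illustrative sketch (buffer name made up): a component controller can request
# fresh componentNotify reports from everyone (empty 'who' list) or from
# selected components by name:
#
#     import ipaaca
#     ob = ipaaca.OutputBuffer('ComponentController')
#     msg = ipaaca.Message(ComponentNotifier.CONTROL_CATEGORY)  # 'componentControl'
#     msg.payload = {
#         ComponentNotifier.CMD: ComponentNotifier.CMD_REPORT,  # 'report'
#         ComponentNotifier.WHO: [],  # empty list = broadcast to all components
#     }
#     ob.add(msg)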
def add_notification_handler(self, handler):
with self.notification_handler_lock:
......@@ -155,12 +169,14 @@ class ComponentNotifier(object):
if (not self.initialized):
self.timesync_slave = ipaaca.util.timesync.TimesyncSlave(component_name=self.component_name, timing_handler=self.launch_timesync_slave_handlers)
self.timesync_master = ipaaca.util.timesync.TimesyncMaster(component_name=self.component_name, timing_handler=self.launch_timesync_master_handlers)
self.in_buffer.register_handler(self._handle_iu_event, ipaaca.iu.IUEventType.MESSAGE, ComponentNotifier.NOTIFY_CATEGORY)
self.in_buffer.register_handler(self._handle_iu_event, ipaaca.iu.IUEventType.MESSAGE, [ComponentNotifier.NOTIFY_CATEGORY, ComponentNotifier.CONTROL_CATEGORY])
self._submit_notify(True)
self.initialized = True
def __enter__(self):
self.initialize()
return self
def __exit__(self, t, v, tb):
self.terminate()
return self
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -32,6 +32,7 @@
from __future__ import division, print_function
import threading
import time
import ipaaca.buffer
......@@ -45,7 +46,7 @@ class TimesyncMaster(object):
# component name to report (None => use buffer name)
self.component_name = component_name if component_name is not None else self.ob.unique_name
#
self.ob.register_handler(self.handle_timesync_master)
#self.ob.register_handler(self.handle_timesync_master)
self.ib.register_handler(self.handle_timesync_master)
# master_t1 is identical for all slaves
self.master_t1 = None
......@@ -63,7 +64,7 @@ class TimesyncMaster(object):
self.timing_handler = timing_handler
def send_master_timesync(self):
iu = ipaaca.iu.IU('timesyncRequest')
iu = ipaaca.iu.Message('timesyncRequest')
self.master_t1 = self.get_time()
iu.payload = {
'stage':'0',
......@@ -71,47 +72,48 @@ class TimesyncMaster(object):
'master':self.component_name,
}
self.ob.add(iu)
print('Sent a stage 0 timesync as master '+self.component_name)
def handle_timesync_master(self, iu, event_type, own):
master = iu.payload['master']
if not own and master == self.component_name:
if self.component_name == master:
# reply to our own initial IU
slave = iu.payload['slave']
stage = iu.payload['stage']
if stage=='1':
print('Received stage 1 from slave '+slave)
# initial reply by slave
t1 = iu.payload['slave_t1']
self.slave_t1s[slave] = float(t1)
t2 = self.master_t2s[slave] = self.get_time()
iu.payload.merge({'master_t2': str(t2), 'stage':'2'})
latency1 = t2 - self.master_t1
self.latencies[slave] = latency1
#print('Latency of round-trip 1: %.3f' % latency1)
elif stage=='3':
print('Received stage 3 from slave '+slave)
# second reply by slave
t2 = iu.payload['slave_t2']
self.slave_t2s[slave] = float(t2)
t_final = self.get_time()
latency1 = self.latencies[slave]
latency2 = t_final - self.master_t2s[slave]
latency = self.latencies[slave] = (latency1+latency2)/2.0
offset1 = (self.slave_t1s[slave]-self.master_t1)-latency/2.0
offset2 = (self.slave_t2s[slave]-self.master_t2s[slave])-latency/2.0
offset = (offset1+offset2)/2.0
iu.payload.merge({'stage':'4', 'latency': str(latency), 'offset':str(offset)})
if self.timing_handler is None:
print('Determined timing of timesync slave '+slave)
print(' Avg round-trip latency: %.3f s'%latency)
print(' Offset of their clock: %.3f s'%offset)
if event_type == ipaaca.IUEventType.ADDED or event_type == ipaaca.IUEventType.UPDATED:
if self.component_name == master:
# reply to our own initial IU
slave = iu.payload['slave']
stage = iu.payload['stage']
if stage=='1':
# initial reply by slave
t1 = iu.payload['slave_t1']
self.slave_t1s[slave] = float(t1)
t2 = self.master_t2s[slave] = self.get_time()
iu.payload.merge({'master_t2': str(t2), 'stage':'2'})
latency1 = t2 - self.master_t1
#print('Before stage 1 for '+slave+': '+str(self.latencies))
self.latencies[slave] = latency1
#print('After stage 1 for '+slave+': '+str(self.latencies))
#print('Latency of round-trip 1: %.3f' % latency1)
elif stage=='3':
#print('At stage 3 for '+slave+': '+str(self.latencies))
# second reply by slave
t2 = iu.payload['slave_t2']
self.slave_t2s[slave] = float(t2)
t_final = self.get_time()
latency1 = self.latencies[slave]
latency2 = t_final - self.master_t2s[slave]
latency = self.latencies[slave] = (latency1+latency2)/2.0
offset1 = (self.slave_t1s[slave]-self.master_t1)-latency/2.0
offset2 = (self.slave_t2s[slave]-self.master_t2s[slave])-latency/2.0
offset = (offset1+offset2)/2.0
iu.payload.merge({'stage':'4', 'latency': str(latency), 'offset':str(offset)})
if self.timing_handler is None:
print('Determined timing of timesync slave '+slave)
print(' Avg round-trip latency: %.3f s'%latency)
print(' Offset of their clock: %.3f s'%offset)
else:
self.timing_handler(self.component_name, slave, latency, offset)
else:
self.timing_handler(self.component_name, slave, latency, offset)
else:
# other stages are handled by time slave handler
pass
# other stages are handled by time slave handler
pass
def get_time(self):
return time.time() + self.debug_offset
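# Worked example (made-up numbers) for the latency/offset computation above:
#   master_t1 = 10.000   (stage 0 sent, master clock)
#   slave_t1  = 12.005   (stage 0 received / stage 1 sent, slave clock)
#   master_t2 = 10.020   (stage 1 received, master clock)
#   slave_t2  = 12.025   (stage 2 received / stage 3 sent, slave clock)
#   t_final   = 10.040   (stage 3 received, master clock)
# =>
#   latency1 = 10.020 - 10.000           = 0.020
#   latency2 = 10.040 - 10.020           = 0.020
#   latency  = (0.020 + 0.020) / 2       = 0.020
#   offset1  = (12.005 - 10.000) - 0.010 = 1.995
#   offset2  = (12.025 - 10.020) - 0.010 = 1.995
#   offset   = (1.995 + 1.995) / 2       = 1.995
# i.e. the slave clock is reported as running about 1.995 s ahead of the master.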
......@@ -129,7 +131,6 @@ class TimesyncSlave(object):
#self.master_t2 = None
#self.master = None
self.latency = None
self.my_iu = None
#
self.debug_offset = debug_offset
#
......@@ -142,30 +143,23 @@ class TimesyncSlave(object):
master = iu.payload['master']
stage = iu.payload['stage']
if self.component_name != master:
if not own:
if not own and stage=='0':
# reply only to IUs from others
if stage=='0':
#print('Received stage 0 from master '+master)
# initial reply to master
self.my_iu = ipaaca.iu.IU('timesyncReply')
# TODO: add grounded_in link too?
t1 = self.get_time()
self.my_iu.payload = iu.payload
self.my_iu.payload['slave'] = self.component_name
self.my_iu.payload['slave_t1'] = str(t1)
self.my_iu.payload['stage'] = '1'
#self.my_iu.payload.merge({
# 'slave':self.component_name,
# 'slave_t1':str(t1),
# 'stage':'1',
# })
self.ob.add(self.my_iu)
else:
#print('Received stage 0 from master '+master)
# initial reply to master
myiu = ipaaca.iu.IU('timesyncReply')
# TODO: add grounded_in link too?
t1 = self.get_time()
myiu.payload = iu.payload
myiu.payload['slave'] = self.component_name
myiu.payload['slave_t1'] = str(t1)
myiu.payload['stage'] = '1'
self.ob.add(myiu)
elif iu.payload['slave'] == self.component_name:
if stage=='2':
#print('Received stage 2 from master '+master)
t2 = self.get_time()
self.my_iu.payload.merge({
iu.payload.merge({
'slave_t2':str(t2),
'stage':'3',
})
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -32,7 +32,6 @@
import time
import logging
import ipaaca
iu_to_write = None
......@@ -69,8 +68,8 @@ while True:
else:
iu.payload = {'a': 'reset'}
except ipaaca.IUUpdateFailedError, e:
ipaaca.logger.warning("Payload update failed (IU changed in the mean time)")
except ipaaca.IUUpdateFailedError as e:
print("Payload update failed (IU changed in the mean time)")
time.sleep(0.1)
exit(0)
......@@ -2,5 +2,5 @@
import ipaaca
print "{this is the IpaacaPython run.py doing nothing at all}"
print("{this is the IpaacaPython run.py doing nothing at all}")
......@@ -2,7 +2,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -2,7 +2,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -2,7 +2,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -2,7 +2,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -2,7 +2,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2015 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import sys
import ipaaca
import traceback
import io
def pretty_printed_time(t):
if t<0:
t = -t
sign = '-'
else:
sign = ' '
s = float(t)
h = int(s/3600)
m = int(s/60) - h*60
s -= h*3600 + m*60
ms = int((s-int(s))*1000)
return sign+'%01d:%02d:%02d.%03d'%(h, m, int(s), ms)
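# Examples (values chosen to be exactly representable as floats):
#   pretty_printed_time(3723.5) == ' 1:02:03.500'
#   pretty_printed_time(-7.5)   == '-0:00:07.500'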
REPLACEMENT_COLORS={
'30': 'black',
'31': 'red',
'32': 'green',
'33': 'cyan',
'34': 'blue',
'35': 'brown',
'36': 'magenta',
'37': 'grey',
'248': 'lightgray',
# should add more colors
}
def replace_ansi_html(s):
r = u''
in_esc = False
last_color = u''
#skip_one_double_wide_bar = False
for c in s:
if in_esc:
if c=='m':
in_esc = False
itms = last_color.replace('[','').replace(']','').split(';')
col = None
bold = False
if itms[0]=='':
r += '</code></font><code>'
else:
for i in itms:
if i=='1':
bold = True
elif i in REPLACEMENT_COLORS:
col = REPLACEMENT_COLORS[i]
if col is None:
if bold:
col = 'grey'
else:
col = 'black'
print('Warning: unknown colors '+str(itms))
r += '</code><font color="'+col+'"><code>'
else:
last_color += c
else:
#if c in u'▁▂▃▄▅▆▇█':
# if skip_one_double_wide_bar:
# skip_one_double_wide_bar = False
# else:
# r += c
# skip_one_double_wide_bar = True
#el
if c=='\033':  # ESC starts an ANSI escape sequence
in_esc = True
last_color = ''
else:
r += c
return r
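# Illustrative example (assuming the ESC comparison above): an ANSI-colored
# console line such as
#     '\033[31mERROR\033[m something went wrong'
# is converted to
#     '</code><font color="red"><code>ERROR</code></font><code> something went wrong'
# The leading </code> pairs with the <td><code> wrapper that print_record below
# puts around every cell.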
def print_header(formt, fields):
s = u''
if formt=='html':
s += '<html><head><meta charset="UTF-8"><title>flexdiam log view</title></head>\n<body>\n<table width="100%" border="1" bordercolor="lightgray" style="white-space:pre">\n<tr>'
for f in fields:
s+='<th>'+f+'</th>'
s += '</tr>'
return s
def print_footer(formt):
s = u''
if formt=='html':
s += '</table>\n</body>\n</html>'
return s
def print_record(data, delimiter, formt):
if formt=='html':
s = u'<tr>'
for d in data:
d2 = d.replace('<', '&lt;').replace('>', '&gt;').replace('\n', '<br/>').replace('"', '&quot;')
d3 = replace_ansi_html(d2)
#s += u'<td><code>' + d2.replace('<', '&lt;').replace('>', '&gt;').replace('\n', '<br/>') + u'</code></td>'
s += u'<td><code>' + d3 + u'</code></td>'
s += u'</tr>'
return s
else:
return delimiter.join(data)
def modify(key, value, strip=False, pretty_printed_times=False, time_offset=0.0):
if key=='time':
f = float(value.strip()) - time_offset
return pretty_printed_time(f) if pretty_printed_times else str(f)
else:
return value.strip() if strip else value
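# Illustrative sketch (record contents made up): each input line is a Python
# dict literal as written by the LoggerComponent. With the default fields
# ('time', 'text') and '--align-times 1000 0', a record such as
#     {'time': '1012.5', 'level': 'INFO', 'text': 'component started', ...}
# is printed tab-delimited as
#     12.5    component started
# or, with --pretty-printed-times, the time column becomes ' 0:00:12.500'.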
if __name__=='__main__':
try:
iap = ipaaca.IpaacaArgumentParser('ipaaca-logcat')
iap.add_argument(
'-s', '--strip-fields',
dest='strip', action='store_true',
default=False,
help='Strip leading/trailing whitespace from all fields')
iap.add_argument(
'-p', '--pretty-printed-times',
dest='pretty_printed_times', action='store_true',
default=False,
help='Print human-readable times (hh:mm:ss.ms) instead of float seconds')
iap.add_argument(
'--format',
dest='format', nargs=1,
default=['plain'],
help='output format (plain, html) (default plain)')
iap.add_argument(
'-o', '--output-file',
dest='output_file', nargs=1,
default=['-'],
help='output file name (default \'-\' -> standard terminal output)')
iap.add_argument(
'-d', '--delimiter',
dest='delimiter', nargs=1,
default=['\t'],
help='field delimiter, interpreted as python unicode string (default \'\\t\')')
iap.add_argument(
'-t', '--align-times',
dest='align_times', nargs=2,
default=['0', '0'],
help='Calculate relative output timestamps (default: 0 0 => keep)\nFirst argument is a reference event timestamp from the log file\nSecond argument is the new output time for that same event')
iap.add_argument(
'-f', '--fields',
dest='fields', default=['time', 'text'], nargs='+',
help='fields to print (defaults: \'time\' \'text\')')
arguments = iap.parse_args()
delimiter = eval("u'"+arguments.delimiter[0]+"'", {"__builtins__":None}, {} )
ref_t, out_t = float(arguments.align_times[0]), float(arguments.align_times[1])
time_offset = ref_t - out_t
#print(arguments); sys.exit(1)
#modify = (lambda s: s.strip()) if arguments.strip else (lambda s: s)
#modify = lambda s: type(s).__name__
fil = sys.stdout
if arguments.output_file[0] != '-':
fil = io.open(arguments.output_file[0], 'w', encoding='utf8')
fil.write(print_header(arguments.format[0], arguments.fields)+'\n')
for line in sys.stdin:
record = eval(line.strip(), {"__builtins__":None}, {} )
data = [modify(f, str(record[f]), arguments.strip, arguments.pretty_printed_times, time_offset) for f in arguments.fields]
u = print_record(data, delimiter, arguments.format[0])
res = u'{}'.format(u) #.decode('utf-8')
fil.write(u''+res+'\n' )
fil.write(print_footer(arguments.format[0])+'\n')
if fil != sys.stdout: fil.close()
except (KeyboardInterrupt, SystemExit):
pass # ret below
except Exception as e:
	print(u'Exception: ' + str(traceback.format_exc()))
ipaaca.exit(1)
ipaaca.exit(0)