Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Show changes
Showing
with 2027 additions and 414 deletions
......@@ -2,7 +2,7 @@
// "Incremental Processing Architecture
// for Artificial Conversational Agents".
//
// Copyright (c) 2009-2014 Social Cognitive Systems Group
// Copyright (c) 2009-2022 Social Cognitive Systems Group
// CITEC, Bielefeld University
//
// http://opensource.cit-ec.de/projects/ipaaca/
......@@ -28,72 +28,150 @@
// Forschungsgemeinschaft (DFG) in the context of the German
// Excellence Initiative.
syntax = "proto2";
package ipaaca.protobuf;
enum TransportMessageType {
WireTypeRESERVED = 0;
WireTypeIntMessage = 1;
WireTypeRemoteRequestResult = 2;
WireTypeIU = 3;
WireTypeMessageIU = 4; // special case on the wire (use other converter)
WireTypeIUPayloadUpdate = 5;
WireTypeIULinkUpdate = 6;
WireTypeIURetraction = 7;
WireTypeIUCommission = 8;
WireTypeIUResendRequest = 9;
WireTypeIUPayloadUpdateRequest = 100;
WireTypeIUCommissionRequest = 101;
WireTypeIULinkUpdateRequest = 102;
}
message TransportLevelWrapper {
required TransportMessageType transport_message_type = 1;
required bytes raw_message = 2;
}
message IntMessage {
required sint32 value = 1;
required sint32 value = 1;
}
message LinkSet {
required string type = 1;
repeated string targets = 2;
required string type = 1;
repeated string targets = 2;
}
message PayloadItem {
required string key = 1;
required string value = 2;
required string type = 3 [default = "str"];
required string key = 1;
required string value = 2;
required string type = 3 [default = "str"];
}
message IU {
enum AccessMode {
PUSH = 0;
REMOTE = 1;
MESSAGE = 2;
}
required string uid = 1;
required uint32 revision = 2;
required string category = 3 [default = "undef"];
required string payload_type = 4 [default = "MAP"];
required string owner_name = 5;
required bool committed = 6 [default = false];
required AccessMode access_mode = 7 [default = PUSH];
required bool read_only = 8 [default = false];
repeated PayloadItem payload = 9;
repeated LinkSet links = 10;
enum AccessMode {
PUSH = 0;
REMOTE = 1;
MESSAGE = 2;
}
required string uid = 1;
required uint32 revision = 2;
required string category = 3 [default = "undef"];
required string payload_type = 4 [default = "MAP"];
required string owner_name = 5;
required bool committed = 6 [default = false];
required AccessMode access_mode = 7 [default = PUSH];
required bool read_only = 8 [default = false];
repeated PayloadItem payload = 9;
repeated LinkSet links = 10;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUPayloadUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IURetraction {
required string uid = 1;
required uint32 revision = 2;
required string uid = 1;
required uint32 revision = 2;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUCommission {
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUResendRequest {
required string uid = 1;
required string hidden_scope_name = 2;
required string uid = 1;
required string hidden_scope_name = 2;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
// Result for remote operations (below).
// Used to send a raw int, which was problematic.
// Usually: 0 = Failed, >0 = new revision of successfully modified resource.
message RemoteRequestResult {
required uint32 result = 1;
optional string request_uid = 100 [default = ""];
//optional string request_endpoint = 101 [default = ""];
}
// Remote / request versions of buffer setters:
// they just go with a dedicated ID
message IUPayloadUpdateRequest {
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUCommissionRequest {
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdateRequest {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
......@@ -7,7 +7,7 @@
<exec executable="protoc">
<arg value="--proto_path=../proto" />
<arg value="../proto/ipaaca.proto" />
<arg value="--python_out=build/" />
<arg value="--python_out=src/ipaaca/" />
</exec>
</target>
</project>
......
......@@ -5,8 +5,6 @@
</publications>
<dependencies>
<dependency org="google" name="protobuf-python" rev="latest.release"/>
<dependency org="rsb" name="rsb-python" rev="latest.release"/>
<dependency org="spread" name="spread" rev="latest.release"/>
</dependencies>
</ivy-module>
......
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 9 14:12:05 2016
@author: jpoeppel
"""
from setuptools import setup
import os
import sys
import subprocess
from os import path as op
from distutils.spawn import find_executable
from setuptools.command.build_py import build_py
from setuptools.command.bdist_egg import bdist_egg
from distutils.command.build import build
from distutils.command.sdist import sdist
class ProtoBuild(build_py):
"""
This command automatically compiles all .proto files with `protoc` compiler
and places generated files near them -- i.e. in the same directory.
"""
def find_protoc(self):
"Locates protoc executable"
if 'PROTOC' in os.environ and os.path.exists(os.environ['PROTOC']):
protoc = os.environ['PROTOC']
else:
protoc = find_executable('protoc')
if protoc is None:
sys.stderr.write('protoc not found. Is protobuf-compiler installed? \n'
'Alternatively, you can point the PROTOC environment variable at a local version.')
sys.exit(1)
return protoc
def run(self):
#TODO determine path automaticall
packagedir = "../proto"
print("running build proto")
for protofile in filter(lambda x: x.endswith('.proto'), os.listdir(packagedir)):
source = op.join(packagedir, protofile)
output = source.replace('.proto', '_pb2.py')
if (not op.exists(output) or (op.getmtime(source) > op.getmtime(output))):
sys.stderr.write('Protobuf-compiling ' + source + '\n')
subprocess.check_call([self.find_protoc(), "-I={}".format(packagedir),'--python_out=./src/ipaaca', source])
class BDist_egg(bdist_egg):
'''
Simple wrapper around the normal bdist_egg command to require
protobuf build before normal build.
.. codeauthor:: jwienke
'''
def run(self):
self.run_command('build_proto')
bdist_egg.run(self)
class Build(build):
'''
Simple wrapper around the normal build command to require protobuf build
before normal build.
.. codeauthor:: jwienke
'''
def run(self):
self.run_command('build_proto')
build.run(self)
class Sdist(sdist):
'''
Simple wrapper around the normal sdist command to require protobuf build
before generating the source distribution..
.. codeauthor:: jwienke
'''
def run(self):
# fetch the protocol before building the source distribution so that
# we have a cached version and each user can rebuild the protocol
# with his own protobuf version
self.run_command('build_proto')
sdist.run(self)
version = "0.1.3" #TODO determine correct version! ideally from git, maybe do something similar to rsb/setup.py
setup(name="ipaaca",
version=version,
author="Hendrik Buschmeier, Ramin Yaghoubzadeh, Sören Klett",
author_email="hbuschme@uni-bielefeld.de,ryaghoubzadeh@uni-bielefeld.de,sklett@techfak.uni-bielefeld.de",
license='LGPLv3+',
url='https://opensource.cit-ec.de/projects/ipaaca',
install_requires=["paho-mqtt", "six", "protobuf"],
packages=["ipaaca", "ipaaca.util"],
package_dir={"ipaaca":"src/ipaaca"},
# TODO Do we want to add ipaaca_pb2.py to the egg or as separate package?
# data_files=[("./ipaaca", ["ipaaca_pb2.py"])],
# dependency_links=[
# 'http://www.spread.org/files/'
# 'SpreadModule-1.5spread4.tgz#egg=SpreadModule-1.5spread4'],
cmdclass ={
"build_proto": ProtoBuild,
"sdist": Sdist,
"build": Build,
"bdist_egg":BDist_egg
}
)
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -36,7 +36,7 @@ import ipaaca
def remote_change_dumper(iu, event_type, local):
if local:
print 'remote side '+event_type+': '+str(iu)
print('remote side '+event_type+': '+str(iu))
ob = ipaaca.OutputBuffer('CoolInformerOut')
......
*_pb2.py
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -32,12 +32,13 @@
from __future__ import division, print_function
import os
import threading
import rsb
import rsb.converter
#import rsb
#import rsb.converter
import ipaaca_pb2
import ipaaca.ipaaca_pb2
import ipaaca.converter
from ipaaca.buffer import InputBuffer, OutputBuffer
from ipaaca.exception import *
......@@ -45,55 +46,77 @@ from ipaaca.iu import IU, Message, IUAccessMode, IUEventType
from ipaaca.misc import enable_logging, IpaacaArgumentParser
from ipaaca.payload import Payload
import ipaaca.backend
#
# ipaaca.exit(int_retval)
#
from ipaaca.buffer import atexit_cleanup_function
def exit(int_retval=0):
'''For the time being, this function can be used to
circumvent any sys.exit blocks, while at the same time
cleaning up the buffers (e.g. retracting IUs).
Call once at the end of any python script (or anywhere
in lieu of sys.exit() / os._exit(). '''
print('ipaaca: cleaning up and exiting with code '+str(int_retval))
atexit_cleanup_function()
os._exit(int_retval)
__RSB_INITIALIZER_LOCK = threading.Lock()
__RSB_INITIALIZED = False
def initialize_ipaaca_rsb_if_needed():
'''Register own RSB Converters and initialise RSB from default config file.'''
global __RSB_INITIALIZED
with __RSB_INITIALIZER_LOCK:
if __RSB_INITIALIZED:
return
else:
rsb.converter.registerGlobalConverter(
ipaaca.converter.IntConverter(
wireSchema="int32",
dataType=int))
rsb.converter.registerGlobalConverter(
ipaaca.converter.IUConverter(
wireSchema="ipaaca-iu",
dataType=IU))
rsb.converter.registerGlobalConverter(
ipaaca.converter.MessageConverter(
wireSchema="ipaaca-messageiu",
dataType=Message))
rsb.converter.registerGlobalConverter(
ipaaca.converter.IULinkUpdateConverter(
wireSchema="ipaaca-iu-link-update",
dataType=converter.IULinkUpdate))
rsb.converter.registerGlobalConverter(
ipaaca.converter.IUPayloadUpdateConverter(
wireSchema="ipaaca-iu-payload-update",
dataType=converter.IUPayloadUpdate))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IUCommission))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IUResendRequest))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IURetraction))
rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
"""Initialise rsb if not yet initialise.
__RSB_INITIALIZED = True
* Register own RSB converters.
* Initialise RSB from enviroment variables, rsb config file, or
from default values for RSB trnasport, host, and port (via
ipaaca.defaults or ipaaca.misc.IpaacaArgumentParser).
"""
global __RSB_INITIALIZED
with __RSB_INITIALIZER_LOCK:
if __RSB_INITIALIZED:
return
else:
ipaaca.converter.register_global_converter(
ipaaca.converter.IUConverter(
wireSchema="ipaaca-iu",
dataType=IU))
ipaaca.converter.register_global_converter(
ipaaca.converter.MessageConverter(
wireSchema="ipaaca-messageiu",
dataType=Message))
ipaaca.converter.register_global_converter(
ipaaca.converter.IULinkUpdateConverter(
wireSchema="ipaaca-iu-link-update",
dataType=converter.IULinkUpdate))
ipaaca.converter.register_global_converter(
ipaaca.converter.IUPayloadUpdateConverter(
wireSchema="ipaaca-iu-payload-update",
dataType=converter.IUPayloadUpdate))
if ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT is not None:
if ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT == 'spread':
os.environ['RSB_TRANSPORT_SPREAD_ENABLED'] = str(1)
os.environ['RSB_TRANSPORT_SOCKET_ENABLED'] = str(0)
elif ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT == 'socket':
os.environ['RSB_TRANSPORT_SPREAD_ENABLED'] = str(0)
os.environ['RSB_TRANSPORT_SOCKET_ENABLED'] = str(1)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER is not None:
os.environ['RSB_TRANSPORT_SOCKET_SERVER'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST is not None:
os.environ['RSB_TRANSPORT_SPREAD_HOST'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST)
os.environ['RSB_TRANSPORT_SOCKET_HOST'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT is not None:
os.environ['RSB_TRANSPORT_SPREAD_PORT'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT)
os.environ['RSB_TRANSPORT_SOCKET_PORT'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT)
#
ipaaca.backend.register_backends()
__RSB_INITIALIZED = True
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import threading
import uuid
import os
import time
LOGGER = ipaaca.misc.get_library_logger()
__registered_backends = {}
__backend_registration_done = False
def register_backends():
global __registered_backends
global __backend_registration_done
if not __backend_registration_done:
__backend_registration_done = True
LOGGER.debug('Registering available back-ends')
# register available backends
# mqtt
import ipaaca.backend_mqtt
be = ipaaca.backend_mqtt.create_backend()
if be is not None:
__registered_backends[be.name] = be
LOGGER.debug('Back-end '+str(be.name)+' added')
# ros
import ipaaca.backend_ros
be = ipaaca.backend_ros.create_backend()
if be is not None:
__registered_backends[be.name] = be
LOGGER.debug('Back-end '+str(be.name)+' added')
def get_default_backend():
# TODO selection mechanism / config
if not __backend_registration_done:
register_backends()
if len(__registered_backends) == 0:
raise RuntimeError('No back-ends could be initialized for ipaaca-python')
cfg = ipaaca.config.get_global_config()
preferred = cfg.get_with_default('backend', None)
if preferred is None:
k = list(__registered_backends.keys())[0]
if len(__registered_backends) > 1:
LOGGER.warning('No preferred ipaaca.backend set, returning one of several (probably the first in list)')
print('Using randomly selected back-end {}!'.format(k))
else:
if preferred in __registered_backends:
k = preferred
else:
raise ipaaca.exception.BackendInitializationError(preferred)
LOGGER.info('Back-end is '+str(k))
return __registered_backends[k]
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import collections
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import ipaaca.backend
import ipaaca.config
import threading
try:
import queue
except:
import Queue as queue
import uuid
import os
import time
try:
import paho.mqtt.client as mqtt
MQTT_ENABLED = True
except:
MQTT_ENABLED = False
if not MQTT_ENABLED:
def create_backend():
return None
else:
def create_backend():
return MQTTBackend(name='mqtt')
LOGGER = ipaaca.misc.get_library_logger()
_REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited
_REMOTE_LISTENER_MAX_QUEUED_EVENTS = 1024 # 'Full' exception if exceeded
class EventWrapper(object):
def __init__(self, data):
self.data = data
class PendingRequest(object):
'''Encapsulation of a pending remote request with
a facility to keep the requesting thread locked
until the reply or a timeout unlocks it.'''
def __init__(self, request):
self._request = request
self._event = threading.Event()
self._reply = None
self._request_uid = str(uuid.uuid4())[0:8]
def wait_for_reply(self, timeout=30.0):
wr = self._event.wait(timeout)
return None if wr is False else self._reply
def reply_with_result(self, reply):
self._reply = reply
self._event.set()
class Informer(object):
'''Informer interface, wrapping an outbound port to MQTT'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._live = True
self._live_event.set()
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
pass
def publishData(self, data):
#print('Informer publishing '+str(data.__class__.__name__)+' on '+self._scope)
self._mqtt_client.publish(self._scope, ipaaca.converter.serialize(data), qos=2)
class BackgroundEventDispatcher(threading.Thread):
def __init__(self, listener):
super(BackgroundEventDispatcher, self).__init__()
self.daemon = True
self._listener = listener
def terminate(self):
self._running = False
def run(self):
self._running = True
listener = self._listener
while self._running: # auto-terminated (daemon)
event = listener._event_queue.get(block=True, timeout=None)
if event is None: return # signaled termination
#print('\033[31mDispatch '+str(event.data.__class__.__name__)+' start ...\033[m')
for handler in self._listener._handlers:
handler(event)
#print('\033[32m... dispatch '+str(event.data.__class__.__name__)+' end.\033[m')
class Listener(object):
'''Listener interface, wrapping an inbound port from MQTT'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
self._event_queue = queue.Queue(_REMOTE_LISTENER_MAX_QUEUED_EVENTS)
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_socket_open = self.mqtt_callback_on_socket_open
#self._mqtt_client.on_socket_close = self.mqtt_callback_on_socket_close
#self._mqtt_client.on_log = self.mqtt_callback_on_log
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self._dispatcher = BackgroundEventDispatcher(self)
self._dispatcher.start()
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._event_queue.put(None, block=False) # signal termination, waking queue
self._dispatcher.terminate()
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
LOGGER.debug('Connect to '+str(self._host)+':'+str(self._port))
self._mqtt_client.connect(self._host, self._port)
#def mqtt_callback_on_log(self, client, userdata, level, buf):
# print('Listener: LOG: '+str(buf))
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
event = EventWrapper(ipaaca.converter.deserialize(message.payload))
self._event_queue.put(event, block=False) # queue event for BackgroundEventDispatcher
def addHandler(self, handler):
self._handlers.append(handler)
#def publishData(self, data):
# self._mqtt_client.publish(self._
class LocalServer(object):
'''LocalServer interface, allowing for RPC requests to
IU functions, or reporting back success or failure.'''
def __init__(self, buffer_impl, scope, config=None):
self._buffer = buffer_impl
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_LocalServer_' + self._uuid # unused atm
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
req = ipaaca.converter.deserialize(message.payload)
result = None
if isinstance(req, ipaaca.converter.IUPayloadUpdate):
result = self.attempt_to_apply_remote_updatePayload(req)
elif isinstance(req, ipaaca.converter.IULinkUpdate):
result = self.attempt_to_apply_remote_updateLinks(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUCommission):
result = self.attempt_to_apply_remote_commit(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUResendRequest):
result = self.attempt_to_apply_remote_resendRequest(req)
else:
raise RuntimeError('LocalServer: got an object of wrong class '+str(req.__class__.__name__)) # TODO replace
if result is not None:
self.send_result_for_request(req, result)
#
def send_result_for_request(self, obj, result):
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
pbo.result = result
pbo.request_uid = obj.request_uid
#print('Sending result to endpoint '+str(obj.request_endpoint))
self._mqtt_client.publish(obj.request_endpoint, ipaaca.converter.serialize(pbo), qos=2)
def attempt_to_apply_remote_updateLinks(self, obj):
return self._buffer._remote_update_links(obj)
def attempt_to_apply_remote_updatePayload(self, obj):
return self._buffer._remote_update_payload(obj)
def attempt_to_apply_remote_commit(self, obj):
return self._buffer._remote_commit(obj)
def attempt_to_apply_remote_resendRequest(self, obj):
return self._buffer._remote_request_resend(obj)
class RemoteServer(object):
'''RemoteServer, connects to a LocalServer on the side
of an actual IU owner, which will process any requests.
The RemoteServer is put on hold while the owner is
processing. RemoteServer is from RSB terminology,
it might more aptly be described as an RPC client.'''
def __init__(self, remote_end_scope, config=None):
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
#queue.Queue(_REMOTE_SERVER_MAX_QUEUED_REQUESTS)
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_RemoteServer_' + self._uuid
# will RECV here:
self._scope = '/ipaaca/remotes/' + self._name
# will SEND here
self._remote_end_scope = remote_end_scope
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + remote_end_scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
reply = ipaaca.converter.deserialize(message.payload)
if isinstance(reply, ipaaca.ipaaca_pb2.RemoteRequestResult):
uid = reply.request_uid
pending_request = None
with self._pending_requests_lock:
if uid in self._pending_requests:
pending_request = self._pending_requests[uid]
del self._pending_requests[uid]
if pending_request is None:
raise RuntimeError('RemoteServer: got a reply for request uid that is not queued: '+str(uid))
else:
# provide result to other thread and unblock it
pending_request.reply_with_result(reply)
else:
raise RuntimeError('RemoteServer: got an object of wrong class '+str(reply.__class__.__name__)) # TODO replace
def queue_pending_request(self, request):
pending_request = PendingRequest(request)
with self._pending_requests_lock:
if _REMOTE_SERVER_MAX_QUEUED_REQUESTS>0 and len(self._pending_requests) >= _REMOTE_SERVER_MAX_QUEUED_REQUESTS:
raise RuntimeError('RemoteServer: maximum number of pending requests exceeded') # TODO replace?
else:
self._pending_requests[pending_request._request_uid] = pending_request
return pending_request
# impl
def blocking_call(self, request):
# Broker's queue will raise before sending anything if capacity is exceeded
pending_request = self.queue_pending_request(request)
# complete and send request
request.request_uid = pending_request._request_uid
request.request_endpoint = self._scope
self._mqtt_client.publish(self._remote_end_scope, ipaaca.converter.serialize(request), qos=2)
# wait for other end to return result
reply = pending_request.wait_for_reply()
if reply is None:
LOGGER.warning('A request timed out!')
return 0
else:
return reply.result # the actual int result
# glue that quacks like the RSB version
def resendRequest(self, req):
return self.blocking_call(req)
def commit(self, req):
return self.blocking_call(req)
def updatePayload(self, req):
return self.blocking_call(req)
def updateLinks(self, req):
return self.blocking_call(req)
class MQTTBackend(object):
def __init__(self, name='mqtt'):
# back-end initialization code
self._config = ipaaca.config.get_global_config()
self._name = name
self._participants = set()
def _get_name(self):
return self._name
name = property(_get_name)
def teardown(self):
LOGGER.info('MQTT teardown: waiting 1 sec for final deliveries')
time.sleep(1)
for p in self._participants:
p.deactivate_internal()
def Scope(self, scope_str):
'''Scope adapter (glue replacing rsb.Scope)'''
return str(scope_str)
def createLocalServer(self, buffer_impl, scope, config=None):
LOGGER.debug('Creating a LocalServer on '+str(scope))
s = LocalServer(buffer_impl, scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createRemoteServer(self, scope, config=None):
LOGGER.debug('Creating a RemoteServer on '+str(scope))
s = RemoteServer(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createInformer(self, scope, config=None, dataType="ignored in this backend"):
LOGGER.debug('Creating an Informer on '+str(scope))
s = Informer(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createListener(self, scope, config=None):
LOGGER.debug('Creating a Listener on '+str(scope))
s = Listener(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import collections
import sys
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import ipaaca.backend
import ipaaca.config as config
LOGGER = ipaaca.misc.get_library_logger()
ROS_ENABLED, __try_guessing = False, False
try:
import rospy
from std_msgs.msg import String
import base64
ROS_ENABLED = True
except:
LOGGER.debug('rospy or deps not found, ROS backend disabled')
ROS_ENABLED = False
if not ROS_ENABLED:
def create_backend():
return None
else:
def create_backend():
return ROSBackend(name='ros')
import threading
try:
import queue
except:
import Queue as queue
import uuid
import os
import time
import sys
class EventWrapper(object):
def __init__(self, data):
self.data = data
class PendingRequest(object):
'''Encapsulation of a pending remote request with
a facility to keep the requesting thread locked
until the reply or a timeout unlocks it.'''
def __init__(self, request):
self._request = request
self._event = threading.Event()
self._reply = None
self._request_uid = str(uuid.uuid4())[0:8]
def wait_for_reply(self, timeout=30.0):
wr = self._event.wait(timeout)
return None if wr is False else self._reply
def reply_with_result(self, reply):
self._reply = reply
self._event.set()
class Informer(object):
'''Informer interface, wrapping an outbound port to ROS'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._ros_pub = rospy.Publisher(self._scope, String, queue_size=100, tcp_nodelay=True, latch=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def deactivate(self):
pass
#self._ros_pub.unregister()
#self._ros_pub = None
def publishData(self, data):
self._ros_pub.publish(ROSBackend.serialize(data))
class BackgroundEventDispatcher(threading.Thread):
def __init__(self, event, handlers):
super(BackgroundEventDispatcher, self).__init__()
self.daemon = True
self._event = event
self._handlers = handlers
def run(self):
for handler in self._handlers:
handler(self._event)
class Listener(object):
'''Listener interface, wrapping an inbound port from ROS'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._ros_sub = rospy.Subscriber(self._scope, String, self.on_message, tcp_nodelay=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def deactivate(self):
pass
#self._ros_sub.unregister()
#self._ros_sub = None
def on_message(self, message):
event = EventWrapper(ROSBackend.deserialize(message.data))
## (1) with extra thread:
#dispatcher = BackgroundEventDispatcher(event, self._handlers)
#dispatcher.start()
## or (2) with no extra thread:
for handler in self._handlers:
handler(event)
def addHandler(self, handler):
self._handlers.append(handler)
class LocalServer(object):
'''LocalServer interface, allowing for RPC requests to
IU functions, or reporting back success or failure.'''
def __init__(self, buffer_impl, scope, config=None):
self._buffer = buffer_impl
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_LocalServer_' + self._uuid # unused atm
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._ros_pubs = {}
self._ros_sub = rospy.Subscriber(self._scope, String, self.on_message, tcp_nodelay=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def get_publisher(self, endpoint):
if endpoint in self._ros_pubs:
return self._ros_pubs[endpoint]
else:
p = rospy.Publisher(endpoint, String, queue_size=10, tcp_nodelay=True, latch=True)
self._ros_pubs[endpoint] = p
return p
def deactivate(self):
pass
#self._ros_sub.unregister()
#for v in self._ros_pubs.values():
# v.unregister()
#self._ros_sub = None
#self._ros_pubs = {}
def on_message(self, message):
req = ROSBackend.deserialize(message.data)
result = None
if isinstance(req, ipaaca.converter.IUPayloadUpdate):
result = self.attempt_to_apply_remote_updatePayload(req)
elif isinstance(req, ipaaca.converter.IULinkUpdate):
result = self.attempt_to_apply_remote_updateLinks(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUCommission):
result = self.attempt_to_apply_remote_commit(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUResendRequest):
result = self.attempt_to_apply_remote_resendRequest(req)
else:
raise RuntimeError('LocalServer: got an object of wrong class '+str(req.__class__.__name__)) # TODO replace
if result is not None:
self.send_result_for_request(req, result)
#
def send_result_for_request(self, obj, result):
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
pbo.result = result
pbo.request_uid = obj.request_uid
#print('Sending result to endpoint '+str(obj.request_endpoint))
pub = self.get_publisher(obj.request_endpoint)
pub.publish(ROSBackend.serialize(pbo))
def attempt_to_apply_remote_updateLinks(self, obj):
return self._buffer._remote_update_links(obj)
def attempt_to_apply_remote_updatePayload(self, obj):
return self._buffer._remote_update_payload(obj)
def attempt_to_apply_remote_commit(self, obj):
return self._buffer._remote_commit(obj)
def attempt_to_apply_remote_resendRequest(self, obj):
return self._buffer._remote_request_resend(obj)
_REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited
class RemoteServer(object):
'''RemoteServer, connects to a LocalServer on the side
of an actual IU owner, which will process any requests.
The RemoteServer is put on hold while the owner is
processing. RemoteServer is from RSB terminology,
it might more aptly be described as an RPC client.'''
def __init__(self, remote_end_scope, config=None):
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
#queue.Queue(_REMOTE_SERVER_MAX_QUEUED_REQUESTS)
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_RemoteServer_' + self._uuid
# will RECV here:
self._scope = '/ipaaca/remotes/' + self._name
# will SEND here
self._remote_end_scope = remote_end_scope
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + remote_end_scope
self._ros_pub = rospy.Publisher(self._remote_end_scope, String, queue_size=10, tcp_nodelay=True, latch=True)
self._ros_sub = rospy.Subscriber(self._scope, String, self.on_message, tcp_nodelay=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def deactivate(self):
pass
#self._ros_sub.unregister()
#self._ros_pub.unregister()
#self._ros_sub = None
#self._ros_pub = None
def on_message(self, message):
reply = ROSBackend.deserialize(message.data)
if isinstance(reply, ipaaca.ipaaca_pb2.RemoteRequestResult):
uid = reply.request_uid
pending_request = None
with self._pending_requests_lock:
if uid in self._pending_requests:
pending_request = self._pending_requests[uid]
del self._pending_requests[uid]
if pending_request is None:
raise RuntimeError('RemoteServer: got a reply for request uid that is not queued: '+str(uid))
else:
# provide result to other thread and unblock it
pending_request.reply_with_result(reply)
else:
raise RuntimeError('RemoteServer: got an object of wrong class '+str(reply.__class__.__name__)) # TODO replace
def queue_pending_request(self, request):
pending_request = PendingRequest(request)
with self._pending_requests_lock:
if _REMOTE_SERVER_MAX_QUEUED_REQUESTS>0 and len(self._pending_requests) >= _REMOTE_SERVER_MAX_QUEUED_REQUESTS:
raise RuntimeError('RemoteServer: maximum number of pending requests exceeded') # TODO replace?
else:
self._pending_requests[pending_request._request_uid] = pending_request
return pending_request
# impl
def blocking_call(self, request):
# Broker's queue will raise before sending anything if capacity is exceeded
pending_request = self.queue_pending_request(request)
# complete and send request
request.request_uid = pending_request._request_uid
request.request_endpoint = self._scope
self._ros_pub.publish(ROSBackend.serialize(request))
# wait for other end to return result
reply = pending_request.wait_for_reply()
if reply is None:
LOGGER.warning('A request timed out!')
return 0
else:
return reply.result # the actual int result
# glue that quacks like the RSB version
def resendRequest(self, req):
return self.blocking_call(req)
def commit(self, req):
return self.blocking_call(req)
def updatePayload(self, req):
return self.blocking_call(req)
def updateLinks(self, req):
return self.blocking_call(req)
class ROSBackend(object):
def __init__(self, name='ros'):
#import logging
# back-end initialization code
self._name = name
self._need_init = True
#logging.basicConfig(level=logging.DEBUG)
def init_once(self):
'''Actual back-end initialization is only done when it is used'''
if self._need_init:
self._need_init = False
self._config = config.get_global_config()
try:
# generate a ROS node prefix from the basename of argv[0]
clean_name = ''.join([c for c in sys.argv[0].rsplit('/',1)[-1].replace('.', '_').replace('-','_') if c.lower() in 'abcdefghijklmnoprqstuvwxzy0123456789_'])
except:
clean_name = ''
rospy.init_node('ipaaca_python' if len(clean_name)==0 else clean_name,
anonymous=True, disable_signals=True)
def _get_name(self):
return self._name
name = property(_get_name)
def teardown(self):
LOGGER.info('ROS teardown: waiting 1 sec for final deliveries')
time.sleep(1)
rospy.signal_shutdown('Done')
@staticmethod
def serialize(obj):
#print('object class: '+obj.__class__.__name__)
bb = ipaaca.converter.serialize(obj)
st = str(base64.b64encode(bb))
#print('serialized: '+str(st))
return st
@staticmethod
def deserialize(msg):
#print('got serialized: '+str(msg))
bb = base64.b64decode(msg)
return ipaaca.converter.deserialize(bb)
def Scope(self, scope_str):
'''Scope adapter (glue replacing rsb.Scope)'''
# ROS graph resources must not start with a slash
return str(scope_str)[1:] if scope_str.startswith('/') else str(scope_str)
def createLocalServer(self, buffer_impl, scope, config=None):
self.init_once()
LOGGER.debug('Creating a LocalServer on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = LocalServer(buffer_impl, scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
def createRemoteServer(self, scope, config=None):
self.init_once()
LOGGER.debug('Creating a RemoteServer on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = RemoteServer(scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
def createInformer(self, scope, config=None, dataType="ignored in this backend"):
self.init_once()
LOGGER.debug('Creating an Informer on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = Informer(scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
def createListener(self, scope, config=None):
self.init_once()
LOGGER.debug('Creating a Listener on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = Listener(scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2015 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -37,24 +37,23 @@ import threading
import uuid
import traceback
import six
import weakref
import atexit
import rsb
#import rsb
import ipaaca_pb2
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.converter
import ipaaca.iu
import ipaaca.backend
__all__ = [
'InputBuffer',
'OutputBuffer',
]
LOGGER = ipaaca.misc.get_library_logger()
# set of objects to auto-clean on exit, assumes _teardown() method
......@@ -67,6 +66,7 @@ def atexit_cleanup_function():
obj = obj_r()
if obj is not None: # if weakref still valid
obj._teardown()
ipaaca.backend.get_default_backend().teardown()
atexit.register(atexit_cleanup_function)
def auto_teardown_instances(fn):
......@@ -114,10 +114,10 @@ class IUEventHandler(object):
self._handler_function = handler_function
self._for_event_types = (
None if for_event_types is None else
(for_event_types[:] if hasattr(for_event_types, '__iter__') else [for_event_types]))
(for_event_types[:] if not isinstance(for_event_types, six.string_types) and hasattr(for_event_types, '__iter__') else [for_event_types]))
self._for_categories = (
None if for_categories is None else
(for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories]))
(for_categories[:] if not isinstance(for_categories, six.string_types) and hasattr(for_categories, '__iter__') else [for_categories]))
def condition_met(self, event_type, category):
"""Check whether this IUEventHandler should be called.
......@@ -160,7 +160,7 @@ class Buffer(object):
ipaaca.initialize_ipaaca_rsb_if_needed()
self._owning_component_name = owning_component_name
self._channel = channel if channel is not None else ipaaca.defaults.IPAACA_DEFAULT_CHANNEL
self._participant_config = rsb.ParticipantConfig.fromDefaultSources() if participant_config is None else participant_config
self._participant_config = participant_config
self._uuid = str(uuid.uuid4())[0:8]
# Initialise with a temporary, but already unique, name
self._unique_name = "undef-"+self._uuid
......@@ -171,6 +171,12 @@ class Buffer(object):
return FrozenIUStore(original_iu_store = self._iu_store)
iu_store = property(fget=_get_frozen_iu_store, doc='Copy-on-read version of the internal IU store')
def _get_channel(self):
return self._channel
channel = property(
fget=_get_channel,
doc='The IPAACA channel the buffer is connected to.')
def register_handler(self, handler_function, for_event_types=None, for_categories=None):
"""Register a new IU event handler function.
......@@ -194,9 +200,9 @@ class Buffer(object):
h.call(self, uid, local=local, event_type=event_type, category=category)
except Exception as e:
if local:
LOGGER.error('Local IU handler raised an exception upon remote write.' + unicode(e))
LOGGER.error('Local IU handler raised an exception upon remote write.' + str(e))
else:
print(unicode(traceback.format_exc()))
print(str(traceback.format_exc()))
raise e
def _get_owning_component_name(self):
......@@ -214,6 +220,7 @@ class InputBuffer(Buffer):
"""An InputBuffer that holds remote IUs."""
@auto_teardown_instances
def __init__(self, owning_component_name, category_interests=None, channel=None, participant_config=None, resend_active=False):
'''Create an InputBuffer.
......@@ -235,13 +242,14 @@ class InputBuffer(Buffer):
def _get_remote_server(self, event_or_iu):
'''Return (or create, store and return) a remote server.'''
_owner = self._get_owner(event_or_iu)
if _owner:
_remote_server_name = self._get_owner(event_or_iu) + '/Server'
if _remote_server_name:
try:
return self._remote_server_store[_owner]
return self._remote_server_store[_remote_server_name]
except KeyError:
remote_server = rsb.createRemoteServer(rsb.Scope(str(_owner)))
self._remote_server_store[_owner] = remote_server
be = ipaaca.backend.get_default_backend()
remote_server = be.createRemoteServer(be.Scope(str(_remote_server_name)), config=self._participant_config)
self._remote_server_store[_remote_server_name] = remote_server
return remote_server
else:
None
......@@ -263,7 +271,8 @@ class InputBuffer(Buffer):
def _add_category_listener(self, iu_category):
'''Create and store a listener on a specific category.'''
if iu_category not in self._listener_store:
cat_listener = rsb.createListener(rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), config=self._participant_config)
be = ipaaca.backend.get_default_backend()
cat_listener = be.createListener(be.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), config=self._participant_config)
cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category)
......@@ -276,6 +285,24 @@ class InputBuffer(Buffer):
self._category_interests.remove(iu_category)
LOGGER.info("Removed listener in scope /ipaaca/channel/" + str(self._channel) + "/category/ " + iu_category)
def _teardown(self):
'''OutputBuffer retracts remaining live IUs on teardown'''
self._deactivate_all_internal()
def __del__(self):
'''Perform teardown as soon as Buffer is lost.'''
self._deactivate_all_internal()
def _deactivate_all_internal(self):
'''Deactivate all participants.'''
for listener in self._listener_store.values():
try:
listener.deactivate()
except RuntimeError:
# Is raised if an already deactivated participant is
# deactivated again
pass
def _handle_iu_events(self, event):
'''Dispatch incoming IU events.
......@@ -286,7 +313,7 @@ class InputBuffer(Buffer):
event -- a converted RSB event
'''
type_ = type(event.data)
if type_ is ipaaca.iu.RemotePushIU:
if type_ == ipaaca.iu.RemotePushIU:
# a new IU
if event.data.uid not in self._iu_store:
self._iu_store[event.data.uid] = event.data
......@@ -299,7 +326,7 @@ class InputBuffer(Buffer):
# done via the resend request mechanism).
self._iu_store[event.data.uid] = event.data
event.data.buffer = self
elif type_ is ipaaca.iu.RemoteMessage:
elif type_ == ipaaca.iu.RemoteMessage:
# a new Message, an ephemeral IU that is removed after calling handlers
self._iu_store[ event.data.uid ] = event.data
event.data.buffer = self
......@@ -308,7 +335,7 @@ class InputBuffer(Buffer):
else:
if event.data.uid not in self._iu_store:
if (self._resend_active and
not type_ is ipaaca_pb2.IURetraction):
not type_ == ipaaca.ipaaca_pb2.IURetraction):
# send resend request to remote server, IURetraction is ignored
try:
self._request_remote_resend(event)
......@@ -319,7 +346,7 @@ class InputBuffer(Buffer):
LOGGER.warning("Received an update for an IU which we did not receive before.")
return
# an update to an existing IU
if type_ is ipaaca_pb2.IURetraction:
if type_ == ipaaca.ipaaca_pb2.IURetraction:
# IU retraction (cannot be triggered remotely)
iu = self._iu_store[event.data.uid]
iu._revision = event.data.revision
......@@ -330,18 +357,18 @@ class InputBuffer(Buffer):
# Notify only for remotely triggered events;
# Discard updates that originate from this buffer
return
if type_ is ipaaca_pb2.IUCommission:
if type_ == ipaaca.ipaaca_pb2.IUCommission:
# IU commit
iu = self._iu_store[event.data.uid]
iu._apply_commission()
iu._revision = event.data.revision
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.COMMITTED, category=iu.category)
elif type_ is ipaaca.converter.IUPayloadUpdate:
elif type_ == ipaaca.converter.IUPayloadUpdate:
# IU payload update
iu = self._iu_store[event.data.uid]
iu._apply_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.UPDATED, category=iu.category)
elif type_ is ipaaca.converter.IULinkUpdate:
elif type_ == ipaaca.converter.IULinkUpdate:
# IU link update
iu = self._iu_store[event.data.uid]
iu._apply_link_update(event.data)
......@@ -350,14 +377,14 @@ class InputBuffer(Buffer):
LOGGER.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
def add_category_interests(self, category_interests):
if hasattr(category_interests, '__iter__'):
if not isinstance(category_interests, six.string_types) and hasattr(category_interests, '__iter__'):
for interest in category_interests:
self._add_category_listener(interest)
else:
self._add_category_listener(category_interests)
def remove_category_interests(self, category_interests):
if hasattr(category_interests, '__iter__'):
if not isinstance(category_interests, six.string_types) and hasattr(category_interests, '__iter__'):
for interest in category_interests:
self._remove_category_listener(interest)
else:
......@@ -366,15 +393,15 @@ class InputBuffer(Buffer):
def _request_remote_resend(self, event):
remote_server = self._get_remote_server(event)
if remote_server:
resend_request = ipaaca_pb2.IUResendRequest()
resend_request = ipaaca.ipaaca_pb2.IUResendRequest()
resend_request.uid = event.data.uid # target iu
resend_request.hidden_scope_name = str(self._uuid) # hidden category name
remote_revision = remote_server.requestResend(resend_request)
remote_revision = remote_server.resendRequest(resend_request)
if remote_revision == 0:
raise ipaaca.exception.IUResendRequestFailedError()
raise ipaaca.exception.IUResendRequestFailedError(event.data.uid)
else:
# Remote server is not known
raise ipaaca.exception.IUResendRequestFailedError()
raise ipaaca.exception.IUResendRequestRemoteServerUnknownError(event.data.uid)
def register_handler(self, handler_function, for_event_types=None, for_categories=None):
"""Register a new IU event handler function.
......@@ -416,11 +443,8 @@ class OutputBuffer(Buffer):
'''
super(OutputBuffer, self).__init__(owning_component_name, channel, participant_config)
self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB'
self._server = rsb.createLocalServer(rsb.Scope(self._unique_name))
self._server.addMethod('updateLinks', self._remote_update_links, ipaaca.converter.IULinkUpdate, int)
self._server.addMethod('updatePayload', self._remote_update_payload, ipaaca.converter.IUPayloadUpdate, int)
self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int)
self._server.addMethod('requestResend', self._remote_request_resend, ipaaca_pb2.IUResendRequest, int)
be = ipaaca.backend.get_default_backend()
self._server = be.createLocalServer(self, be.Scope(self._unique_name + '/Server'), config=self._participant_config)
self._informer_store = {}
self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-'
self.__iu_id_counter_lock = threading.Lock()
......@@ -428,12 +452,15 @@ class OutputBuffer(Buffer):
def _teardown(self):
'''OutputBuffer retracts remaining live IUs on teardown'''
self._retract_all_internal()
self._deactivate_all_internal()
def __del__(self):
'''Perform teardown (IU retractions) as soon as Buffer is lost.
Note that at program exit the teardown might be called
twice for live objects (atexit, then del), but the
_retract_all_internal method prevents double retractions.'''
self._retract_all_internal()
self._deactivate_all_internal()
def _remote_update_links(self, update):
'''Apply a remotely requested update to one of the stored IU's links.'''
......@@ -462,10 +489,10 @@ class OutputBuffer(Buffer):
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
LOGGER.warning(u"Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
LOGGER.warning(u" Writer was: "+update.writer_name)
LOGGER.warning(u" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
LOGGER.warning(u" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
LOGGER.warning("Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
LOGGER.warning(" Writer was: "+update.writer_name)
LOGGER.warning(" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
LOGGER.warning(" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
return 0
if update.is_delta:
#print('Writing delta update by '+str(update.writer_name))
......@@ -488,7 +515,7 @@ class OutputBuffer(Buffer):
return 0
iu = self._iu_store[iu_resend_request_pack.uid]
with iu.revision_lock:
if iu_resend_request_pack.hidden_scope_name is not None and iu_resend_request_pack.hidden_scope_name is not '':
if iu_resend_request_pack.hidden_scope_name is not None and iu_resend_request_pack.hidden_scope_name != '':
informer = self._get_informer(iu_resend_request_pack.hidden_scope_name)
informer.publishData(iu)
return iu.revision
......@@ -518,8 +545,9 @@ class OutputBuffer(Buffer):
if iu_category in self._informer_store:
LOGGER.info("Returning informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
return self._informer_store[iu_category]
informer_iu = rsb.createInformer(
rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)),
be = ipaaca.backend.get_default_backend()
informer_iu = be.createInformer(
be.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)),
config=self._participant_config,
dataType=object)
self._informer_store[iu_category] = informer_iu #new_tuple
......@@ -562,7 +590,7 @@ class OutputBuffer(Buffer):
def _retract_iu(self, iu):
'''Retract an IU.'''
iu._retracted = True
iu_retraction = ipaaca_pb2.IURetraction()
iu_retraction = ipaaca.ipaaca_pb2.IURetraction()
iu_retraction.uid = iu.uid
iu_retraction.revision = iu.revision
informer = self._get_informer(iu._category)
......@@ -574,6 +602,22 @@ class OutputBuffer(Buffer):
if not iu._retracted:
self._retract_iu(iu)
def _deactivate_all_internal(self):
'''Deactivate all participants.'''
try:
self._server.deactivate()
except RuntimeError:
# Is raised if an already deactivated participant is
# deactivated again
pass
for informer in self._informer_store.values():
try:
informer.deactivate()
except RuntimeError:
# Is raised if an already deactivated participant is
# deactivated again
pass
def _send_iu_commission(self, iu, writer_name):
'''Send IU commission.
......@@ -585,7 +629,7 @@ class OutputBuffer(Buffer):
'''
# a raw Protobuf object for IUCommission is produced
# (unlike updates, where we have an intermediate class)
iu_commission = ipaaca_pb2.IUCommission()
iu_commission = ipaaca.ipaaca_pb2.IUCommission()
iu_commission.uid = iu.uid
iu_commission.revision = iu.revision
iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name
......
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import os
import re
try:
import configparser
except:
import ConfigParser as configparser
LOGGER = ipaaca.misc.get_library_logger()
__global_config = None
class Config(object):
def __init__(self):
self._store = {}
def get_with_default(self, key, default_value, warn=False):
if key in self._store:
return self._store[key]
else:
notif = LOGGER.warning if warn else LOGGER.debug
notif('Config key '+str(key)+' not found, returning default of '+str(default_value))
return default_value
def populate_from_global_sources(self):
self._store = {}
self.populate_from_any_conf_files()
self.populate_from_environment()
#self.populate_from_argv_overrides() # TODO IMPLEMENT_ME
def populate_from_any_conf_files(self):
globalconf = os.getenv('HOME', '')+'/.config/ipaaca.conf'
for filename in ['ipaaca.conf', globalconf]:
try:
f = open(filename, 'r')
c = configparser.ConfigParser()
c.readfp(f)
f.close()
LOGGER.info('Including configuration from '+filename)
for k,v in c.items('ipaaca'):
self._store[k] = v
return
except:
pass
LOGGER.info('Could not load ipaaca.conf either here or in ~/.config')
def populate_from_environment(self):
for k, v in os.environ.items():
if k.startswith('IPAACA_'):
if re.match(r'^[A-Za-z0-9_]*$', k) is None:
LOGGER.warning('Ignoring malformed environment key')
else:
if len(v)>1023:
LOGGER.warning('Ignoring long environment value')
else:
# remove initial IPAACA_ and transform key to dotted lowercase
trans_key = k[7:].lower().replace('_', '.')
self._store[trans_key] = v
LOGGER.debug('Configured from environment: '+str(trans_key)+'="'+str(v)+'"')
def get_global_config():
global __global_config
if __global_config is None:
__global_config = Config()
__global_config.populate_from_global_sources()
return __global_config
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -34,9 +34,9 @@ from __future__ import division, print_function
import collections
import rsb.converter
#import rsb.converter
import ipaaca_pb2
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
......@@ -45,230 +45,364 @@ import ipaaca.misc
LOGGER = ipaaca.misc.get_library_logger()
try:
import simplejson as json
import simplejson as json
except ImportError:
import json
LOGGER.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
import json
LOGGER.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
__all__ = [
'IntConverter',
'IUConverter',
'IULinkUpdate',
'IULinkUpdateConverter',
'IUPayloadUpdate',
'IUPayloadUpdateConverter',
'MessageConverter',
]
class IntConverter(rsb.converter.Converter):
"""Convert Python int objects to Protobuf ints and vice versa."""
def __init__(self, wireSchema="int", dataType=int):
super(IntConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, value):
pbo = ipaaca_pb2.IntMessage()
pbo.value = value
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca_pb2.IntMessage()
pbo.ParseFromString( str(byte_stream) )
return pbo.value
def pack_payload_entry(entry, key, value, _type=ipaaca.iu.IUPayloadType.JSON):
entry.key = key
if _type == ipaaca.iu.IUPayloadType.JSON:
entry.value = json.dumps(value)
elif _type == ipaaca.iu.IUPayloadType.STR or _type == 'MAP':
entry.value = str(value)
else:
raise ipaaca.exception.IpaacaException('Asked to send payload entry with unsupported type "' + _type + '".')
entry.type = _type
'IntConverter',
'IUConverter',
'IULinkUpdate',
'IULinkUpdateConverter',
'IUPayloadUpdate',
'IUPayloadUpdateConverter',
'MessageConverter',
'register_global_converter',
]
_LOW_LEVEL_WIRE_SCHEMA_MAP = None
def LOW_LEVEL_WIRE_SCHEMA_FOR(abstractname):
'''Map the abstract wire schema name (was used in RSB) to a
transport-dependent magic to detect on the wire.
Here: a required protobuf field'''
global _LOW_LEVEL_WIRE_SCHEMA_MAP
if _LOW_LEVEL_WIRE_SCHEMA_MAP is None:
_LOW_LEVEL_WIRE_SCHEMA_MAP = {
int: ipaaca.ipaaca_pb2.WireTypeIntMessage,
ipaaca.iu.IU: ipaaca.ipaaca_pb2.WireTypeIU,
ipaaca.iu.Message: ipaaca.ipaaca_pb2.WireTypeMessageIU,
IUPayloadUpdate: ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdate,
IULinkUpdate: ipaaca.ipaaca_pb2.WireTypeIULinkUpdate,
'int': ipaaca.ipaaca_pb2.WireTypeIntMessage,
'ipaaca-iu': ipaaca.ipaaca_pb2.WireTypeIU,
'ipaaca-messageiu': ipaaca.ipaaca_pb2.WireTypeMessageIU,
'ipaaca-iu-payload-update': ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdate,
'ipaaca-iu-link-update': ipaaca.ipaaca_pb2.WireTypeIULinkUpdate,
}
return _LOW_LEVEL_WIRE_SCHEMA_MAP.get(abstractname)
def __fail_no_type_converter():
raise ipaaca.exception.BackendSerializationError()
class FailingDict(dict):
def __init__(self, error_class, *args, **kwargs):
super(FailingDict, self).__init__(*args, **kwargs)
self._error_class = error_class
def __getitem__(self, k):
if k in self:
return dict.__getitem__(self, k)
else:
raise self._error_class(k)
# global converter / [un]marshaller store
__converter_registry_by_type = FailingDict(ipaaca.exception.BackendSerializationError)
__converter_registry_by_wire_schema = FailingDict(ipaaca.exception.BackendDeserializationError)
def register_global_converter(converter):
global __converter_registry_by_type, __converter_registry_by_wire_schema
real_wire_schema = LOW_LEVEL_WIRE_SCHEMA_FOR(converter._wire_schema)
if real_wire_schema is None:
raise NotImplementedError('There is no entry in the _LOW_LEVEL_WIRE_SCHEMA_MAP for '+str(converter._wire_schema))
if real_wire_schema in __converter_registry_by_wire_schema:
raise ipaaca.exception.ConverterRegistrationError(real_wire_schema)
if converter._data_type in __converter_registry_by_type:
raise ipaaca.exception.ConverterRegistrationError(converter._data_type.__name__)
__converter_registry_by_type[converter._data_type] = converter
__converter_registry_by_wire_schema[real_wire_schema] = converter
def deserialize(lowlevel_message):
pbo_outer = ipaaca.ipaaca_pb2.TransportLevelWrapper()
pbo_outer.ParseFromString(lowlevel_message)
type_ = pbo_outer.transport_message_type
#print('Received wire message type', type_)
if type_ in __converter_registry_by_wire_schema:
return __converter_registry_by_wire_schema[type_].deserialize(pbo_outer.raw_message, None)
else:
pbo = None
if type_ == ipaaca.ipaaca_pb2.WireTypeRemoteRequestResult:
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIURetraction:
pbo = ipaaca.ipaaca_pb2.IURetraction()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUCommission:
pbo = ipaaca.ipaaca_pb2.IUCommission()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUResendRequest:
pbo = ipaaca.ipaaca_pb2.IUResendRequest()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdateRequest:
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdateRequest()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUCommissionRequest:
pbo = ipaaca.ipaaca_pb2.IUCommissionRequest()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIULinkUpdateRequest:
pbo = ipaaca.ipaaca_pb2.IULinkUpdateRequest()
if pbo is None:
raise ipaaca.exception.BackendDeserializationError(type_)
else:
pbo.ParseFromString(pbo_outer.raw_message)
return pbo
raise ipaaca.exception.BackendDeserializationError(type_)
def serialize(obj):
inner, type_ = None, None
if obj.__class__ in __converter_registry_by_type:
cls_ = obj.__class__
inner, wire = __converter_registry_by_type[obj.__class__].serialize(obj)
type_ = LOW_LEVEL_WIRE_SCHEMA_FOR(wire)
else:
cls_ = obj.__class__
if cls_ == ipaaca.ipaaca_pb2.RemoteRequestResult:
type_ = ipaaca.ipaaca_pb2.WireTypeRemoteRequestResult
elif cls_ == ipaaca.ipaaca_pb2.IURetraction:
type_ = ipaaca.ipaaca_pb2.WireTypeIURetraction
elif cls_ == ipaaca.ipaaca_pb2.IUCommission:
type_ = ipaaca.ipaaca_pb2.WireTypeIUCommission
elif cls_ == ipaaca.ipaaca_pb2.IUResendRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIUResendRequest
elif cls_ == ipaaca.ipaaca_pb2.IUPayloadUpdateRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdateRequest
elif cls_ == ipaaca.ipaaca_pb2.IUCommissionRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIUCommissionRequest
elif cls_ == ipaaca.ipaaca_pb2.IULinkUpdateRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIULinkUpdateRequest
if type_ is None:
raise ipaaca.exception.BackendSerializationError(cls_)
else:
inner = obj.SerializeToString()
pbo = ipaaca.ipaaca_pb2.TransportLevelWrapper()
pbo.transport_message_type = type_
pbo.raw_message = inner
return bytearray(pbo.SerializeToString())
class ConverterBase(object):
'''Base for converters (to serialize and unserialize
data automatically depending on its Python type).'''
def __init__(self, substrate, data_type, wire_schema):
self._substrate = substrate
self._wire_schema = wire_schema
self._data_type = data_type
self.wireSchema = wire_schema # added compat with RSB
#print('Made a ConverterBase with wire '+str(self._wire_schema)+' and data '+str(self._data_type))
def serialize(self, value):
raise NotImplementedError('NOT IMPLEMENTED for ' \
+ self.__class__.__name__+': serialize')
def deserialize(self, stream, _UNUSED_override_wire_schema):
raise NotImplementedError('NOT IMPLEMENTED for ' \
+ self.__class__.__name__+': deserialize')
class IntConverter(ConverterBase):
"""Convert Python int objects to Protobuf ints and vice versa."""
def __init__(self, wireSchema="int", dataType=None):
super(IntConverter, self).__init__(bytearray, int, wireSchema)
def serialize(self, value):
pbo = ipaaca.ipaaca_pb2.IntMessage()
pbo.value = value
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IntMessage()
pbo.ParseFromString(byte_stream)
return pbo.value
def pack_payload_entry(entry, key, value, _type=None):
#if _type is None: _type=ipaaca.iu.IUPayloadType.JSON
entry.key = key
if _type is None or _type == ipaaca.iu.IUPayloadType.JSON:
entry.value = json.dumps(value)
elif _type == ipaaca.iu.IUPayloadType.STR or _type == 'MAP':
entry.value = str(value)
else:
raise ipaaca.exception.IpaacaException('Asked to send payload entry with unsupported type "' + _type + '".')
entry.type = _type
def unpack_payload_entry(entry):
# We assume that the only transfer types are 'STR' or 'JSON'. Both are transparently handled by json.loads
if entry.type == ipaaca.iu.IUPayloadType.JSON:
return json.loads(entry.value)
elif entry.type == ipaaca.iu.IUPayloadType.STR or entry.type == 'str':
return entry.value
else:
LOGGER.warn('Received payload entry with unsupported type "' + entry.type + '".')
return entry.value
class IUConverter(rsb.converter.Converter):
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-iu", dataType=ipaaca.iu.IU):
super(IUConverter, self).__init__(bytearray, dataType, wireSchema)
self._access_mode = ipaaca_pb2.IU.PUSH
self._remote_data_type = ipaaca.iu.RemotePushIU
def serialize(self, iu):
pbo = ipaaca_pb2.IU()
pbo.access_mode = self._access_mode
pbo.uid = iu._uid
pbo.revision = iu._revision
pbo.category = iu._category
pbo.payload_type = iu._payload_type
pbo.owner_name = iu._owner_name
pbo.committed = iu._committed
pbo.read_only = iu._read_only
for k, v in iu._payload.iteritems():
entry = pbo.payload.add()
pack_payload_entry(entry, k, v, iu.payload_type)
for type_ in iu._links.keys():
linkset = pbo.links.add()
linkset.type = type_
linkset.targets.extend(iu._links[type_])
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca_pb2.IU()
pbo.ParseFromString(str(byte_stream))
_payload = {}
for entry in pbo.payload:
_payload[entry.key] = unpack_payload_entry(entry)
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
return self._remote_data_type(
uid=pbo.uid,
revision=pbo.revision,
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = 'str' if pbo.payload_type is 'MAP' else pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links)
# We assume that the only transfer types are 'STR' or 'JSON'. Both are transparently handled by json.loads
if entry.type == ipaaca.iu.IUPayloadType.JSON:
return json.loads(entry.value)
elif entry.type == ipaaca.iu.IUPayloadType.STR or entry.type == 'str':
return entry.value
else:
LOGGER.warn('Received payload entry with unsupported type "' + entry.type + '".')
return entry.value
class IUConverter(ConverterBase):
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-iu", dataType=None): #ipaaca.iu.IU):
super(IUConverter, self).__init__(bytearray, ipaaca.iu.IU if dataType is None else dataType, wireSchema)
self._access_mode = ipaaca.ipaaca_pb2.IU.PUSH
self._remote_data_type = ipaaca.iu.RemotePushIU
def serialize(self, iu):
pbo = ipaaca.ipaaca_pb2.IU()
pbo.access_mode = self._access_mode
pbo.uid = iu._uid
pbo.revision = iu._revision
pbo.category = iu._category
pbo.payload_type = iu._payload_type
pbo.owner_name = iu._owner_name
pbo.committed = iu._committed
pbo.read_only = iu._read_only
for k, v in iu._payload.items():
entry = pbo.payload.add()
pack_payload_entry(entry, k, v, iu.payload_type)
for type_ in iu._links.keys():
linkset = pbo.links.add()
linkset.type = type_
linkset.targets.extend(iu._links[type_])
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IU()
pbo.ParseFromString(byte_stream)
_payload = {}
for entry in pbo.payload:
_payload[entry.key] = unpack_payload_entry(entry)
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
return self._remote_data_type(
uid=pbo.uid,
revision=pbo.revision,
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = 'str' if pbo.payload_type == 'MAP' else pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links)
class MessageConverter(IUConverter):
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-messageiu", dataType=ipaaca.iu.Message):
super(MessageConverter, self).__init__(wireSchema, dataType)
self._access_mode = ipaaca_pb2.IU.MESSAGE
self._remote_data_type = ipaaca.iu.RemoteMessage
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-messageiu", dataType=None): #ipaaca.iu.Message):
super(MessageConverter, self).__init__(wireSchema, ipaaca.iu.Message)
self._access_mode = ipaaca.ipaaca_pb2.IU.MESSAGE
self._remote_data_type = ipaaca.iu.RemoteMessage
class IULinkUpdate(object):
def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None):
super(IULinkUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.writer_name = writer_name
self.is_delta = is_delta
self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links)
self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)
def __str__(self):
s = 'LinkUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_links = '+str(self.new_links)+', '
s += 'links_to_remove = '+str(self.links_to_remove)+')'
return s
class IULinkUpdateConverter(rsb.converter.Converter):
def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate):
super(IULinkUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu_link_update):
pbo = ipaaca_pb2.IULinkUpdate()
pbo.uid = iu_link_update.uid
pbo.writer_name = iu_link_update.writer_name
pbo.revision = iu_link_update.revision
for type_ in iu_link_update.new_links.keys():
linkset = pbo.new_links.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.new_links[type_])
for type_ in iu_link_update.links_to_remove.keys():
linkset = pbo.links_to_remove.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.links_to_remove[type_])
pbo.is_delta = iu_link_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IULinkUpdate:
pbo = ipaaca_pb2.IULinkUpdate()
pbo.ParseFromString( str(byte_stream) )
LOGGER.debug('received an IULinkUpdate for revision '+str(pbo.revision))
iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_links:
iu_link_up.new_links[str(entry.type)] = set(entry.targets)
for entry in pbo.links_to_remove:
iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets)
return iu_link_up
else:
raise ValueError("Inacceptable dataType %s" % type)
def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None, request_uid=None, request_endpoint=None):
super(IULinkUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.writer_name = writer_name
self.is_delta = is_delta
self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links)
self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)
self.request_uid = request_uid
self.request_endpoint = request_endpoint
def __str__(self):
s = 'LinkUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_links = '+str(self.new_links)+', '
s += 'links_to_remove = '+str(self.links_to_remove)+')'
return s
class IULinkUpdateConverter(ConverterBase):
def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=None): #=IULinkUpdate):
super(IULinkUpdateConverter, self).__init__(bytearray, IULinkUpdate, wireSchema)
def serialize(self, iu_link_update):
pbo = ipaaca.ipaaca_pb2.IULinkUpdate()
pbo.uid = iu_link_update.uid
pbo.writer_name = iu_link_update.writer_name
pbo.revision = iu_link_update.revision
if iu_link_update.request_uid:
pbo.request_uid = iu_link_update.request_uid
if iu_link_update.request_endpoint:
pbo.request_endpoint = iu_link_update.request_endpoint
for type_ in iu_link_update.new_links.keys():
linkset = pbo.new_links.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.new_links[type_])
for type_ in iu_link_update.links_to_remove.keys():
linkset = pbo.links_to_remove.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.links_to_remove[type_])
pbo.is_delta = iu_link_update.is_delta
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IULinkUpdate()
pbo.ParseFromString(byte_stream)
LOGGER.debug('received an IULinkUpdate for revision '+str(pbo.revision))
iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta, request_uid=pbo.request_uid, request_endpoint=pbo.request_endpoint)
for entry in pbo.new_links:
iu_link_up.new_links[str(entry.type)] = set(entry.targets)
for entry in pbo.links_to_remove:
iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets)
return iu_link_up
class IUPayloadUpdate(object):
def __init__(self, uid, revision, is_delta, payload_type, writer_name="undef", new_items=None, keys_to_remove=None):
super(IUPayloadUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.payload_type = payload_type
self.writer_name = writer_name
self.is_delta = is_delta
self.new_items = {} if new_items is None else new_items
self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove
def __str__(self):
s = 'PayloadUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'payload_type='+str(self.payload_type)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_items = '+str(self.new_items)+', '
s += 'keys_to_remove = '+str(self.keys_to_remove)+')'
return s
class IUPayloadUpdateConverter(rsb.converter.Converter):
def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate):
super(IUPayloadUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu_payload_update):
pbo = ipaaca_pb2.IUPayloadUpdate()
pbo.uid = iu_payload_update.uid
pbo.writer_name = iu_payload_update.writer_name
pbo.revision = iu_payload_update.revision
for k, v in iu_payload_update.new_items.items():
entry = pbo.new_items.add()
pack_payload_entry(entry, k, v, iu_payload_update.payload_type)
pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
pbo.is_delta = iu_payload_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IUPayloadUpdate:
pbo = ipaaca_pb2.IUPayloadUpdate()
pbo.ParseFromString( str(byte_stream) )
LOGGER.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, payload_type=None, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_items:
iu_up.new_items[entry.key] = unpack_payload_entry(entry)
iu_up.keys_to_remove = pbo.keys_to_remove[:]
return iu_up
else:
raise ValueError("Inacceptable dataType %s" % type)
def __init__(self, uid, revision, is_delta, payload_type, writer_name="undef", new_items=None, keys_to_remove=None, request_uid=None, request_endpoint=None):
super(IUPayloadUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.payload_type = payload_type
self.writer_name = writer_name
self.is_delta = is_delta
self.new_items = {} if new_items is None else new_items
self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove
self.request_uid = request_uid
self.request_endpoint = request_endpoint
def __str__(self):
s = 'PayloadUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'payload_type='+str(self.payload_type)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_items = '+str(self.new_items)+', '
s += 'keys_to_remove = '+str(self.keys_to_remove)+')'
return s
class IUPayloadUpdateConverter(ConverterBase):
def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=None):
super(IUPayloadUpdateConverter, self).__init__(bytearray, IUPayloadUpdate, wireSchema)
def serialize(self, iu_payload_update):
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdate()
pbo.uid = iu_payload_update.uid
pbo.writer_name = iu_payload_update.writer_name
pbo.revision = iu_payload_update.revision
if iu_payload_update.request_uid:
pbo.request_uid = iu_payload_update.request_uid
if iu_payload_update.request_endpoint:
pbo.request_endpoint = iu_payload_update.request_endpoint
for k, v in iu_payload_update.new_items.items():
entry = pbo.new_items.add()
pack_payload_entry(entry, k, v, iu_payload_update.payload_type)
pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
pbo.is_delta = iu_payload_update.is_delta
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdate()
pbo.ParseFromString(byte_stream)
LOGGER.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, payload_type=None, writer_name=pbo.writer_name, is_delta=pbo.is_delta, request_uid=pbo.request_uid, request_endpoint=pbo.request_endpoint)
for entry in pbo.new_items:
iu_up.new_items[entry.key] = unpack_payload_entry(entry)
iu_up.keys_to_remove = pbo.keys_to_remove[:]
return iu_up
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -33,6 +33,15 @@
IPAACA_DEFAULT_CHANNEL = 'default'
IPAACA_LOGGER_NAME = 'ipaaca'
IPAACA_DEFAULT_LOGGING_LEVEL = 'WARNING'
IPAACA_DEFAULT_IU_PAYLOAD_TYPE = 'JSON' # one of ipaaca.iu.IUPayloadType
IPAACA_DEFAULT_RSB_HOST = None
IPAACA_DEFAULT_RSB_PORT = None
IPAACA_DEFAULT_RSB_TRANSPORT = None
IPAACA_DEFAULT_RSB_SOCKET_SERVER = None
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2015 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -48,6 +48,35 @@ __all__ = [
class IpaacaError(Exception): pass
class BackendInitializationError(IpaacaError):
"""Error indicating that type marshalling cannot proceed
because no matching converter is known."""
def __init__(self, name=''):
super(BackendInitializationError, self).__init__( \
'Failed to initialize selected backend '+str(name))
class BackendSerializationError(IpaacaError):
"""Error indicating that type marshalling cannot proceed
because no matching converter is known."""
def __init__(self, typ):
super(BackendSerializationError, self).__init__( \
'Could not serialize type ' + str(typ.__name__) \
+ ' - no converter registered.')
class BackendDeserializationError(IpaacaError):
"""Error indicating that type unmarshalling cannot proceed
because no matching converter is known."""
def __init__(self, wire_schema):
super(BackendDeserializationError, self).__init__( \
'Could not deserialize wire format "' + str(wire_schema) \
+ '" - no converter registered.')
class ConverterRegistrationError(IpaacaError):
'''Error indicating that a type or wire schema already had a registered converter.'''
def __init__(self, type_name_or_schema):
super(ConverterRegistrationError, self).__init__(
'Failed to register a converter: we already have one for ' \
+ str(type_name_or_schema))
class IUCommittedError(IpaacaError):
"""Error indicating that an IU is immutable because it has been committed to."""
......@@ -88,10 +117,15 @@ class IUReadOnlyError(IpaacaError):
class IUResendRequestFailedError(IpaacaError):
"""Error indicating that a remote IU resend failed."""
def __init__(self, iu):
def __init__(self, iu_uid):
super(IUResendRequestFailedError, self).__init__(
'Remote resend failed for IU ' + str(iu.uid) + '.')
'Remote resend failed for IU ' + str(iu_uid))
class IUResendRequestRemoteServerUnknownError(IpaacaError):
"""Error indicating that a remote IU resend failed."""
def __init__(self, iu_uid):
super(IUResendRequestRemoteServerUnknownError, self).__init__(
'Remote resend request: remote server unknown for IU ' + str(iu_uid))
class IURetractedError(IpaacaError):
"""Error indicating that an IU has been retracted."""
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2015 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -37,7 +37,9 @@ import copy
import threading
import uuid
import ipaaca_pb2
import six
import ipaaca.ipaaca_pb2
import ipaaca.converter
import ipaaca.exception
import ipaaca.misc
......@@ -104,18 +106,18 @@ class IUInterface(object):
self._links = collections.defaultdict(set)
def __str__(self):
s = unicode(self.__class__)+"{ "
s = str(self.__class__)+"{ "
s += "category="+("<None>" if self._category is None else self._category)+" "
s += "uid="+self._uid+" "
s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") "
s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " "
s += "payload={ "
for k,v in self.payload.items():
s += k+":'"+unicode(v)+"', "
s += k+":'"+str(v)+"', "
s += "} "
s += "links={ "
for t, ids in self.get_all_links().items():
s += t+":'"+unicode(ids)+"', "
s += t+":'"+str(ids)+"', "
s += "} "
s += "}"
return s
......@@ -135,13 +137,13 @@ class IUInterface(object):
def add_links(self, type, targets, writer_name=None):
'''Attempt to add links if the conditions are met
and send an update message. Then call the local setter.'''
if not hasattr(targets, '__iter__'): targets=[targets]
if isinstance(targets, six.string_types) and hasattr(targets, '__iter__'): targets=[targets]
self._modify_links(is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
self._add_and_remove_links( add={type:targets}, remove={} )
def remove_links(self, type, targets, writer_name=None):
'''Attempt to remove links if the conditions are met
and send an update message. Then call the local setter.'''
if not hasattr(targets, '__iter__'): targets=[targets]
if isinstance(targets, six.string_types) and hasattr(targets, '__iter__'): targets=[targets]
self._modify_links(is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
self._add_and_remove_links( add={}, remove={type:targets} )
def modify_links(self, add, remove, writer_name=None):
......@@ -555,7 +557,7 @@ class RemotePushIU(IUInterface):
raise ipaaca.exception.IURetractedError(self)
if self._read_only:
raise ipaaca.exception.IUReadOnlyError(self)
commission_request = ipaaca_pb2.IUCommission()
commission_request = ipaaca.ipaaca_pb2.IUCommission()
commission_request.uid = self.uid
commission_request.revision = self.revision
commission_request.writer_name = self.buffer.unique_name
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -35,10 +35,10 @@ from __future__ import division, print_function
import argparse
import logging
import sys
import ipaaca.defaults
__all__ = [
'enum',
'IpaacaArgumentParser',
......@@ -54,6 +54,7 @@ def enum(*sequential, **named):
"""
enums = dict(zip(sequential, range(len(sequential))), **named)
enums['_choices'] = enums.keys()
enums['_values'] = enums.values() # RY e.g. see if raw int is valid
return type('Enum', (object,), enums)
......@@ -69,6 +70,21 @@ class IpaacaLoggingHandler(logging.Handler):
msg = str(record.msg.format(record.args))
print(meta + msg)
class RSBLoggingHandler(logging.Handler):
'''A logging handler that prints to stdout, RSB version.'''
def __init__(self, prefix='IPAACA', level=logging.NOTSET):
logging.Handler.__init__(self, level)
self._prefix = prefix
def emit(self, record):
meta = '[%s: %s] ' % (self._prefix, str(record.levelname))
try:
msg = str(record.msg % record.args)
except:
msg = str(record.msg) + ' WITH ARGS: ' + str(record.args)
print(meta + msg)
class GenericNoLoggingHandler(logging.Handler):
'''A logging handler that produces no output'''
......@@ -80,21 +96,21 @@ def get_library_logger():
return logging.getLogger(ipaaca.defaults.IPAACA_LOGGER_NAME)
__IPAACA_LOGGING_HANDLER = IpaacaLoggingHandler('IPAACA')
__GENERIC_NO_LOG_HANDLER = GenericNoLoggingHandler()
_IPAACA_LOGGING_HANDLER = IpaacaLoggingHandler('IPAACA')
_GENERIC_NO_LOG_HANDLER = GenericNoLoggingHandler()
# By default, suppress library logging
# - for IPAACA
get_library_logger().addHandler(__GENERIC_NO_LOG_HANDLER)
get_library_logger().addHandler(_GENERIC_NO_LOG_HANDLER)
# - for RSB
logging.getLogger('rsb').addHandler(__GENERIC_NO_LOG_HANDLER)
logging.getLogger('rsb').addHandler(_GENERIC_NO_LOG_HANDLER)
def enable_logging(level=None):
'''Enable ipaaca's 'library-wide logging.'''
ipaaca_logger = get_library_logger()
ipaaca_logger.addHandler(__IPAACA_LOGGING_HANDLER)
ipaaca_logger.removeHandler(__GENERIC_NO_LOG_HANDLER)
ipaaca_logger.addHandler(_IPAACA_LOGGING_HANDLER)
ipaaca_logger.removeHandler(_GENERIC_NO_LOG_HANDLER)
ipaaca_logger.setLevel(level=level if level is not None else
ipaaca.defaults.IPAACA_DEFAULT_LOGGING_LEVEL)
......@@ -120,10 +136,30 @@ class IpaacaArgumentParser(argparse.ArgumentParser):
def __call__(self, parser, namespace, values, option_string=None):
rsb_logger = logging.getLogger('rsb')
rsb_logger.addHandler(IpaacaLoggingHandler('RSB'))
rsb_logger.removeHandler(__GENERIC_NO_LOG_HANDLER)
rsb_logger.addHandler(RSBLoggingHandler('RSB'))
rsb_logger.removeHandler(_GENERIC_NO_LOG_HANDLER)
rsb_logger.setLevel(level=values)
class IpaacaRSBHost(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST = values
class IpaacaRSBPort(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT = values
class IpaacaRSBTransport(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT = values
class IpaacaRSBSocketServer(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER = values
def __init__(self, prog=None, usage=None, description=None, epilog=None,
parents=[], formatter_class=argparse.HelpFormatter,
prefix_chars='-', fromfile_prefix_chars=None,
......@@ -136,6 +172,7 @@ class IpaacaArgumentParser(argparse.ArgumentParser):
conflict_handler=conflict_handler, add_help=add_help)
def _add_ipaaca_lib_arguments(self):
# CMD-arguments for ipaaca
ipaacalib_group = self.add_argument_group('IPAACA library arguments')
ipaacalib_group.add_argument(
'--ipaaca-payload-type',
......@@ -157,6 +194,7 @@ class IpaacaArgumentParser(argparse.ArgumentParser):
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'],
dest='_ipaaca_logging_level_',
help="enable IPAACA logging with threshold")
# CMD-arguments for rsb
rsblib_group = self.add_argument_group('RSB library arguments')
rsblib_group.add_argument(
'--rsb-enable-logging',
......@@ -164,6 +202,36 @@ class IpaacaArgumentParser(argparse.ArgumentParser):
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'],
dest='_ipaaca_rsb_enable_logging_',
help="enable RSB logging with threshold")
rsblib_group.add_argument(
'--rsb-host',
action=self.IpaacaRSBHost,
default=None,
dest='_ipaaca_rsb_set_host_',
metavar='HOST',
help="set RSB host")
rsblib_group.add_argument(
'--rsb-port',
action=self.IpaacaRSBPort,
default=None,
dest='_ipaaca_rsb_set_port_',
metavar='PORT',
help="set RSB port")
rsblib_group.add_argument(
'--rsb-transport',
action=self.IpaacaRSBTransport,
default=None,
dest='_ipaaca_rsb_set_transport_',
choices=['spread', 'socket'],
metavar='TRANSPORT',
help="set RSB transport")
rsblib_group.add_argument(
'--rsb-socket-server',
action=self.IpaacaRSBSocketServer,
default=None,
dest='_ipaaca_rsb_set_socket_server_',
choices=['0', '1', 'auto'],
metavar='server',
help="act as server (only when --rsb-transport=socket)")
def parse_args(self, args=None, namespace=None):
self._add_ipaaca_lib_arguments() # Add ipaaca-args just before parsing
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -30,7 +30,7 @@
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import threading
import time
......@@ -53,12 +53,12 @@ class Payload(dict):
def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False, update_timeout=_DEFAULT_PAYLOAD_UPDATE_TIMEOUT):
self.iu = iu
_pl = {}
for k, v in ({} if new_payload is None else new_payload).iteritems():
_pl[unicode(k, 'utf8') if type(k) == str else k] = unicode(v, 'utf8') if type(v) == str else v
for k, v in ({} if new_payload is None else new_payload).items():
_pl[str(k) if type(k) == str else k] = str(v) if type(v) == str else v
# NOTE omit_init_update_message is necessary to prevent checking for
# exceptions and sending updates in the case where we just receive
# a whole new payload from the remote side and overwrite it locally.
for k, v in _pl.iteritems():
for k, v in _pl.items():
dict.__setitem__(self, k, v)
if (not omit_init_update_message) and (self.iu.buffer is not None):
self.iu._modify_payload(
......@@ -85,8 +85,8 @@ class Payload(dict):
def __setitem__(self, k, v, writer_name=None):
with self._batch_update_lock:
k = unicode(k, 'utf8') if type(k) == str else k
v = unicode(v, 'utf8') if type(v) == str else v
k = str(k) if type(k) == str else k
v = str(v) if type(v) == str else v
if self._update_on_every_change:
self.iu._modify_payload(
is_delta=True,
......@@ -102,7 +102,7 @@ class Payload(dict):
def __delitem__(self, k, writer_name=None):
with self._batch_update_lock:
k = unicode(k, 'utf8') if type(k) == str else k
k = str(k) if type(k) == str else k
if self._update_on_every_change:
self.iu._modify_payload(
is_delta=True,
......@@ -136,9 +136,9 @@ class Payload(dict):
def merge(self, payload, writer_name=None):
with self._batch_update_lock:
for k, v in payload.iteritems():
k = unicode(k, 'utf8') if type(k) == str else k
v = unicode(v, 'utf8') if type(v) == str else v
for k, v in payload.items():
k = str(k) if type(k) == str else k
v = str(v) if type(v) == str else v
self.iu._modify_payload(
is_delta=True,
new_items=payload,
......@@ -220,18 +220,22 @@ class PayloadItemDictProxy(PayloadItemProxy, dict):
value = self.content.get(key, default)
return self._create_proxy(value, key)
def items(self):
return [(key, value) for key, value in self.iteritems()]
# py3port: were these used at all?
# def items(self):
# return [(key, value) for key, value in self.items()]
def iteritems(self):
for key, value in self.content.iteritems():
# py3port: was iteritems
def items(self):
for key, value in self.content.items():
yield key, self._create_proxy(value, key)
def values(self):
return [value for value in self.itervalues()]
# py3port: were these used at all?
# def values(self):
# return [value for value in self.values()]
def itervalues(self):
for key, value in self.content.iteritems():
# py3port: was itervalues
def values(self):
for key, value in self.content.items():
yield self._create_proxy(value, key)
def pop(self, key, *args):
......
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -32,4 +32,4 @@
from __future__ import division, print_function
from notifier import ComponentNotifier
from .notifier import ComponentNotifier
......@@ -5,7 +5,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2015 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -42,11 +42,15 @@ import traceback
import uuid
import ipaaca
import ipaaca.misc
import six
__all__ = [
'logger_send_ipaaca_logs',
'logger_set_log_filename',
'logger_set_module_name',
'logger_set_log_level',
'LOG_DEBUG',
'LOG_INFO',
'LOG_WARN',
......@@ -54,6 +58,25 @@ __all__ = [
'LOG_ERROR',
]
LogLevel = ipaaca.misc.enum(
DEBUG = 0,
INFO = 1,
WARN = 2,
ERROR = 3,
SILENT = 4,
)
LOG_LEVEL_FROM_STRING_DICT = {
'DEBUG': LogLevel.DEBUG,
'INFO': LogLevel.INFO,
'WARN': LogLevel.WARN,
'WARNING': LogLevel.WARN,
'ERROR': LogLevel.ERROR,
'NONE': LogLevel.SILENT,
'SILENT': LogLevel.SILENT,
}
CURRENT_LOG_LEVEL = LogLevel.DEBUG
LOGGER_LOCK = threading.RLock()
MODULE_NAME = sys.argv[0]
......@@ -97,6 +120,15 @@ def logger_send_ipaaca_logs(flag=True):
with LOGGER_LOCK:
SEND_IPAACA_LOGS = flag
def logger_set_log_level(level=LogLevel.DEBUG):
global CURRENT_LOG_LEVEL
with LOGGER_LOCK:
if level in LogLevel._values:
CURRENT_LOG_LEVEL = level
elif isinstance(level, six.string_types) and level.upper() in LOG_LEVEL_FROM_STRING_DICT:
CURRENT_LOG_LEVEL = LOG_LEVEL_FROM_STRING_DICT[level.upper()]
else:
pass # leave previous setting untouched
def LOG_IPAACA(lvl, text, now=0.0, fn='???', thread='???'):
global OUTPUT_BUFFER
......@@ -113,17 +145,27 @@ def LOG_IPAACA(lvl, text, now=0.0, fn='???', thread='???'):
'thread': thread,
'uuid': uid,
'text': text,}
OUTPUT_BUFFER.add(msg)
try:
OUTPUT_BUFFER.add(msg)
except Exception as e:
LOG_ERROR('Caught an exception while logging via ipaaca. '
+ ' str(e); '
+ traceback.format_exc())
def LOG_CONSOLE(lvlstr, msg, fn_markup='', msg_markup='', now=0.0, fn='???', thread='???'):
for line in msg.split('\n'):
text = lvlstr+' '+thread+' '+fn_markup+fn+''+' '+msg_markup+unicode(line)+''
if isinstance(msg, six.string_types):
lines = msg.split('\n')
else:
lines = [msg]
for line in lines:
text = lvlstr+' '+thread+' '+fn_markup+fn+'\033[m'+' '+msg_markup+str(line)+'\033[m'
print(text)
fn = ' '*len(fn)
def LOG_ERROR(msg, now=None):
if CURRENT_LOG_LEVEL > LogLevel.ERROR: return
now = time.time() if now is None else now
f = sys._getframe(1)
classprefix = (f.f_locals['self'].__class__.__name__+'.') if 'self' in f.f_locals else ''
......@@ -131,10 +173,11 @@ def LOG_ERROR(msg, now=None):
thread = threading.current_thread().getName()
with LOGGER_LOCK:
if SEND_IPAACA_LOGS: LOG_IPAACA('ERROR', msg, now=now, fn=fn, thread=thread)
LOG_CONSOLE('[ERROR]', msg, fn_markup='', msg_markup='', now=now, fn=fn, thread=thread)
LOG_CONSOLE('\033[38;5;9;1;4m[ERROR]\033[m', msg, fn_markup='\033[38;5;203m', msg_markup='\033[38;5;9;1;4m', now=now, fn=fn, thread=thread)
def LOG_WARN(msg, now=None):
if CURRENT_LOG_LEVEL > LogLevel.WARN: return
now = time.time() if now is None else now
f = sys._getframe(1)
classprefix = (f.f_locals['self'].__class__.__name__+'.') if 'self' in f.f_locals else ''
......@@ -142,13 +185,14 @@ def LOG_WARN(msg, now=None):
thread = threading.current_thread().getName()
with LOGGER_LOCK:
if SEND_IPAACA_LOGS: LOG_IPAACA('WARN', msg, now=now, fn=fn, thread=thread)
LOG_CONSOLE('[WARN] ', msg, fn_markup='', msg_markup='', now=now, fn=fn, thread=thread)
LOG_CONSOLE('\033[38;5;208;1m[WARN]\033[m ', msg, fn_markup='\033[38;5;214m', msg_markup='\033[38;5;208;1m', now=now, fn=fn, thread=thread)
LOG_WARNING = LOG_WARN
def LOG_INFO(msg, now=None):
if CURRENT_LOG_LEVEL > LogLevel.INFO: return
now = time.time() if now is None else now
f = sys._getframe(1)
classprefix = (f.f_locals['self'].__class__.__name__+'.') if 'self' in f.f_locals else ''
......@@ -160,6 +204,7 @@ def LOG_INFO(msg, now=None):
def LOG_DEBUG(msg, now=None):
if CURRENT_LOG_LEVEL > LogLevel.DEBUG: return
now = time.time() if now is None else now
f = sys._getframe(1)
classprefix = (f.f_locals['self'].__class__.__name__+'.') if 'self' in f.f_locals else ''
......@@ -167,7 +212,7 @@ def LOG_DEBUG(msg, now=None):
thread = threading.current_thread().getName()
with LOGGER_LOCK:
if SEND_IPAACA_LOGS: LOG_IPAACA('DEBUG', msg, now=now, fn=fn, thread=thread)
LOG_CONSOLE('[DEBUG]', msg, fn_markup='', msg_markup='', now=now, fn=fn, thread=thread)
LOG_CONSOLE('\033[2m[DEBUG]\033[m', msg, fn_markup='\033[38;5;144m', msg_markup='\033[38;5;248m', now=now, fn=fn, thread=thread)
class LoggerComponent(object):
......@@ -209,7 +254,7 @@ class LoggerComponent(object):
self.log_mode == 'timestamp')
new_logfile = open(filename, 'a' if append_if_exist else 'w')
if self.logfile is not None:
text = u'Will now continue logging in log file ' + unicode(filename)
text = u'Will now continue logging in log file ' + str(filename)
uid = str(uuid.uuid4())[0:8]
tim = time.time()
record = {
......@@ -221,7 +266,7 @@ class LoggerComponent(object):
'function': u'LoggerComponent.open_logfile',
'thread': '-',
'logreceivedtime': tim}
self.logfile.write(unicode(record)+'\n')
self.logfile.write(str(record)+'\n')
self.logfile.close()
self.logfile = new_logfile
print('Logging to console and {filename} ...'.format(filename=filename))
......@@ -231,8 +276,8 @@ class LoggerComponent(object):
def close_logfile(self):
if self.logfile is not None:
text = u'Closing of log file requested.'
uid = unicode(uuid.uuid4())[0:8]
tim = unicode(time.time())
uid = str(uuid.uuid4())[0:8]
tim = str(time.time())
record = {
'uuid': uid,
'time': tim,
......@@ -242,7 +287,7 @@ class LoggerComponent(object):
'function': u'LoggerComponent.close_logfile',
'thread': u'-',
'logreceivedtime': tim}
self.logfile.write(unicode(record)+'\n')
self.logfile.write(str(record)+'\n')
self.logfile.close()
print('Closed current log file.')
self.logfile = None
......@@ -280,24 +325,24 @@ class LoggerComponent(object):
'function': function,
'thread': thread,
'logreceivedtime': received_time}
self.logfile.write(unicode(record) + '\n')
self.logfile.write(str(record) + '\n')
except:
print('Failed to write to logfile!')
elif iu.category == 'logcontrol':
cmd = iu.payload['cmd']
cmd = iu.payload['cmd'] if 'cmd' in iu.payload else 'undef'
if cmd == 'open_log_file':
filename = iu.payload['filename'] if 'filename' in iu.payload else ''
if 'existing' in iu.payload:
log_mode_ = iu.payload['existing'].lower()
if log_mode_ not in LOG_MODES:
LOG_WARN(u'Value of "existing" should be "append", timestamp, or "overwrite", continuing with mode {mode}'.format(mode=self.log_mode))
LOG_WARN(u'Value of "existing" should be "append", "timestamp", or "overwrite", continuing with mode {mode}'.format(mode=self.log_mode))
else:
self.log_mode = log_mode_
self.open_logfile(filename)
elif cmd == 'close_log_file':
self.close_logfile()
else:
LOG_WARN(u'Received unknown logcontrol command: '+unicode(cmd))
except Exception, e:
LOG_WARN(u'Received unknown logcontrol command: '+str(cmd))
except Exception as e:
print('Exception while logging!') # TODO write to file as well?
print(u' '+unicode(traceback.format_exc()))
print(u' '+str(traceback.format_exc()))