Compare revisions

Showing 3132 additions and 3 deletions
package ipaaca.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.Initializer;
import ipaaca.InputBuffer;
import ipaaca.LocalIU;
import ipaaca.OutputBuffer;
import java.util.Set;
import lombok.Getter;
import org.junit.After;
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
/**
* Integration test for the ComponentNotifier, connects two of them. Requires a running spread daemon.
* @author hvanwelbergen
*
*/
public class ComponentNotifierIntegrationTest
{
private ComponentNotifier notifier1;
private ComponentNotifier notifier2;
private InputBuffer inBuffer;
private OutputBuffer outBuffer;
private static final String OTHER_CATEGORY="OTHER";
static
{
Initializer.initializeIpaacaRsb();
}
private class MyHandlerFunctor implements HandlerFunctor
{
@Getter
private volatile int numCalled = 0;
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
numCalled++;
}
}
@After
public void after()
{
if (inBuffer != null)
{
inBuffer.close();
}
if (outBuffer != null)
{
outBuffer.close();
}
}
private ComponentNotifier setupCompNotifier(String id, Set<String> sendList, Set<String> recvList)
{
inBuffer = new InputBuffer(id + "in", ImmutableSet.of(ComponentNotifier.NOTIFY_CATEGORY));
outBuffer = new OutputBuffer(id + "out");
return new ComponentNotifier(id, "test", ImmutableSet.copyOf(sendList), ImmutableSet.copyOf(recvList), outBuffer, inBuffer);
}
private ComponentNotifier setupCompNotifierWithOtherCategoryInputBuffer(String id, Set<String> sendList, Set<String> recvList)
{
inBuffer = new InputBuffer(id + "in", ImmutableSet.of(ComponentNotifier.NOTIFY_CATEGORY, OTHER_CATEGORY));
outBuffer = new OutputBuffer(id + "out");
return new ComponentNotifier(id, "test", ImmutableSet.copyOf(sendList), ImmutableSet.copyOf(recvList), outBuffer, inBuffer);
}
@Test
public void testSelf() throws InterruptedException
{
notifier1 = setupCompNotifier("not1", ImmutableSet.of("a1", "b1"), ImmutableSet.of("a3", "b1"));
MyHandlerFunctor h1 = new MyHandlerFunctor();
notifier1.addNotificationHandler(h1);
notifier1.initialize();
Thread.sleep(500);
assertEquals(0, h1.getNumCalled());
}
@Test
public void testTwo() throws InterruptedException
{
notifier1 = setupCompNotifier("not1", ImmutableSet.of("a1", "b1"), ImmutableSet.of("a3", "b2"));
notifier2 = setupCompNotifier("not2", ImmutableSet.of("a2", "b2"), ImmutableSet.of("a3", "b1"));
MyHandlerFunctor h1 = new MyHandlerFunctor();
MyHandlerFunctor h2 = new MyHandlerFunctor();
notifier1.addNotificationHandler(h1);
notifier2.addNotificationHandler(h2);
notifier1.initialize();
Thread.sleep(500);
notifier2.initialize();
Thread.sleep(500);
assertEquals(1, h1.getNumCalled());
assertEquals(1, h2.getNumCalled());
}
@Test
public void testOtherCategoryInInputBuffer() throws InterruptedException
{
notifier1 = setupCompNotifierWithOtherCategoryInputBuffer("not1", ImmutableSet.of("a1", "b1"), ImmutableSet.of("a3", "b1"));
MyHandlerFunctor h1 = new MyHandlerFunctor();
notifier1.addNotificationHandler(h1);
OutputBuffer out = new OutputBuffer("out");
LocalIU iu = new LocalIU(OTHER_CATEGORY);
out.add(iu);
Thread.sleep(500);
assertEquals(0, h1.getNumCalled());
assertNotNull(inBuffer.getIU(iu.getUid()));
}
}
package ipaaca.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.doAnswer;
import ipaaca.AbstractIU;
import ipaaca.IUEventHandler;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
import ipaaca.LocalIU;
import ipaaca.OutputBuffer;
import ipaaca.Payload;
import java.util.Set;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
/**
* Unit tests for the ComponentNotifier
* @author hvanwelbergen
*/
public class ComponentNotifierTest
{
private static final ImmutableSet<String> RECV_CAT = ImmutableSet.of("testrec1", "testrc2");
private static final ImmutableSet<String> SEND_CAT = ImmutableSet.of("testsnd1", "testsnd2", "testsnd3");
private OutputBuffer mockOutBuffer = mock(OutputBuffer.class);
private InputBuffer mockInBuffer = mock(InputBuffer.class);
private IUEventHandler inputHandler;
private ComponentNotifier notifier = new ComponentNotifier("testcomp", "testfunc", SEND_CAT, RECV_CAT, mockOutBuffer, mockInBuffer);
@Before
public void setup()
{
doAnswer(new Answer<Void>()
{
@Override
public Void answer(InvocationOnMock invocation) throws Throwable
{
IUEventHandler handler = (IUEventHandler) (invocation.getArguments()[0]);
inputHandler = handler;
return null;
}
}).when(mockInBuffer).registerHandler(any(IUEventHandler.class));
notifier.initialize();
}
@Test
public void testNotifyAtInit()
{
ArgumentCaptor<LocalIU> argument = ArgumentCaptor.forClass(LocalIU.class);
verify(mockOutBuffer).add(argument.capture());
LocalIU iu = argument.getValue();
assertEquals(ComponentNotifier.NOTIFY_CATEGORY, iu.getCategory());
assertEquals("new", iu.getPayload().get(ComponentNotifier.STATE));
assertThat(ImmutableSet.copyOf(iu.getPayload().get(ComponentNotifier.RECEIVE_CATEGORIES).split(",")),
IsIterableContainingInAnyOrder.containsInAnyOrder(RECV_CAT.toArray(new String[0])));
assertThat(ImmutableSet.copyOf(iu.getPayload().get(ComponentNotifier.SEND_CATEGORIES).split(",")),
IsIterableContainingInAnyOrder.containsInAnyOrder(SEND_CAT.toArray(new String[0])));
}
private void sendNotify(String state, Set<String> receiveCats)
{
AbstractIU mockIUNotify = mock(AbstractIU.class);
Payload mockNotifyPayload = mock(Payload.class);
when(mockIUNotify.getCategory()).thenReturn(ComponentNotifier.NOTIFY_CATEGORY);
when(mockIUNotify.getPayload()).thenReturn(mockNotifyPayload);
when(mockInBuffer.getIU("iuNotify")).thenReturn(mockIUNotify);
when(mockNotifyPayload.get(ComponentNotifier.STATE)).thenReturn(state);
when(mockNotifyPayload.get(ComponentNotifier.NAME)).thenReturn("namex");
when(mockNotifyPayload.get(ComponentNotifier.SEND_CATEGORIES)).thenReturn("");
when(mockNotifyPayload.get(ComponentNotifier.RECEIVE_CATEGORIES)).thenReturn(Joiner.on(",").join(receiveCats));
inputHandler.call(mockInBuffer, "iuNotify", false, IUEventType.ADDED, ComponentNotifier.NOTIFY_CATEGORY);
}
@Test
public void testNotifyAtNotifyNew() throws Exception
{
sendNotify("new", ImmutableSet.of("testsnd1"));
ArgumentCaptor<LocalIU> argument = ArgumentCaptor.forClass(LocalIU.class);
verify(mockOutBuffer, times(2)).add(argument.capture());
LocalIU iu = argument.getAllValues().get(1);
assertEquals("componentNotify", iu.getCategory());
assertEquals("old", iu.getPayload().get("state"));
}
@Test
public void testNoNotifyAtNotifyOld() throws Exception
{
sendNotify("old", ImmutableSet.of("testsnd1"));
ArgumentCaptor<LocalIU> argument = ArgumentCaptor.forClass(LocalIU.class);
verify(mockOutBuffer, times(1)).add(argument.capture());
}
private class WaitForFinish extends Thread
{
public volatile boolean waitFinish = false;
public void run()
{
notifier.waitForReceivers();
waitFinish = true;
}
}
@Test
public void testWait() throws InterruptedException
{
WaitForFinish wff = new WaitForFinish();
wff.start();
Thread.sleep(200);
assertFalse(wff.waitFinish);
sendNotify("new",SEND_CAT);
Thread.sleep(200);
assertTrue(wff.waitFinish);
}
@Test
public void testWaitWrongCats() throws InterruptedException
{
WaitForFinish wff = new WaitForFinish();
wff.start();
Thread.sleep(200);
assertFalse(wff.waitFinish);
sendNotify("new",RECV_CAT);
Thread.sleep(200);
assertFalse(wff.waitFinish);
}
@Test
public void testWaitIncremental()throws InterruptedException
{
WaitForFinish wff = new WaitForFinish();
wff.start();
Thread.sleep(200);
assertFalse(wff.waitFinish);
sendNotify("new",ImmutableSet.of("testsnd1", "testsnd2"));
Thread.sleep(200);
assertFalse(wff.waitFinish);
sendNotify("new",ImmutableSet.of("testsnd3"));
Thread.sleep(200);
assertTrue(wff.waitFinish);
}
}
package ipaaca.util.communication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Unit tests for the FutureIU
* @author hvanwelbergen
*
*/
public class FutureIUTest
{
private final OutputBuffer outBuffer = new OutputBuffer("component1");
@Test(timeout = 2000)
public void testSendBeforeTake() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "started");
outBuffer.add(message);
assertEquals(message.getPayload(), fu.take().getPayload());
}
@Test(timeout = 2000)
public void testSendAfterTake() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "started");
Runnable send = () -> {
try
{
Thread.sleep(1000);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
outBuffer.add(message);
};
new Thread(send).start();
assertEquals(message.getPayload(), fu.take().getPayload());
}
@Test
public void testInvalidKeyValue() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "cancelled");
assertNull(fu.take(1,TimeUnit.SECONDS));
}
}
package ipaaca.util.communication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Unit tests for FutureIUs
* @author hvanwelbergen
*
*/
public class FutureIUsTest
{
private FutureIUs fus = new FutureIUs("cat1","id");
private final OutputBuffer outBuffer = new OutputBuffer("component1");
@After
public void cleanup()
{
fus.close();
}
@Test(timeout = 2000)
public void testSendBeforeTake() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id1");
outBuffer.add(message);
assertEquals(message.getPayload(), fus.take("id1").getPayload());
}
@Test(timeout = 2000)
public void testSendAfterTake() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id1");
Runnable send = () -> {
try
{
Thread.sleep(1000);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
outBuffer.add(message);
};
new Thread(send).start();
assertEquals(message.getPayload(), fus.take("id1").getPayload());
}
@Test
public void testNonMatchingKeyValue() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id2");
outBuffer.add(message);
assertNull(fus.take("id1", 1,TimeUnit.SECONDS));
}
@Test
public void testMultipleKeyValues() throws InterruptedException
{
LocalMessageIU message1 = new LocalMessageIU("cat1");
message1.getPayload().put("id", "id1");
LocalMessageIU message2 = new LocalMessageIU("cat1");
message2.getPayload().put("id", "id2");
outBuffer.add(message2);
outBuffer.add(message1);
assertEquals(message1.getPayload(), fus.take("id1").getPayload());
assertEquals(message2.getPayload(), fus.take("id2").getPayload());
}
}
// This file is part of IPAACA, the
// "Incremental Processing Architecture
// for Artificial Conversational Agents".
//
// Copyright (c) 2009-2022 Social Cognitive Systems Group
// CITEC, Bielefeld University
//
// http://opensource.cit-ec.de/projects/ipaaca/
// http://purl.org/net/ipaaca
//
// This file may be licensed under the terms of the
// GNU Lesser General Public License Version 3 (the ``LGPL''),
// or (at your option) any later version.
//
// Software distributed under the License is distributed
// on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
// express or implied. See the LGPL for the specific language
// governing rights and limitations.
//
// You should have received a copy of the LGPL along with this
// program. If not, go to http://www.gnu.org/licenses/lgpl.html
// or write to the Free Software Foundation, Inc.,
// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//
// The development of this software was supported by the
// Excellence Cluster EXC 277 Cognitive Interaction Technology.
// The Excellence Cluster EXC 277 is a grant of the Deutsche
// Forschungsgemeinschaft (DFG) in the context of the German
// Excellence Initiative.
syntax = "proto2";
package ipaaca.protobuf;
enum TransportMessageType {
WireTypeRESERVED = 0;
WireTypeIntMessage = 1;
WireTypeRemoteRequestResult = 2;
WireTypeIU = 3;
WireTypeMessageIU = 4; // special case on the wire (use other converter)
WireTypeIUPayloadUpdate = 5;
WireTypeIULinkUpdate = 6;
WireTypeIURetraction = 7;
WireTypeIUCommission = 8;
WireTypeIUResendRequest = 9;
WireTypeIUPayloadUpdateRequest = 100;
WireTypeIUCommissionRequest = 101;
WireTypeIULinkUpdateRequest = 102;
}
message TransportLevelWrapper {
required TransportMessageType transport_message_type = 1;
required bytes raw_message = 2;
}
message IntMessage {
required sint32 value = 1;
}
message LinkSet {
required string type = 1;
repeated string targets = 2;
}
message PayloadItem {
required string key = 1;
required string value = 2;
required string type = 3 [default = "str"];
}
message IU {
enum AccessMode {
PUSH = 0;
REMOTE = 1;
MESSAGE = 2;
}
required string uid = 1;
required uint32 revision = 2;
required string category = 3 [default = "undef"];
required string payload_type = 4 [default = "MAP"];
required string owner_name = 5;
required bool committed = 6 [default = false];
required AccessMode access_mode = 7 [default = PUSH];
required bool read_only = 8 [default = false];
repeated PayloadItem payload = 9;
repeated LinkSet links = 10;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUPayloadUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IURetraction {
required string uid = 1;
required uint32 revision = 2;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUCommission {
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUResendRequest {
required string uid = 1;
required string hidden_scope_name = 2;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
// Result for remote operations (below).
// Used to send a raw int, which was problematic.
// Usually: 0 = Failed, >0 = new revision of successfully modified resource.
message RemoteRequestResult {
required uint32 result = 1;
optional string request_uid = 100 [default = ""];
//optional string request_endpoint = 101 [default = ""];
}
// Remote / request versions of buffer setters:
// they just go with a dedicated ID
message IUPayloadUpdateRequest {
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUCommissionRequest {
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdateRequest {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
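The messages above form the transport-level wire format: a serialized protobuf message is placed into TransportLevelWrapper.raw_message, and transport_message_type identifies which message (and hence which converter) it contains. Below is a minimal sketch of that wrapping in Python, assuming the module generated from this schema is importable as ipaaca.ipaaca_pb2 (as it is elsewhere in this changeset); all field values are purely illustrative.

import ipaaca.ipaaca_pb2 as pb

# Build an IU message; every proto2 'required' field must be set explicitly.
iu = pb.IU()
iu.uid = 'iu-0001'                        # illustrative UID
iu.revision = 1
iu.category = 'exampleCategory'
iu.payload_type = 'MAP'
iu.owner_name = '/example/OutputBuffer'   # hypothetical owner name
iu.committed = False
iu.access_mode = pb.IU.PUSH
iu.read_only = False
item = iu.payload.add()
item.key = 'status'
item.value = 'started'
item.type = 'str'

# Wrap it for the wire; receivers dispatch on transport_message_type.
wrapper = pb.TransportLevelWrapper()
wrapper.transport_message_type = pb.WireTypeIU
wrapper.raw_message = iu.SerializeToString()
data = wrapper.SerializeToString()        # bytes as published, e.g. over MQTT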
File moved
@@ -4,4 +4,8 @@ resource.path=${shared.resources}/Shared3DModels/resource;${shared.resources}/De
rebuild.list=
pyzip.excludes=
run.py=run.py
#publish.resolver=soa.core.repository
publish.resolver=asap.sftp.publish
dist.dir=../../dist
deps.dir=../../deps
extra.python.path=/vol/soa/opt64/python-spread/current/lib/python2.7/site-packages
<?xml version="1.0" encoding="UTF-8"?>
<project name="IpaacaPython" default="run">
<import file="../../soashared/ant/build.xml" />
<import file="../../../hmibuild/build.xml" />
<!--import file="../../../HmiBuild/build.xml" / -->
<target name="-pre-compilation">
<echo message="Compiling protobuf file" />
<exec executable="protoc">
<arg value="--proto_path=../proto" />
<arg value="../proto/ipaaca.proto" />
<arg value="--python_out=build/" />
<arg value="--python_out=src/ipaaca/" />
</exec>
</target>
</project>
<ivy-module version="2.0">
<info organisation="ipaaca" module="IpaacaPython" />
<publications>
<artifact type="py.zip" ext="py.zip"/>
</publications>
<dependencies>
<dependency org="google" name="protobuf" rev="latest.release"/>
<dependency org="rsb" name="rsb" rev="latest.release"/>
<dependency org="google" name="protobuf-python" rev="latest.release"/>
</dependencies>
</ivy-module>
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 9 14:12:05 2016
@author: jpoeppel
"""
from setuptools import setup
import os
import sys
import subprocess
from os import path as op
from distutils.spawn import find_executable
from setuptools.command.build_py import build_py
from setuptools.command.bdist_egg import bdist_egg
from distutils.command.build import build
from distutils.command.sdist import sdist
class ProtoBuild(build_py):
"""
This command automatically compiles all .proto files with the `protoc` compiler
and places the generated modules under src/ipaaca (see run() below).
"""
def find_protoc(self):
"Locates protoc executable"
if 'PROTOC' in os.environ and os.path.exists(os.environ['PROTOC']):
protoc = os.environ['PROTOC']
else:
protoc = find_executable('protoc')
if protoc is None:
sys.stderr.write('protoc not found. Is protobuf-compiler installed? \n'
'Alternatively, you can point the PROTOC environment variable at a local version.')
sys.exit(1)
return protoc
def run(self):
#TODO determine path automatically
packagedir = "../proto"
print("running build proto")
for protofile in filter(lambda x: x.endswith('.proto'), os.listdir(packagedir)):
source = op.join(packagedir, protofile)
output = source.replace('.proto', '_pb2.py')
if (not op.exists(output) or (op.getmtime(source) > op.getmtime(output))):
sys.stderr.write('Protobuf-compiling ' + source + '\n')
subprocess.check_call([self.find_protoc(), "-I={}".format(packagedir),'--python_out=./src/ipaaca', source])
class BDist_egg(bdist_egg):
'''
Simple wrapper around the normal bdist_egg command to require
protobuf build before normal build.
.. codeauthor:: jwienke
'''
def run(self):
self.run_command('build_proto')
bdist_egg.run(self)
class Build(build):
'''
Simple wrapper around the normal build command to require protobuf build
before normal build.
.. codeauthor:: jwienke
'''
def run(self):
self.run_command('build_proto')
build.run(self)
class Sdist(sdist):
'''
Simple wrapper around the normal sdist command to require protobuf build
before generating the source distribution.
.. codeauthor:: jwienke
'''
def run(self):
# fetch the protocol before building the source distribution so that
# we have a cached version and each user can rebuild the protocol
# with their own protobuf version
self.run_command('build_proto')
sdist.run(self)
version = "0.1.3" #TODO determine correct version! ideally from git, maybe do something similar to rsb/setup.py
setup(name="ipaaca",
version=version,
author="Hendrik Buschmeier, Ramin Yaghoubzadeh, Sören Klett",
author_email="hbuschme@uni-bielefeld.de,ryaghoubzadeh@uni-bielefeld.de,sklett@techfak.uni-bielefeld.de",
license='LGPLv3+',
url='https://opensource.cit-ec.de/projects/ipaaca',
install_requires=["paho-mqtt", "six", "protobuf"],
packages=["ipaaca", "ipaaca.util"],
package_dir={"ipaaca":"src/ipaaca"},
# TODO Do we want to add ipaaca_pb2.py to the egg or as separate package?
# data_files=[("./ipaaca", ["ipaaca_pb2.py"])],
# dependency_links=[
# 'http://www.spread.org/files/'
# 'SpreadModule-1.5spread4.tgz#egg=SpreadModule-1.5spread4'],
cmdclass ={
"build_proto": ProtoBuild,
"sdist": Sdist,
"build": Build,
"bdist_egg":BDist_egg
}
)
#!/usr/bin/env python
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
import time
import ipaaca
def remote_change_dumper(iu, event_type, local):
if local:
print('remote side '+event_type+': '+str(iu))
ob = ipaaca.OutputBuffer('CoolInformerOut')
ob.register_handler(remote_change_dumper)
iu_top = ipaaca.IU()
iu_top.payload = {'data': 'raw'}
ob.add(iu_top)
iu = ipaaca.IU()
iu.payload = {'a':'a1'}
ob.add(iu)
iu.payload = {'a':'a2', 'b':'b1'} #OK
del(iu.payload['b'])
iu.payload['c'] = 'c1'
iu.payload['a'] = 'a3'
iu.add_links('sameold', iu_top.uid)
time.sleep(1)
iu.commit()
while True:
time.sleep(1)
*_pb2.py
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import os
import threading
#import rsb
#import rsb.converter
import ipaaca.defaults
import ipaaca.ipaaca_pb2
import ipaaca.converter
from ipaaca.buffer import InputBuffer, OutputBuffer
from ipaaca.exception import *
from ipaaca.iu import IU, Message, IUAccessMode, IUEventType
from ipaaca.misc import enable_logging, IpaacaArgumentParser
from ipaaca.payload import Payload
import ipaaca.backend
#
# ipaaca.exit(int_retval)
#
from ipaaca.buffer import atexit_cleanup_function
def exit(int_retval=0):
'''For the time being, this function can be used to
circumvent any sys.exit blocks, while at the same time
cleaning up the buffers (e.g. retracting IUs).
Call once at the end of any python script (or anywhere
in lieu of sys.exit() / os._exit()). '''
print('ipaaca: cleaning up and exiting with code '+str(int_retval))
atexit_cleanup_function()
os._exit(int_retval)
__RSB_INITIALIZER_LOCK = threading.Lock()
__RSB_INITIALIZED = False
def initialize_ipaaca_rsb_if_needed():
"""Initialise rsb if not yet initialise.
* Register own RSB converters.
* Initialise RSB from enviroment variables, rsb config file, or
from default values for RSB trnasport, host, and port (via
ipaaca.defaults or ipaaca.misc.IpaacaArgumentParser).
"""
global __RSB_INITIALIZED
with __RSB_INITIALIZER_LOCK:
if __RSB_INITIALIZED:
return
else:
ipaaca.converter.register_global_converter(
ipaaca.converter.IUConverter(
wireSchema="ipaaca-iu",
dataType=IU))
ipaaca.converter.register_global_converter(
ipaaca.converter.MessageConverter(
wireSchema="ipaaca-messageiu",
dataType=Message))
ipaaca.converter.register_global_converter(
ipaaca.converter.IULinkUpdateConverter(
wireSchema="ipaaca-iu-link-update",
dataType=ipaaca.converter.IULinkUpdate))
ipaaca.converter.register_global_converter(
ipaaca.converter.IUPayloadUpdateConverter(
wireSchema="ipaaca-iu-payload-update",
dataType=ipaaca.converter.IUPayloadUpdate))
if ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT is not None:
if ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT == 'spread':
os.environ['RSB_TRANSPORT_SPREAD_ENABLED'] = str(1)
os.environ['RSB_TRANSPORT_SOCKET_ENABLED'] = str(0)
elif ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT == 'socket':
os.environ['RSB_TRANSPORT_SPREAD_ENABLED'] = str(0)
os.environ['RSB_TRANSPORT_SOCKET_ENABLED'] = str(1)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER is not None:
os.environ['RSB_TRANSPORT_SOCKET_SERVER'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST is not None:
os.environ['RSB_TRANSPORT_SPREAD_HOST'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST)
os.environ['RSB_TRANSPORT_SOCKET_HOST'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT is not None:
os.environ['RSB_TRANSPORT_SPREAD_PORT'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT)
os.environ['RSB_TRANSPORT_SOCKET_PORT'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT)
#
ipaaca.backend.register_backends()
__RSB_INITIALIZED = True
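A minimal usage sketch of the public API assembled in this module, mirroring the example script earlier in this changeset and the ipaaca.exit() helper above; buffer name and payload are illustrative.

import ipaaca

ob = ipaaca.OutputBuffer('ExampleComponentOut')   # hypothetical buffer name
iu = ipaaca.IU()
iu.payload = {'data': 'example'}
ob.add(iu)
# ... interact with other components ...
ipaaca.exit(0)   # clean up buffers (retract IUs), then terminate the process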
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import ipaaca.config
import threading
import uuid
import os
import time
LOGGER = ipaaca.misc.get_library_logger()
__registered_backends = {}
__backend_registration_done = False
def register_backends():
global __registered_backends
global __backend_registration_done
if not __backend_registration_done:
__backend_registration_done = True
LOGGER.debug('Registering available back-ends')
# register available backends
# mqtt
import ipaaca.backend_mqtt
be = ipaaca.backend_mqtt.create_backend()
if be is not None:
__registered_backends[be.name] = be
LOGGER.debug('Back-end '+str(be.name)+' added')
# ros
import ipaaca.backend_ros
be = ipaaca.backend_ros.create_backend()
if be is not None:
__registered_backends[be.name] = be
LOGGER.debug('Back-end '+str(be.name)+' added')
def get_default_backend():
# TODO selection mechanism / config
if not __backend_registration_done:
register_backends()
if len(__registered_backends) == 0:
raise RuntimeError('No back-ends could be initialized for ipaaca-python')
cfg = ipaaca.config.get_global_config()
preferred = cfg.get_with_default('backend', None)
if preferred is None:
k = list(__registered_backends.keys())[0]
if len(__registered_backends) > 1:
LOGGER.warning('No preferred ipaaca.backend set, returning one of several (probably the first in list)')
print('Using randomly selected back-end {}!'.format(k))
else:
if preferred in __registered_backends:
k = preferred
else:
raise ipaaca.exception.BackendInitializationError(preferred)
LOGGER.info('Back-end is '+str(k))
return __registered_backends[k]
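A short sketch of how the selection logic above is used: get_default_backend() registers the available back-ends on first use and honours the 'backend' key of the global config; the scope string below is illustrative.

import ipaaca.backend

be = ipaaca.backend.get_default_backend()
print('selected back-end: ' + be.name)            # e.g. 'mqtt' or 'ros'
# Buffers create their transport endpoints through the back-end object:
listener = be.createListener(be.Scope('/ipaaca/example/scope'))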
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import collections
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import ipaaca.backend
import ipaaca.config
import threading
try:
import queue
except:
import Queue as queue
import uuid
import os
import time
try:
import paho.mqtt.client as mqtt
MQTT_ENABLED = True
except:
MQTT_ENABLED = False
if not MQTT_ENABLED:
def create_backend():
return None
else:
def create_backend():
return MQTTBackend(name='mqtt')
LOGGER = ipaaca.misc.get_library_logger()
_REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited
_REMOTE_LISTENER_MAX_QUEUED_EVENTS = 1024 # 'Full' exception if exceeded
class EventWrapper(object):
def __init__(self, data):
self.data = data
class PendingRequest(object):
'''Encapsulation of a pending remote request with
a facility to keep the requesting thread locked
until the reply or a timeout unlocks it.'''
def __init__(self, request):
self._request = request
self._event = threading.Event()
self._reply = None
self._request_uid = str(uuid.uuid4())[0:8]
def wait_for_reply(self, timeout=30.0):
wr = self._event.wait(timeout)
return None if wr is False else self._reply
def reply_with_result(self, reply):
self._reply = reply
self._event.set()
class Informer(object):
'''Informer interface, wrapping an outbound port to MQTT'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._live = True
self._live_event.set()
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
pass
def publishData(self, data):
#print('Informer publishing '+str(data.__class__.__name__)+' on '+self._scope)
self._mqtt_client.publish(self._scope, ipaaca.converter.serialize(data), qos=2)
class BackgroundEventDispatcher(threading.Thread):
def __init__(self, listener):
super(BackgroundEventDispatcher, self).__init__()
self.daemon = True
self._listener = listener
def terminate(self):
self._running = False
def run(self):
self._running = True
listener = self._listener
while self._running: # auto-terminated (daemon)
event = listener._event_queue.get(block=True, timeout=None)
if event is None: return # signaled termination
#print('\033[31mDispatch '+str(event.data.__class__.__name__)+' start ...\033[m')
for handler in self._listener._handlers:
handler(event)
#print('\033[32m... dispatch '+str(event.data.__class__.__name__)+' end.\033[m')
class Listener(object):
'''Listener interface, wrapping an inbound port from MQTT'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
self._event_queue = queue.Queue(_REMOTE_LISTENER_MAX_QUEUED_EVENTS)
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_socket_open = self.mqtt_callback_on_socket_open
#self._mqtt_client.on_socket_close = self.mqtt_callback_on_socket_close
#self._mqtt_client.on_log = self.mqtt_callback_on_log
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self._dispatcher = BackgroundEventDispatcher(self)
self._dispatcher.start()
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._event_queue.put(None, block=False) # signal termination, waking queue
self._dispatcher.terminate()
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
LOGGER.debug('Connect to '+str(self._host)+':'+str(self._port))
self._mqtt_client.connect(self._host, self._port)
#def mqtt_callback_on_log(self, client, userdata, level, buf):
# print('Listener: LOG: '+str(buf))
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
event = EventWrapper(ipaaca.converter.deserialize(message.payload))
self._event_queue.put(event, block=False) # queue event for BackgroundEventDispatcher
def addHandler(self, handler):
self._handlers.append(handler)
#def publishData(self, data):
# self._mqtt_client.publish(self._
class LocalServer(object):
'''LocalServer interface, allowing for RPC requests to
IU functions, or reporting back success or failure.'''
def __init__(self, buffer_impl, scope, config=None):
self._buffer = buffer_impl
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_LocalServer_' + self._uuid # unused atm
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
req = ipaaca.converter.deserialize(message.payload)
result = None
if isinstance(req, ipaaca.converter.IUPayloadUpdate):
result = self.attempt_to_apply_remote_updatePayload(req)
elif isinstance(req, ipaaca.converter.IULinkUpdate):
result = self.attempt_to_apply_remote_updateLinks(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUCommission):
result = self.attempt_to_apply_remote_commit(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUResendRequest):
result = self.attempt_to_apply_remote_resendRequest(req)
else:
raise RuntimeError('LocalServer: got an object of wrong class '+str(req.__class__.__name__)) # TODO replace
if result is not None:
self.send_result_for_request(req, result)
#
def send_result_for_request(self, obj, result):
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
pbo.result = result
pbo.request_uid = obj.request_uid
#print('Sending result to endpoint '+str(obj.request_endpoint))
self._mqtt_client.publish(obj.request_endpoint, ipaaca.converter.serialize(pbo), qos=2)
def attempt_to_apply_remote_updateLinks(self, obj):
return self._buffer._remote_update_links(obj)
def attempt_to_apply_remote_updatePayload(self, obj):
return self._buffer._remote_update_payload(obj)
def attempt_to_apply_remote_commit(self, obj):
return self._buffer._remote_commit(obj)
def attempt_to_apply_remote_resendRequest(self, obj):
return self._buffer._remote_request_resend(obj)
class RemoteServer(object):
'''RemoteServer, connects to a LocalServer on the side
of an actual IU owner, which will process any requests.
The RemoteServer is put on hold while the owner is
processing. RemoteServer is from RSB terminology,
it might more aptly be described as an RPC client.'''
def __init__(self, remote_end_scope, config=None):
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
#queue.Queue(_REMOTE_SERVER_MAX_QUEUED_REQUESTS)
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_RemoteServer_' + self._uuid
# will RECV here:
self._scope = '/ipaaca/remotes/' + self._name
# will SEND here
self._remote_end_scope = remote_end_scope
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + remote_end_scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
reply = ipaaca.converter.deserialize(message.payload)
if isinstance(reply, ipaaca.ipaaca_pb2.RemoteRequestResult):
uid = reply.request_uid
pending_request = None
with self._pending_requests_lock:
if uid in self._pending_requests:
pending_request = self._pending_requests[uid]
del self._pending_requests[uid]
if pending_request is None:
raise RuntimeError('RemoteServer: got a reply for request uid that is not queued: '+str(uid))
else:
# provide result to other thread and unblock it
pending_request.reply_with_result(reply)
else:
raise RuntimeError('RemoteServer: got an object of wrong class '+str(reply.__class__.__name__)) # TODO replace
def queue_pending_request(self, request):
pending_request = PendingRequest(request)
with self._pending_requests_lock:
if _REMOTE_SERVER_MAX_QUEUED_REQUESTS>0 and len(self._pending_requests) >= _REMOTE_SERVER_MAX_QUEUED_REQUESTS:
raise RuntimeError('RemoteServer: maximum number of pending requests exceeded') # TODO replace?
else:
self._pending_requests[pending_request._request_uid] = pending_request
return pending_request
# impl
def blocking_call(self, request):
# Broker's queue will raise before sending anything if capacity is exceeded
pending_request = self.queue_pending_request(request)
# complete and send request
request.request_uid = pending_request._request_uid
request.request_endpoint = self._scope
self._mqtt_client.publish(self._remote_end_scope, ipaaca.converter.serialize(request), qos=2)
# wait for other end to return result
reply = pending_request.wait_for_reply()
if reply is None:
LOGGER.warning('A request timed out!')
return 0
else:
return reply.result # the actual int result
# glue that quacks like the RSB version
def resendRequest(self, req):
return self.blocking_call(req)
def commit(self, req):
return self.blocking_call(req)
def updatePayload(self, req):
return self.blocking_call(req)
def updateLinks(self, req):
return self.blocking_call(req)
class MQTTBackend(object):
def __init__(self, name='mqtt'):
# back-end initialization code
self._config = ipaaca.config.get_global_config()
self._name = name
self._participants = set()
def _get_name(self):
return self._name
name = property(_get_name)
def teardown(self):
LOGGER.info('MQTT teardown: waiting 1 sec for final deliveries')
time.sleep(1)
for p in self._participants:
p.deactivate_internal()
def Scope(self, scope_str):
'''Scope adapter (glue replacing rsb.Scope)'''
return str(scope_str)
def createLocalServer(self, buffer_impl, scope, config=None):
LOGGER.debug('Creating a LocalServer on '+str(scope))
s = LocalServer(buffer_impl, scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createRemoteServer(self, scope, config=None):
LOGGER.debug('Creating a RemoteServer on '+str(scope))
s = RemoteServer(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createInformer(self, scope, config=None, dataType="ignored in this backend"):
LOGGER.debug('Creating an Informer on '+str(scope))
s = Informer(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createListener(self, scope, config=None):
LOGGER.debug('Creating a Listener on '+str(scope))
s = Listener(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
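A hedged usage sketch for this back-end in isolation, assuming a reachable MQTT broker at the configured transport.mqtt.host/port; the scope string is illustrative, and in normal operation the buffer classes, not user code, drive these calls.

import ipaaca.backend_mqtt

be = ipaaca.backend_mqtt.create_backend()          # None if paho-mqtt is unavailable
scope = be.Scope('/ipaaca/example/scope')          # hypothetical scope
listener = be.createListener(scope)                # subscribes with QoS 2

def on_event(event):
    print('received: ' + str(event.data))

listener.addHandler(on_event)
informer = be.createInformer(scope)
# informer.publishData(iu_like_object)             # serialized via ipaaca.converter
be.teardown()                                      # brief grace period, then disconnect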
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import collections
import sys
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import ipaaca.backend
import ipaaca.config as config
LOGGER = ipaaca.misc.get_library_logger()
ROS_ENABLED, __try_guessing = False, False
try:
import rospy
from std_msgs.msg import String
import base64
ROS_ENABLED = True
except:
LOGGER.debug('rospy or deps not found, ROS backend disabled')
ROS_ENABLED = False
if not ROS_ENABLED:
def create_backend():
return None
else:
def create_backend():
return ROSBackend(name='ros')
import threading
try:
import queue
except:
import Queue as queue
import uuid
import os
import time
import sys
class EventWrapper(object):
def __init__(self, data):
self.data = data
class PendingRequest(object):
'''Encapsulation of a pending remote request with
a facility to keep the requesting thread locked
until the reply or a timeout unlocks it.'''
def __init__(self, request):
self._request = request
self._event = threading.Event()
self._reply = None
self._request_uid = str(uuid.uuid4())[0:8]
def wait_for_reply(self, timeout=30.0):
wr = self._event.wait(timeout)
return None if wr is False else self._reply
def reply_with_result(self, reply):
self._reply = reply
self._event.set()
class Informer(object):
'''Informer interface, wrapping an outbound port to ROS'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._ros_pub = rospy.Publisher(self._scope, String, queue_size=100, tcp_nodelay=True, latch=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def deactivate(self):
pass
#self._ros_pub.unregister()
#self._ros_pub = None
def publishData(self, data):
self._ros_pub.publish(ROSBackend.serialize(data))
class BackgroundEventDispatcher(threading.Thread):
def __init__(self, event, handlers):
super(BackgroundEventDispatcher, self).__init__()
self.daemon = True
self._event = event
self._handlers = handlers
def run(self):
for handler in self._handlers:
handler(self._event)
class Listener(object):
'''Listener interface, wrapping an inbound port from ROS'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._ros_sub = rospy.Subscriber(self._scope, String, self.on_message, tcp_nodelay=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def deactivate(self):
pass
#self._ros_sub.unregister()
#self._ros_sub = None
def on_message(self, message):
event = EventWrapper(ROSBackend.deserialize(message.data))
## (1) with extra thread:
#dispatcher = BackgroundEventDispatcher(event, self._handlers)
#dispatcher.start()
## or (2) with no extra thread:
for handler in self._handlers:
handler(event)
def addHandler(self, handler):
self._handlers.append(handler)
class LocalServer(object):
'''LocalServer interface, allowing for RPC requests to
IU functions, or reporting back success or failure.'''
def __init__(self, buffer_impl, scope, config=None):
self._buffer = buffer_impl
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_LocalServer_' + self._uuid # unused atm
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._ros_pubs = {}
self._ros_sub = rospy.Subscriber(self._scope, String, self.on_message, tcp_nodelay=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def get_publisher(self, endpoint):
if endpoint in self._ros_pubs:
return self._ros_pubs[endpoint]
else:
p = rospy.Publisher(endpoint, String, queue_size=10, tcp_nodelay=True, latch=True)
self._ros_pubs[endpoint] = p
return p
def deactivate(self):
pass
#self._ros_sub.unregister()
#for v in self._ros_pubs.values():
# v.unregister()
#self._ros_sub = None
#self._ros_pubs = {}
def on_message(self, message):
req = ROSBackend.deserialize(message.data)
result = None
if isinstance(req, ipaaca.converter.IUPayloadUpdate):
result = self.attempt_to_apply_remote_updatePayload(req)
elif isinstance(req, ipaaca.converter.IULinkUpdate):
result = self.attempt_to_apply_remote_updateLinks(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUCommission):
result = self.attempt_to_apply_remote_commit(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUResendRequest):
result = self.attempt_to_apply_remote_resendRequest(req)
else:
raise RuntimeError('LocalServer: got an object of wrong class '+str(req.__class__.__name__)) # TODO replace
if result is not None:
self.send_result_for_request(req, result)
#
def send_result_for_request(self, obj, result):
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
pbo.result = result
pbo.request_uid = obj.request_uid
#print('Sending result to endpoint '+str(obj.request_endpoint))
pub = self.get_publisher(obj.request_endpoint)
pub.publish(ROSBackend.serialize(pbo))
def attempt_to_apply_remote_updateLinks(self, obj):
return self._buffer._remote_update_links(obj)
def attempt_to_apply_remote_updatePayload(self, obj):
return self._buffer._remote_update_payload(obj)
def attempt_to_apply_remote_commit(self, obj):
return self._buffer._remote_commit(obj)
def attempt_to_apply_remote_resendRequest(self, obj):
return self._buffer._remote_request_resend(obj)
_REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited
class RemoteServer(object):
'''RemoteServer, connects to a LocalServer on the side
of an actual IU owner, which will process any requests.
The RemoteServer is put on hold while the owner is
processing. RemoteServer is from RSB terminology,
it might more aptly be described as an RPC client.'''
def __init__(self, remote_end_scope, config=None):
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
#queue.Queue(_REMOTE_SERVER_MAX_QUEUED_REQUESTS)
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_RemoteServer_' + self._uuid
# will RECV here:
self._scope = '/ipaaca/remotes/' + self._name
# will SEND here
self._remote_end_scope = remote_end_scope
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + remote_end_scope
self._ros_pub = rospy.Publisher(self._remote_end_scope, String, queue_size=10, tcp_nodelay=True, latch=True)
self._ros_sub = rospy.Subscriber(self._scope, String, self.on_message, tcp_nodelay=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def deactivate(self):
pass
#self._ros_sub.unregister()
#self._ros_pub.unregister()
#self._ros_sub = None
#self._ros_pub = None
def on_message(self, message):
reply = ROSBackend.deserialize(message.data)
if isinstance(reply, ipaaca.ipaaca_pb2.RemoteRequestResult):
uid = reply.request_uid
pending_request = None
with self._pending_requests_lock:
if uid in self._pending_requests:
pending_request = self._pending_requests[uid]
del self._pending_requests[uid]
if pending_request is None:
raise RuntimeError('RemoteServer: got a reply for request uid that is not queued: '+str(uid))
else:
# provide result to other thread and unblock it
pending_request.reply_with_result(reply)
else:
raise RuntimeError('RemoteServer: got an object of wrong class '+str(reply.__class__.__name__)) # TODO replace
def queue_pending_request(self, request):
pending_request = PendingRequest(request)
with self._pending_requests_lock:
if _REMOTE_SERVER_MAX_QUEUED_REQUESTS>0 and len(self._pending_requests) >= _REMOTE_SERVER_MAX_QUEUED_REQUESTS:
raise RuntimeError('RemoteServer: maximum number of pending requests exceeded') # TODO replace?
else:
self._pending_requests[pending_request._request_uid] = pending_request
return pending_request
# impl
def blocking_call(self, request):
# queue_pending_request raises before anything is sent if the capacity limit is exceeded
pending_request = self.queue_pending_request(request)
# complete and send request
request.request_uid = pending_request._request_uid
request.request_endpoint = self._scope
self._ros_pub.publish(ROSBackend.serialize(request))
# wait for other end to return result
reply = pending_request.wait_for_reply()
if reply is None:
LOGGER.warning('A request timed out!')
return 0
else:
return reply.result # the actual int result
# glue that quacks like the RSB version
def resendRequest(self, req):
return self.blocking_call(req)
def commit(self, req):
return self.blocking_call(req)
def updatePayload(self, req):
return self.blocking_call(req)
def updateLinks(self, req):
return self.blocking_call(req)
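# Illustrative sketch (not part of the module; 'srv' and 'some_iu_uid' are placeholders):
# a remote payload write is serialized, published to the owner's LocalServer scope,
# applied there, and answered with a RemoteRequestResult carrying the new revision.
#
#   update = ipaaca.converter.IUPayloadUpdate(
#           uid=some_iu_uid, revision=0, is_delta=True, payload_type='JSON',
#           new_items={'key': 'value'})
#   new_revision = srv.updatePayload(update)  # blocks in blocking_call() until the reply arrives
#   # a result of 0 signals a rejected (e.g. out-of-date) or timed-out request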
class ROSBackend(object):
def __init__(self, name='ros'):
#import logging
# back-end initialization code
self._name = name
self._need_init = True
#logging.basicConfig(level=logging.DEBUG)
def init_once(self):
'''Actual back-end initialization is only done when it is used'''
if self._need_init:
self._need_init = False
self._config = config.get_global_config()
try:
# generate a ROS node prefix from the basename of argv[0]
clean_name = ''.join([c for c in sys.argv[0].rsplit('/',1)[-1].replace('.', '_').replace('-','_') if c.lower() in 'abcdefghijklmnopqrstuvwxyz0123456789_'])
except:
clean_name = ''
rospy.init_node('ipaaca_python' if len(clean_name)==0 else clean_name,
anonymous=True, disable_signals=True)
def _get_name(self):
return self._name
name = property(_get_name)
def teardown(self):
LOGGER.info('ROS teardown: waiting 1 sec for final deliveries')
time.sleep(1)
rospy.signal_shutdown('Done')
@staticmethod
def serialize(obj):
#print('object class: '+obj.__class__.__name__)
bb = ipaaca.converter.serialize(obj)
# decode to a plain ASCII str (str() of a bytes object would yield "b'...'" under Python 3)
st = base64.b64encode(bb).decode('ascii')
#print('serialized: '+str(st))
return st
@staticmethod
def deserialize(msg):
#print('got serialized: '+str(msg))
bb = base64.b64decode(msg)
return ipaaca.converter.deserialize(bb)
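# Wire format sketch (illustrative): every object is wrapped into a TransportLevelWrapper
# protobuf by ipaaca.converter.serialize, base64-encoded here and shipped as the data of
# a ROS std_msgs/String; deserialize reverses both steps.
#
#   commit = ipaaca.ipaaca_pb2.IUCommission()
#   commit.uid = 'some-iu-uid'
#   commit.revision = 1
#   commit.writer_name = '/ipaaca/component/ExampleID00000000/OB'
#   wire = ROSBackend.serialize(commit)    # base64 text suitable for String.data
#   same = ROSBackend.deserialize(wire)    # yields an IUCommission instance again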
def Scope(self, scope_str):
'''Scope adapter (glue replacing rsb.Scope)'''
# ROS graph resources must not start with a slash
return str(scope_str)[1:] if scope_str.startswith('/') else str(scope_str)
def createLocalServer(self, buffer_impl, scope, config=None):
self.init_once()
LOGGER.debug('Creating a LocalServer on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = LocalServer(buffer_impl, scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
def createRemoteServer(self, scope, config=None):
self.init_once()
LOGGER.debug('Creating a RemoteServer on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = RemoteServer(scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
def createInformer(self, scope, config=None, dataType="ignored in this backend"):
self.init_once()
LOGGER.debug('Creating an Informer on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = Informer(scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
def createListener(self, scope, config=None):
self.init_once()
LOGGER.debug('Creating a Listener on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = Listener(scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import threading
import uuid
import traceback
import six
import weakref
import atexit
#import rsb
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.converter
import ipaaca.iu
import ipaaca.backend
import ipaaca.misc
__all__ = [
'InputBuffer',
'OutputBuffer',
]
LOGGER = ipaaca.misc.get_library_logger()
# set of objects to auto-clean on exit, assumes _teardown() method
TEARDOWN_OBJECTS = set()
def atexit_cleanup_function():
'''Function to call at program exit to auto-clean objects.'''
global TEARDOWN_OBJECTS
for obj_r in TEARDOWN_OBJECTS:
obj = obj_r()
if obj is not None: # if weakref still valid
obj._teardown()
ipaaca.backend.get_default_backend().teardown()
atexit.register(atexit_cleanup_function)
def auto_teardown_instances(fn):
'''Decorator function for object constructors, to add
new instances to the object set to auto-clean at exit.'''
def auto_teardown_instances_wrapper(instance, *args, **kwargs):
global TEARDOWN_OBJECTS
fn(instance, *args, **kwargs)
TEARDOWN_OBJECTS.add(weakref.ref(instance))
return auto_teardown_instances_wrapper
class IUStore(dict):
"""A dictionary storing IUs."""
def __init__(self):
super(IUStore, self).__init__()
class FrozenIUStore(IUStore):
"""A read-only version of a dictionary storing IUs. (TODO: might be slow)"""
def __init__(self, original_iu_store):
super(FrozenIUStore, self).__init__()
# copy entries eagerly; a bare map() would be a lazy no-op under Python 3
for k, v in original_iu_store.items(): super(FrozenIUStore, self).__setitem__(k, v)
def __delitem__(self, k):
raise AttributeError()
def __setitem__(self, k, v):
raise AttributeError()
class IUEventHandler(object):
"""Wrapper for IU event handling functions."""
def __init__(self, handler_function, for_event_types=None, for_categories=None):
"""Create an IUEventHandler.
Keyword arguments:
handler_function -- the handler function with the signature
(IU, event_type, local)
for_event_types -- a list of event types or None if handler should
be called for all event types
for_categories -- a list of category names or None if handler should
be called for all categories
"""
super(IUEventHandler, self).__init__()
self._handler_function = handler_function
self._for_event_types = (
None if for_event_types is None else
(for_event_types[:] if not isinstance(for_event_types, six.string_types) and hasattr(for_event_types, '__iter__') else [for_event_types]))
self._for_categories = (
None if for_categories is None else
(for_categories[:] if not isinstance(for_categories, six.string_types) and hasattr(for_categories, '__iter__') else [for_categories]))
def condition_met(self, event_type, category):
"""Check whether this IUEventHandler should be called.
Keyword arguments:
event_type -- type of the IU event
category -- category of the IU which triggered the event
"""
type_condition_met = (self._for_event_types is None or event_type in self._for_event_types)
cat_condition_met = (self._for_categories is None or category in self._for_categories)
return type_condition_met and cat_condition_met
def call(self, buffer, iu_uid, local, event_type, category):
"""Call this IUEventHandler's function, if it applies.
Keyword arguments:
buffer -- the buffer in which the IU is stored
iu_uid -- the uid of the IU
local -- whether the IU is local to this component (True for IUs owned by one of its OutputBuffers)
event_type -- IU event type
category -- category of the IU
"""
if self.condition_met(event_type, category):
iu = buffer._iu_store[iu_uid]
self._handler_function(iu, event_type, local)
class Buffer(object):
"""Base class for InputBuffer and OutputBuffer."""
def __init__(self, owning_component_name, channel=None, participant_config=None):
'''Create a Buffer.
Keyword arguments:
owning_component_name -- name of the entity that owns this Buffer
participant_config -- RSB configuration
'''
super(Buffer, self).__init__()
ipaaca.initialize_ipaaca_rsb_if_needed()
self._owning_component_name = owning_component_name
self._channel = channel if channel is not None else ipaaca.defaults.IPAACA_DEFAULT_CHANNEL
self._participant_config = participant_config
self._uuid = str(uuid.uuid4())[0:8]
# Initialise with a temporary, but already unique, name
self._unique_name = "undef-"+self._uuid
self._iu_store = IUStore()
self._iu_event_handlers = []
def _get_frozen_iu_store(self):
return FrozenIUStore(original_iu_store = self._iu_store)
iu_store = property(fget=_get_frozen_iu_store, doc='Copy-on-read version of the internal IU store')
def _get_channel(self):
return self._channel
channel = property(
fget=_get_channel,
doc='The IPAACA channel the buffer is connected to.')
def register_handler(self, handler_function, for_event_types=None, for_categories=None):
"""Register a new IU event handler function.
Keyword arguments:
handler_function -- a function with the signature (IU, event_type, local)
for_event_types -- a list of event types or None if handler should
be called for all event types
for_categories -- a list of category names or None if handler should
be called for all categories
"""
if handler_function in [h._handler_function for h in self._iu_event_handlers]:
LOGGER.warn("The handler function '" + handler_function.__name__ + '" has been registered before.')
handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories)
self._iu_event_handlers.append(handler)
return handler
def call_iu_event_handlers(self, uid, local, event_type, category):
"""Call registered IU event handler functions registered for this event_type and category."""
for h in self._iu_event_handlers:
try:
h.call(self, uid, local=local, event_type=event_type, category=category)
except Exception as e:
if local:
LOGGER.error('Local IU handler raised an exception upon remote write: ' + str(e))
else:
print(str(traceback.format_exc()))
raise e
def _get_owning_component_name(self):
"""Return the name of this Buffer's owning component"""
return self._owning_component_name
owning_component_name = property(_get_owning_component_name)
def _get_unique_name(self):
"""Return the Buffer's unique name."""
return self._unique_name
unique_name = property(_get_unique_name)
class InputBuffer(Buffer):
"""An InputBuffer that holds remote IUs."""
@auto_teardown_instances
def __init__(self, owning_component_name, category_interests=None, channel=None, participant_config=None, resend_active=False):
'''Create an InputBuffer.
Keyword arguments:
owning_component_name -- name of the entity that owns this InputBuffer
category_interests -- list of IU categories this Buffer is interested in
participant_config -- RSB configuration
'''
super(InputBuffer, self).__init__(owning_component_name, channel, participant_config)
self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB'
self._resend_active = resend_active
self._listener_store = {} # one per IU category
self._remote_server_store = {} # one per remote-IU-owning Component
self._category_interests = []
# add own uuid as identifier for hidden category.
self._add_category_listener(str(self._uuid))
if category_interests is not None:
self.add_category_interests(category_interests)
def _get_remote_server(self, event_or_iu):
'''Return (or create, store and return) a remote server.'''
_remote_server_name = self._get_owner(event_or_iu) + '/Server'
if _remote_server_name:
try:
return self._remote_server_store[_remote_server_name]
except KeyError:
be = ipaaca.backend.get_default_backend()
remote_server = be.createRemoteServer(be.Scope(str(_remote_server_name)), config=self._participant_config)
self._remote_server_store[_remote_server_name] = remote_server
return remote_server
else:
return None
def _get_owner(self, event_or_iu):
if hasattr(event_or_iu, 'data'):
# is RSB event
data = event_or_iu.data
if hasattr(data, 'owner_name'):
return data.owner_name
elif hasattr(data, 'writer_name'):
return data.writer_name
else:
return None
else:
# is IU
return event_or_iu.owner_name
def _add_category_listener(self, iu_category):
'''Create and store a listener on a specific category.'''
if iu_category not in self._listener_store:
be = ipaaca.backend.get_default_backend()
cat_listener = be.createListener(be.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), config=self._participant_config)
cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category)
LOGGER.info("Added listener in scope /ipaaca/channel/" + str(self._channel) + "/category/" + iu_category)
def _remove_category_listener(self, iu_category):
'''Remove the listener for a specific category.'''
if iu_category in self._listener_store and iu_category in self._category_interests:
del self._listener_store[iu_category]
self._category_interests.remove(iu_category)
LOGGER.info("Removed listener in scope /ipaaca/channel/" + str(self._channel) + "/category/ " + iu_category)
def _teardown(self):
'''InputBuffer deactivates its listeners on teardown.'''
self._deactivate_all_internal()
def __del__(self):
'''Perform teardown as soon as Buffer is lost.'''
self._deactivate_all_internal()
def _deactivate_all_internal(self):
'''Deactivate all participants.'''
for listener in self._listener_store.values():
try:
listener.deactivate()
except RuntimeError:
# Is raised if an already deactivated participant is
# deactivated again
pass
def _handle_iu_events(self, event):
'''Dispatch incoming IU events.
Adds incoming IUs to the store, applies payload and commit updates to
the IU, and calls IU event handlers.
Keyword arguments:
event -- a converted RSB event
'''
type_ = type(event.data)
if type_ == ipaaca.iu.RemotePushIU:
# a new IU
if event.data.uid not in self._iu_store:
self._iu_store[event.data.uid] = event.data
event.data.buffer = self
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.ADDED, category=event.data.category)
else:
# IU already in our store, overwrite local IU, but do not call
# event handler. This functionality is necessary to undo
# destructive changes after a failed remote update (undo is
# done via the resend request mechanism).
self._iu_store[event.data.uid] = event.data
event.data.buffer = self
elif type_ == ipaaca.iu.RemoteMessage:
# a new Message, an ephemeral IU that is removed after calling handlers
self._iu_store[ event.data.uid ] = event.data
event.data.buffer = self
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.MESSAGE, category=event.data.category)
del self._iu_store[ event.data.uid ]
else:
if event.data.uid not in self._iu_store:
if (self._resend_active and
not type_ == ipaaca.ipaaca_pb2.IURetraction):
# send resend request to remote server, IURetraction is ignored
try:
self._request_remote_resend(event)
except ipaaca.exception.IUResendRequestFailedError:
LOGGER.warning('Requesting resend for IU {} failed.'.
format(event.data.uid))
else:
LOGGER.warning("Received an update for an IU which we did not receive before.")
return
# an update to an existing IU
if type_ == ipaaca.ipaaca_pb2.IURetraction:
# IU retraction (cannot be triggered remotely)
iu = self._iu_store[event.data.uid]
iu._revision = event.data.revision
iu._apply_retraction() # for now - just sets the _retracted flag.
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.RETRACTED, category=iu.category)
else:
if event.data.writer_name == self.unique_name:
# Notify only for remotely triggered events;
# Discard updates that originate from this buffer
return
if type_ == ipaaca.ipaaca_pb2.IUCommission:
# IU commit
iu = self._iu_store[event.data.uid]
iu._apply_commission()
iu._revision = event.data.revision
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.COMMITTED, category=iu.category)
elif type_ == ipaaca.converter.IUPayloadUpdate:
# IU payload update
iu = self._iu_store[event.data.uid]
iu._apply_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.UPDATED, category=iu.category)
elif type_ == ipaaca.converter.IULinkUpdate:
# IU link update
iu = self._iu_store[event.data.uid]
iu._apply_link_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.LINKSUPDATED, category=iu.category)
else:
LOGGER.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
def add_category_interests(self, category_interests):
if not isinstance(category_interests, six.string_types) and hasattr(category_interests, '__iter__'):
for interest in category_interests:
self._add_category_listener(interest)
else:
self._add_category_listener(category_interests)
def remove_category_interests(self, category_interests):
if not isinstance(category_interests, six.string_types) and hasattr(category_interests, '__iter__'):
for interest in category_interests:
self._remove_category_listener(interest)
else:
self._remove_category_listener(category_interests)
def _request_remote_resend(self, event):
remote_server = self._get_remote_server(event)
if remote_server:
resend_request = ipaaca.ipaaca_pb2.IUResendRequest()
resend_request.uid = event.data.uid # target iu
resend_request.hidden_scope_name = str(self._uuid) # hidden category name
remote_revision = remote_server.resendRequest(resend_request)
if remote_revision == 0:
raise ipaaca.exception.IUResendRequestFailedError(event.data.uid)
else:
# Remote server is not known
raise ipaaca.exception.IUResendRequestRemoteServerUnknownError(event.data.uid)
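# Note on the resend mechanism: every InputBuffer also listens on a hidden category
# named after its own uuid (see __init__). When an update arrives for an IU that is
# not yet in the store, the request above asks the owning OutputBuffer to republish
# the complete IU on that hidden scope, so this buffer can catch up before applying
# further updates.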
def register_handler(self, handler_function, for_event_types=None, for_categories=None):
"""Register a new IU event handler function.
Keyword arguments:
handler_function -- a function with the signature (IU, event_type, local)
for_event_types -- a list of event types or None if handler should
be called for all event types
for_categories -- a list of category names or None if handler should
be called for all categories
"""
handler = super(InputBuffer, self).register_handler(handler_function, for_event_types, for_categories)
try:
for category in handler._for_categories:
self.add_category_interests(category)
except TypeError:
# i.e., None was provided to the handler
pass
return handler
def is_resend_active(self):
return self._resend_active
def set_resend_active(self, active=True):
self._resend_active = active
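# Usage sketch (illustrative, assumes a running ipaaca backend):
#
#   def my_handler(iu, event_type, local):
#       print(iu.category, event_type, iu.payload)
#
#   ib = InputBuffer('MyComponent', category_interests=['exampleCategory'])
#   ib.register_handler(my_handler,
#           for_event_types=[ipaaca.iu.IUEventType.ADDED, ipaaca.iu.IUEventType.UPDATED])
#   # handlers run on the backend's receive thread whenever matching remote IUs
#   # arrive on this buffer's channel ('default' unless overridden).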
class OutputBuffer(Buffer):
"""An OutputBuffer that holds local IUs."""
@auto_teardown_instances
def __init__(self, owning_component_name, channel=None, participant_config=None):
'''Create an OutputBuffer.
Keyword arguments:
owning_component_name -- name of the entity that owns this buffer
participant_config -- RSB configuration
'''
super(OutputBuffer, self).__init__(owning_component_name, channel, participant_config)
self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB'
be = ipaaca.backend.get_default_backend()
self._server = be.createLocalServer(self, be.Scope(self._unique_name + '/Server'), config=self._participant_config)
self._informer_store = {}
self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-'
self.__iu_id_counter_lock = threading.Lock()
def _teardown(self):
'''OutputBuffer retracts remaining live IUs on teardown'''
self._retract_all_internal()
self._deactivate_all_internal()
def __del__(self):
'''Perform teardown (IU retractions) as soon as Buffer is lost.
Note that at program exit the teardown might be called
twice for live objects (atexit, then del), but the
_retract_all_internal method prevents double retractions.'''
self._retract_all_internal()
self._deactivate_all_internal()
def _remote_update_links(self, update):
'''Apply a remotely requested update to one of the stored IU's links.'''
if update.uid not in self._iu_store:
LOGGER.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
LOGGER.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
return 0
if update.is_delta:
iu.modify_links(add=update.new_links, remove=update.links_to_remove, writer_name=update.writer_name)
else:
iu.set_links(links=update.new_links, writer_name=update.writer_name)
self.call_iu_event_handlers(update.uid, local=True, event_type=ipaaca.iu.IUEventType.LINKSUPDATED, category=iu.category)
return iu.revision
def _remote_update_payload(self, update):
'''Apply a remotely requested update to one of the stored IU's payload.'''
if update.uid not in self._iu_store:
LOGGER.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
LOGGER.warning("Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
LOGGER.warning(" Writer was: "+update.writer_name)
LOGGER.warning(" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
LOGGER.warning(" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
return 0
if update.is_delta:
#print('Writing delta update by '+str(update.writer_name))
with iu.payload:
for k in update.keys_to_remove:
iu.payload.__delitem__(k, writer_name=update.writer_name)
for k,v in update.new_items.items():
iu.payload.__setitem__(k, v, writer_name=update.writer_name)
else:
#print('Writing non-incr update by '+str(update.writer_name))
iu._set_payload(update.new_items, writer_name=update.writer_name)
# _set_payload etc. have also incremented the revision number
self.call_iu_event_handlers(update.uid, local=True, event_type=ipaaca.iu.IUEventType.UPDATED, category=iu.category)
return iu.revision
def _remote_request_resend(self, iu_resend_request_pack):
'''Resend a requested IU on the specified hidden category scope.'''
if iu_resend_request_pack.uid not in self._iu_store:
LOGGER.warning("Remote side requested resending of non-existent IU "+str(iu_resend_request_pack.uid))
return 0
iu = self._iu_store[iu_resend_request_pack.uid]
with iu.revision_lock:
if iu_resend_request_pack.hidden_scope_name is not None and iu_resend_request_pack.hidden_scope_name != '':
informer = self._get_informer(iu_resend_request_pack.hidden_scope_name)
informer.publishData(iu)
return iu.revision
else:
return 0
def _remote_commit(self, iu_commission):
'''Apply a remotely requested commit to one of the stored IUs.'''
if iu_commission.uid not in self._iu_store:
LOGGER.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
return 0
iu = self._iu_store[iu_commission.uid]
with iu.revision_lock:
if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
LOGGER.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
return 0
if iu.committed:
return 0
else:
iu._internal_commit(writer_name=iu_commission.writer_name)
self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=ipaaca.iu.IUEventType.COMMITTED, category=iu.category)
return iu.revision
def _get_informer(self, iu_category):
'''Return (or create, store and return) an informer object for IUs of the specified category.'''
if iu_category in self._informer_store:
LOGGER.info("Returning informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
return self._informer_store[iu_category]
be = ipaaca.backend.get_default_backend()
informer_iu = be.createInformer(
be.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)),
config=self._participant_config,
dataType=object)
self._informer_store[iu_category] = informer_iu #new_tuple
LOGGER.info("Returning NEW informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
return informer_iu #return new_tuple
def add(self, iu):
'''Add an IU to the IU store, attach this buffer to it and publish it.'''
if iu.uid in self._iu_store:
raise ipaaca.exception.IUPublishedError(iu)
if iu.buffer is not None:
raise ipaaca.exception.IUPublishedError(iu)
if iu.retracted:
raise ipaaca.exception.IURetractedError(iu)
if iu.access_mode != ipaaca.iu.IUAccessMode.MESSAGE:
# Messages are not really stored in the OutputBuffer
self._iu_store[iu.uid] = iu
iu.buffer = self
self._publish_iu(iu)
def remove(self, iu=None, iu_uid=None):
'''Retracts an IU and removes it from the OutputBuffer.'''
if iu is None:
if iu_uid is None:
return None
else:
if iu_uid not in self._iu_store:
raise ipaaca.exception.IUNotFoundError(iu_uid)
iu = self._iu_store[iu_uid]
# unpublish the IU
self._retract_iu(iu)
del self._iu_store[iu.uid]
return iu
def _publish_iu(self, iu):
'''Publish an IU.'''
informer = self._get_informer(iu._category)
informer.publishData(iu)
def _retract_iu(self, iu):
'''Retract an IU.'''
iu._retracted = True
iu_retraction = ipaaca.ipaaca_pb2.IURetraction()
iu_retraction.uid = iu.uid
iu_retraction.revision = iu.revision
informer = self._get_informer(iu._category)
informer.publishData(iu_retraction)
def _retract_all_internal(self):
'''Retract all IUs without removal (for Buffer teardown).'''
for iu in self._iu_store.values():
if not iu._retracted:
self._retract_iu(iu)
def _deactivate_all_internal(self):
'''Deactivate all participants.'''
try:
self._server.deactivate()
except RuntimeError:
# Is raised if an already deactivated participant is
# deactivated again
pass
for informer in self._informer_store.values():
try:
informer.deactivate()
except RuntimeError:
# Is raised if an already deactivated participant is
# deactivated again
pass
def _send_iu_commission(self, iu, writer_name):
'''Send IU commission.
Keyword arguments:
iu -- the IU that has been committed to
writer_name -- name of the Buffer that initiated this commit, necessary
to enable remote components to filter out updates that originated
from their own operations
'''
# a raw Protobuf object for IUCommission is produced
# (unlike updates, where we have an intermediate class)
iu_commission = ipaaca.ipaaca_pb2.IUCommission()
iu_commission.uid = iu.uid
iu_commission.revision = iu.revision
iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name
informer = self._get_informer(iu._category)
informer.publishData(iu_commission)
def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"):
'''Send an IU link update.
Keyword arguments:
iu -- the IU being updated
is_delta -- whether this is an incremental update or a replacement of
the whole link dictionary
revision -- the new revision number
new_links -- a dictionary of new link sets
links_to_remove -- a dict of the link sets that shall be removed
writer_name -- name of the Buffer that initiated this update, necessary
to enable remote components to filter out updates that originated
from their own operations
'''
if new_links is None:
new_links = {}
if links_to_remove is None:
links_to_remove = {}
link_update = ipaaca.converter.IULinkUpdate(iu._uid, is_delta=is_delta, revision=revision)
link_update.new_links = new_links
if is_delta:
link_update.links_to_remove = links_to_remove
link_update.writer_name = writer_name
informer = self._get_informer(iu._category)
informer.publishData(link_update)
# FIXME send the notification to the target, if the target is not the writer_name
def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
'''Send an IU payload update.
Keyword arguments:
iu -- the IU being updated
is_delta -- whether this is an incremental update or a replacement
revision -- the new revision number
new_items -- a dictionary of new payload items
keys_to_remove -- a list of the keys that shall be removed from the
payload
writer_name -- name of the Buffer that initiated this update, necessary
to enable remote components to filter out updates that originated
from their own operations
'''
if new_items is None:
new_items = {}
if keys_to_remove is None:
keys_to_remove = []
payload_update = ipaaca.converter.IUPayloadUpdate(
uid=iu._uid,
revision=revision,
is_delta=is_delta,
payload_type=iu.payload_type)
payload_update.new_items = new_items
if is_delta:
payload_update.keys_to_remove = keys_to_remove
payload_update.writer_name = writer_name
informer = self._get_informer(iu._category)
informer.publishData(payload_update)
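# Usage sketch (illustrative; IU construction assumes the ipaaca.iu.IU API):
#
#   ob = OutputBuffer('MyComponent')
#   iu = ipaaca.iu.IU('exampleCategory')
#   ob.add(iu)                      # stores the IU, sets iu.buffer and publishes it
#   iu.payload['text'] = 'hello'    # later changes go out as payload updates
#   ob.remove(iu)                   # retracts the IU and drops it from the buffer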
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import os
import re
try:
import configparser
except:
import ConfigParser as configparser
LOGGER = ipaaca.misc.get_library_logger()
__global_config = None
class Config(object):
def __init__(self):
self._store = {}
def get_with_default(self, key, default_value, warn=False):
if key in self._store:
return self._store[key]
else:
notif = LOGGER.warning if warn else LOGGER.debug
notif('Config key '+str(key)+' not found, returning default of '+str(default_value))
return default_value
def populate_from_global_sources(self):
self._store = {}
self.populate_from_any_conf_files()
self.populate_from_environment()
#self.populate_from_argv_overrides() # TODO IMPLEMENT_ME
def populate_from_any_conf_files(self):
globalconf = os.getenv('HOME', '')+'/.config/ipaaca.conf'
for filename in ['ipaaca.conf', globalconf]:
try:
f = open(filename, 'r')
c = configparser.ConfigParser()
c.readfp(f)
f.close()
LOGGER.info('Including configuration from '+filename)
for k,v in c.items('ipaaca'):
self._store[k] = v
return
except:
pass
LOGGER.info('Could not load ipaaca.conf either here or in ~/.config')
def populate_from_environment(self):
for k, v in os.environ.items():
if k.startswith('IPAACA_'):
if re.match(r'^[A-Za-z0-9_]*$', k) is None:
LOGGER.warning('Ignoring malformed environment key')
else:
if len(v)>1023:
LOGGER.warning('Ignoring long environment value')
else:
# remove initial IPAACA_ and transform key to dotted lowercase
trans_key = k[7:].lower().replace('_', '.')
self._store[trans_key] = v
LOGGER.debug('Configured from environment: '+str(trans_key)+'="'+str(v)+'"')
def get_global_config():
global __global_config
if __global_config is None:
__global_config = Config()
__global_config.populate_from_global_sources()
return __global_config
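# Resolution sketch (illustrative): a dotted key such as 'transport.mqtt.port' can come
# from the [ipaaca] section of ./ipaaca.conf or ~/.config/ipaaca.conf, or from the
# environment, where IPAACA_TRANSPORT_MQTT_PORT=1883 becomes 'transport.mqtt.port':
#
#   cfg = get_global_config()
#   port = cfg.get_with_default('transport.mqtt.port', 1883, warn=True)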
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import collections
#import rsb.converter
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
LOGGER = ipaaca.misc.get_library_logger()
try:
import simplejson as json
except ImportError:
import json
LOGGER.warn('Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
__all__ = [
'IntConverter',
'IUConverter',
'IULinkUpdate',
'IULinkUpdateConverter',
'IUPayloadUpdate',
'IUPayloadUpdateConverter',
'MessageConverter',
'register_global_converter',
]
_LOW_LEVEL_WIRE_SCHEMA_MAP = None
def LOW_LEVEL_WIRE_SCHEMA_FOR(abstractname):
'''Map the abstract wire schema name (was used in RSB) to a
transport-dependent magic to detect on the wire.
Here: a required protobuf field'''
global _LOW_LEVEL_WIRE_SCHEMA_MAP
if _LOW_LEVEL_WIRE_SCHEMA_MAP is None:
_LOW_LEVEL_WIRE_SCHEMA_MAP = {
int: ipaaca.ipaaca_pb2.WireTypeIntMessage,
ipaaca.iu.IU: ipaaca.ipaaca_pb2.WireTypeIU,
ipaaca.iu.Message: ipaaca.ipaaca_pb2.WireTypeMessageIU,
IUPayloadUpdate: ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdate,
IULinkUpdate: ipaaca.ipaaca_pb2.WireTypeIULinkUpdate,
'int': ipaaca.ipaaca_pb2.WireTypeIntMessage,
'ipaaca-iu': ipaaca.ipaaca_pb2.WireTypeIU,
'ipaaca-messageiu': ipaaca.ipaaca_pb2.WireTypeMessageIU,
'ipaaca-iu-payload-update': ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdate,
'ipaaca-iu-link-update': ipaaca.ipaaca_pb2.WireTypeIULinkUpdate,
}
return _LOW_LEVEL_WIRE_SCHEMA_MAP.get(abstractname)
def __fail_no_type_converter():
raise ipaaca.exception.BackendSerializationError()
class FailingDict(dict):
def __init__(self, error_class, *args, **kwargs):
super(FailingDict, self).__init__(*args, **kwargs)
self._error_class = error_class
def __getitem__(self, k):
if k in self:
return dict.__getitem__(self, k)
else:
raise self._error_class(k)
# global converter / [un]marshaller store
__converter_registry_by_type = FailingDict(ipaaca.exception.BackendSerializationError)
__converter_registry_by_wire_schema = FailingDict(ipaaca.exception.BackendDeserializationError)
def register_global_converter(converter):
global __converter_registry_by_type, __converter_registry_by_wire_schema
real_wire_schema = LOW_LEVEL_WIRE_SCHEMA_FOR(converter._wire_schema)
if real_wire_schema is None:
raise NotImplementedError('There is no entry in the _LOW_LEVEL_WIRE_SCHEMA_MAP for '+str(converter._wire_schema))
if real_wire_schema in __converter_registry_by_wire_schema:
raise ipaaca.exception.ConverterRegistrationError(real_wire_schema)
if converter._data_type in __converter_registry_by_type:
raise ipaaca.exception.ConverterRegistrationError(converter._data_type.__name__)
__converter_registry_by_type[converter._data_type] = converter
__converter_registry_by_wire_schema[real_wire_schema] = converter
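# Registration sketch (illustrative): the converters defined below are intended to be
# registered once, typically during backend initialization, e.g.:
#
#   register_global_converter(IUConverter())
#   register_global_converter(MessageConverter())
#   register_global_converter(IUPayloadUpdateConverter())
#   register_global_converter(IULinkUpdateConverter())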
def deserialize(lowlevel_message):
pbo_outer = ipaaca.ipaaca_pb2.TransportLevelWrapper()
pbo_outer.ParseFromString(lowlevel_message)
type_ = pbo_outer.transport_message_type
#print('Received wire message type', type_)
if type_ in __converter_registry_by_wire_schema:
return __converter_registry_by_wire_schema[type_].deserialize(pbo_outer.raw_message, None)
else:
pbo = None
if type_ == ipaaca.ipaaca_pb2.WireTypeRemoteRequestResult:
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIURetraction:
pbo = ipaaca.ipaaca_pb2.IURetraction()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUCommission:
pbo = ipaaca.ipaaca_pb2.IUCommission()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUResendRequest:
pbo = ipaaca.ipaaca_pb2.IUResendRequest()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdateRequest:
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdateRequest()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIUCommissionRequest:
pbo = ipaaca.ipaaca_pb2.IUCommissionRequest()
elif type_ == ipaaca.ipaaca_pb2.WireTypeIULinkUpdateRequest:
pbo = ipaaca.ipaaca_pb2.IULinkUpdateRequest()
if pbo is None:
raise ipaaca.exception.BackendDeserializationError(type_)
else:
pbo.ParseFromString(pbo_outer.raw_message)
return pbo
raise ipaaca.exception.BackendDeserializationError(type_)
def serialize(obj):
inner, type_ = None, None
if obj.__class__ in __converter_registry_by_type:
cls_ = obj.__class__
inner, wire = __converter_registry_by_type[obj.__class__].serialize(obj)
type_ = LOW_LEVEL_WIRE_SCHEMA_FOR(wire)
else:
cls_ = obj.__class__
if cls_ == ipaaca.ipaaca_pb2.RemoteRequestResult:
type_ = ipaaca.ipaaca_pb2.WireTypeRemoteRequestResult
elif cls_ == ipaaca.ipaaca_pb2.IURetraction:
type_ = ipaaca.ipaaca_pb2.WireTypeIURetraction
elif cls_ == ipaaca.ipaaca_pb2.IUCommission:
type_ = ipaaca.ipaaca_pb2.WireTypeIUCommission
elif cls_ == ipaaca.ipaaca_pb2.IUResendRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIUResendRequest
elif cls_ == ipaaca.ipaaca_pb2.IUPayloadUpdateRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIUPayloadUpdateRequest
elif cls_ == ipaaca.ipaaca_pb2.IUCommissionRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIUCommissionRequest
elif cls_ == ipaaca.ipaaca_pb2.IULinkUpdateRequest:
type_ = ipaaca.ipaaca_pb2.WireTypeIULinkUpdateRequest
if type_ is None:
raise ipaaca.exception.BackendSerializationError(cls_)
else:
inner = obj.SerializeToString()
pbo = ipaaca.ipaaca_pb2.TransportLevelWrapper()
pbo.transport_message_type = type_
pbo.raw_message = inner
return bytearray(pbo.SerializeToString())
class ConverterBase(object):
'''Base class for converters that serialize and deserialize
data automatically depending on its Python type.'''
def __init__(self, substrate, data_type, wire_schema):
self._substrate = substrate
self._wire_schema = wire_schema
self._data_type = data_type
self.wireSchema = wire_schema # added compat with RSB
#print('Made a ConverterBase with wire '+str(self._wire_schema)+' and data '+str(self._data_type))
def serialize(self, value):
raise NotImplementedError('NOT IMPLEMENTED for ' \
+ self.__class__.__name__+': serialize')
def deserialize(self, stream, _UNUSED_override_wire_schema):
raise NotImplementedError('NOT IMPLEMENTED for ' \
+ self.__class__.__name__+': deserialize')
class IntConverter(ConverterBase):
"""Convert Python int objects to Protobuf ints and vice versa."""
def __init__(self, wireSchema="int", dataType=None):
super(IntConverter, self).__init__(bytearray, int, wireSchema)
def serialize(self, value):
pbo = ipaaca.ipaaca_pb2.IntMessage()
pbo.value = value
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IntMessage()
pbo.ParseFromString(byte_stream)
return pbo.value
def pack_payload_entry(entry, key, value, _type=None):
#if _type is None: _type=ipaaca.iu.IUPayloadType.JSON
entry.key = key
if _type is None or _type == ipaaca.iu.IUPayloadType.JSON:
entry.value = json.dumps(value)
elif _type == ipaaca.iu.IUPayloadType.STR or _type == 'MAP':
entry.value = str(value)
else:
raise ipaaca.exception.IpaacaException('Asked to send payload entry with unsupported type "' + _type + '".')
# default to JSON if no explicit type was given (the value was json-encoded above)
entry.type = ipaaca.iu.IUPayloadType.JSON if _type is None else _type
def unpack_payload_entry(entry):
# The only transfer types are 'JSON' (decoded via json.loads) and 'STR'/'str' (returned verbatim)
if entry.type == ipaaca.iu.IUPayloadType.JSON:
return json.loads(entry.value)
elif entry.type == ipaaca.iu.IUPayloadType.STR or entry.type == 'str':
return entry.value
else:
LOGGER.warn('Received payload entry with unsupported type "' + entry.type + '".')
return entry.value
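# Round-trip sketch (illustrative; 'PayloadItem' is the assumed name of the protobuf
# entry message produced by pbo.payload.add() / pbo.new_items.add() below):
#
#   entry = ipaaca.ipaaca_pb2.PayloadItem()
#   pack_payload_entry(entry, 'pos', {'x': 1, 'y': 2}, ipaaca.iu.IUPayloadType.JSON)
#   unpack_payload_entry(entry)    # -> {'x': 1, 'y': 2}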
class IUConverter(ConverterBase):
'''
Converter class for full IU representations:
wire:bytearray <-> wire-schema:ipaaca-iu <-> class ipaaca.iu.IU
'''
def __init__(self, wireSchema="ipaaca-iu", dataType=None): #ipaaca.iu.IU):
super(IUConverter, self).__init__(bytearray, ipaaca.iu.IU if dataType is None else dataType, wireSchema)
self._access_mode = ipaaca.ipaaca_pb2.IU.PUSH
self._remote_data_type = ipaaca.iu.RemotePushIU
def serialize(self, iu):
pbo = ipaaca.ipaaca_pb2.IU()
pbo.access_mode = self._access_mode
pbo.uid = iu._uid
pbo.revision = iu._revision
pbo.category = iu._category
pbo.payload_type = iu._payload_type
pbo.owner_name = iu._owner_name
pbo.committed = iu._committed
pbo.read_only = iu._read_only
for k, v in iu._payload.items():
entry = pbo.payload.add()
pack_payload_entry(entry, k, v, iu.payload_type)
for type_ in iu._links.keys():
linkset = pbo.links.add()
linkset.type = type_
linkset.targets.extend(iu._links[type_])
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IU()
pbo.ParseFromString(byte_stream)
_payload = {}
for entry in pbo.payload:
_payload[entry.key] = unpack_payload_entry(entry)
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
return self._remote_data_type(
uid=pbo.uid,
revision=pbo.revision,
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = 'str' if pbo.payload_type == 'MAP' else pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links)
class MessageConverter(IUConverter):
'''
Converter class for Message-type IU representations:
wire:bytearray <-> wire-schema:ipaaca-messageiu <-> class ipaaca.iu.Message
'''
def __init__(self, wireSchema="ipaaca-messageiu", dataType=None): #ipaaca.iu.Message):
super(MessageConverter, self).__init__(wireSchema, ipaaca.iu.Message)
self._access_mode = ipaaca.ipaaca_pb2.IU.MESSAGE
self._remote_data_type = ipaaca.iu.RemoteMessage
class IULinkUpdate(object):
def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None, request_uid=None, request_endpoint=None):
super(IULinkUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.writer_name = writer_name
self.is_delta = is_delta
self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links)
self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)
self.request_uid = request_uid
self.request_endpoint = request_endpoint
def __str__(self):
s = 'LinkUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_links = '+str(self.new_links)+', '
s += 'links_to_remove = '+str(self.links_to_remove)+')'
return s
class IULinkUpdateConverter(ConverterBase):
def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=None): #=IULinkUpdate):
super(IULinkUpdateConverter, self).__init__(bytearray, IULinkUpdate, wireSchema)
def serialize(self, iu_link_update):
pbo = ipaaca.ipaaca_pb2.IULinkUpdate()
pbo.uid = iu_link_update.uid
pbo.writer_name = iu_link_update.writer_name
pbo.revision = iu_link_update.revision
if iu_link_update.request_uid:
pbo.request_uid = iu_link_update.request_uid
if iu_link_update.request_endpoint:
pbo.request_endpoint = iu_link_update.request_endpoint
for type_ in iu_link_update.new_links.keys():
linkset = pbo.new_links.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.new_links[type_])
for type_ in iu_link_update.links_to_remove.keys():
linkset = pbo.links_to_remove.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.links_to_remove[type_])
pbo.is_delta = iu_link_update.is_delta
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IULinkUpdate()
pbo.ParseFromString(byte_stream)
LOGGER.debug('received an IULinkUpdate for revision '+str(pbo.revision))
iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta, request_uid=pbo.request_uid, request_endpoint=pbo.request_endpoint)
for entry in pbo.new_links:
iu_link_up.new_links[str(entry.type)] = set(entry.targets)
for entry in pbo.links_to_remove:
iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets)
return iu_link_up
class IUPayloadUpdate(object):
def __init__(self, uid, revision, is_delta, payload_type, writer_name="undef", new_items=None, keys_to_remove=None, request_uid=None, request_endpoint=None):
super(IUPayloadUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.payload_type = payload_type
self.writer_name = writer_name
self.is_delta = is_delta
self.new_items = {} if new_items is None else new_items
self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove
self.request_uid = request_uid
self.request_endpoint = request_endpoint
def __str__(self):
s = 'PayloadUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'payload_type='+str(self.payload_type)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_items = '+str(self.new_items)+', '
s += 'keys_to_remove = '+str(self.keys_to_remove)+')'
return s
class IUPayloadUpdateConverter(ConverterBase):
def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=None):
super(IUPayloadUpdateConverter, self).__init__(bytearray, IUPayloadUpdate, wireSchema)
def serialize(self, iu_payload_update):
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdate()
pbo.uid = iu_payload_update.uid
pbo.writer_name = iu_payload_update.writer_name
pbo.revision = iu_payload_update.revision
if iu_payload_update.request_uid:
pbo.request_uid = iu_payload_update.request_uid
if iu_payload_update.request_endpoint:
pbo.request_endpoint = iu_payload_update.request_endpoint
for k, v in iu_payload_update.new_items.items():
entry = pbo.new_items.add()
pack_payload_entry(entry, k, v, iu_payload_update.payload_type)
pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
pbo.is_delta = iu_payload_update.is_delta
return pbo.SerializeToString(), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca.ipaaca_pb2.IUPayloadUpdate()
pbo.ParseFromString(byte_stream)
LOGGER.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, payload_type=None, writer_name=pbo.writer_name, is_delta=pbo.is_delta, request_uid=pbo.request_uid, request_endpoint=pbo.request_endpoint)
for entry in pbo.new_items:
iu_up.new_items[entry.key] = unpack_payload_entry(entry)
iu_up.keys_to_remove = pbo.keys_to_remove[:]
return iu_up
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
IPAACA_DEFAULT_CHANNEL = 'default'
IPAACA_LOGGER_NAME = 'ipaaca'
IPAACA_DEFAULT_LOGGING_LEVEL = 'WARNING'
IPAACA_DEFAULT_IU_PAYLOAD_TYPE = 'JSON' # one of ipaaca.iu.IUPayloadType
IPAACA_DEFAULT_RSB_HOST = None
IPAACA_DEFAULT_RSB_PORT = None
IPAACA_DEFAULT_RSB_TRANSPORT = None
IPAACA_DEFAULT_RSB_SOCKET_SERVER = None