Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Show changes
Showing
with 1925 additions and 106 deletions
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2016 Social Cognitive Systems Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca.util;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
import java.util.HashMap;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
public class IpaacaLogger {
private static OutputBuffer ob;
private static final Object lock = new Object();
private static boolean SEND_IPAACA_LOGS = true;
private static String MODULE_NAME = "???";
private static void initializeOutBuffer() {
synchronized (lock) {
if (ob == null) {
ob = new OutputBuffer("LogSender");
}
}
}
public static void setModuleName(String name) {
synchronized (lock) {
MODULE_NAME = name;
}
}
public static void setLogFileName(String fileName, String logMode) {
initializeOutBuffer();
LocalMessageIU msg = new LocalMessageIU("log");
HashMap<String, String> pl = new HashMap<String, String>();
pl.put("cmd", "open_log_file");
pl.put("filename", fileName);
if (logMode != null) {
if (logMode.equals("append") ||
logMode.equals("overwrite") ||
logMode.equals("timestamp")) {
pl.put("existing", logMode);
} else {
return;
}
}
ob.add(msg);
}
public static void sendIpaacaLogs(boolean flag) {
synchronized (lock) {
SEND_IPAACA_LOGS = flag;
}
}
private static void logConsole(String level, String text, float now, String function, String thread) {
for(String line: text.split("\n")) {
System.out.println("[" + level + "] " + thread + " " + function + " " + line);
function = StringUtils.leftPad("", function.length(), ' ');
thread = StringUtils.leftPad("", thread.length(), ' ');
}
}
private static void logIpaaca(String level, String text, float now, String function, String thread) {
initializeOutBuffer();
LocalMessageIU msg = new LocalMessageIU("log");
HashMap<String, String> pl = new HashMap<String, String>();
pl.put("module", MODULE_NAME);
pl.put("function", function);
pl.put("level", level);
pl.put("time", String.format("%.3f", now));
pl.put("thread", thread);
pl.put("uuid", UUID.randomUUID().toString());
pl.put("text", text);
msg.setPayload(pl);
ob.add(msg);
}
private static String getCallerName() {
String function = Thread.currentThread().getStackTrace()[3].getClassName();
function += "." + Thread.currentThread().getStackTrace()[3].getMethodName();
return function;
}
public static void logError(String msg) {
logError(msg, System.currentTimeMillis(), getCallerName());
}
public static void logError(String msg, float now) {
logError(msg, now, getCallerName());
}
private static void logError(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("ERROR", msg, now, callerName, thread);
}
logConsole("ERROR", msg, now, callerName, thread);
}
public static void logWarn(String msg) {
logWarn(msg, System.currentTimeMillis(), getCallerName());
}
public static void logWarn(String msg, float now) {
logWarn(msg, now, getCallerName());
}
private static void logWarn(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("WARN", msg, now, callerName, thread);
}
logConsole("WARN", msg, now, callerName, thread);
}
public static void logInfo(String msg) {
logInfo(msg, System.currentTimeMillis(), getCallerName());
}
public static void logInfo(String msg, float now) {
logInfo(msg, now, getCallerName());
}
private static void logInfo(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("INFO", msg, now, callerName, thread);
}
logConsole("INFO", msg, now, callerName, thread);
}
public static void logDebug(String msg) {
logDebug(msg, System.currentTimeMillis(), getCallerName());
}
public static void logDebug(String msg, float now) {
logDebug(msg, now, getCallerName());
}
private static void logDebug(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("DEBUG", msg, now, callerName, thread);
}
logConsole("DEBUG", msg, now, callerName, thread);
}
}
package ipaaca.util.communication;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventHandler;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
/**
* Obtain a IU in the future. Usage:<br>
* FutureIU fu = FutureIU("componentx", "status", "started"); //wait for componentx to send a message that is it fully started<br>
* [Start componentx, assumes that component x will send a message or other iu with status=started in the payload]<br>
* AbstractIU iu = fu.take(); //get the actual IU
* @author hvanwelbergen
*/
public class FutureIU
{
private final InputBuffer inBuffer;
private final BlockingQueue<AbstractIU> queue = new ArrayBlockingQueue<AbstractIU>(1);
private final IUEventHandler handler;
public FutureIU(String category, final String idKey, final String idVal, InputBuffer inBuffer)
{
this.inBuffer = inBuffer;
handler = new IUEventHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
String id = iu.getPayload().get(idKey);
if (idVal.equals(id))
{
queue.offer(iu);
}
}
}, ImmutableSet.of(category));
inBuffer.registerHandler(handler);
}
public FutureIU(String category, String idKey, String idVal)
{
this(category, idKey, idVal, new InputBuffer("FutureIU", ImmutableSet.of(category)));
}
/**
* Closes the FutureIU, use only if get is not used.
*/
public void cleanup()
{
inBuffer.removeHandler(handler);
if (inBuffer.getOwningComponentName().equals("FutureIU"))
{
inBuffer.close();
}
}
/**
* Waits (if necessary) for the IU and take it (can be done only once)
*/
public AbstractIU take() throws InterruptedException
{
AbstractIU iu;
try
{
iu = queue.take();
}
finally
{
cleanup();
}
return iu;
}
/**
* Wait for at most the given time for the IU and take it (can be done only once), return null on timeout
*/
public AbstractIU take(long timeout, TimeUnit unit) throws InterruptedException
{
AbstractIU iu;
try
{
iu = queue.poll(timeout, unit);
}
finally
{
cleanup();
}
return iu;
}
}
package ipaaca.util.communication;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
/**
* Obtain multiple future ius on an specific category. Usage:<br>
* FutureIUs futures = new FutureIUs(componentx, key);<br>
* [make componentx send a IU with key=keyvaluedesired1]<br>
* AbstractIU iu = futures.take(keyvaluedesired1);<br>
* [make componentx send a IU with key=keyvaluedesired2]
* AbstractIU iu = futures.take(keyvaluedesired2);<br>
* ...<br>
* futures.close();
* @author hvanwelbergen
*/
public class FutureIUs
{
private final InputBuffer inBuffer;
private final Map<String,BlockingQueue<AbstractIU>> resultsMap = Collections.synchronizedMap(new HashMap<String,BlockingQueue<AbstractIU>>());
public FutureIUs(String category, final String idKey)
{
inBuffer = new InputBuffer("FutureIUs", ImmutableSet.of(category));
inBuffer.registerHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
String id = iu.getPayload().get(idKey);
resultsMap.putIfAbsent(id, new ArrayBlockingQueue<AbstractIU>(1));
resultsMap.get(id).offer(iu);
}
}, ImmutableSet.of(category));
}
/**
* Waits (if necessary) for the IU and take it (can be done only once)
*/
public AbstractIU take(String idValue) throws InterruptedException
{
resultsMap.putIfAbsent(idValue, new ArrayBlockingQueue<AbstractIU>(1));
return resultsMap.get(idValue).take();
}
/**
* Wait for at most the given time for the IU and take it (can be done only once), return null on timeout
*/
public AbstractIU take(String idValue, long timeout, TimeUnit unit) throws InterruptedException
{
resultsMap.putIfAbsent(idValue, new ArrayBlockingQueue<AbstractIU>(1));
return resultsMap.get(idValue).poll(timeout, unit);
}
public void close()
{
inBuffer.close();
}
}
......@@ -183,7 +183,7 @@ public class ComponentPushCommunicationIntegrationTest
payloadUpdate.put("chunk12", "item2");
payloadUpdate.put("chunk13", "item3");
payloadUpdate.put("chunk14", "item4");
int oldRev = iuIn.getRevision();
long oldRev = iuIn.getRevision();
localIU.getPayload().merge(payloadUpdate);
Thread.sleep(200);
assertEquals(oldRev + 1, iuIn.getRevision());
......@@ -197,7 +197,7 @@ public class ComponentPushCommunicationIntegrationTest
payloadUpdate2.put("chunk22", "item6");
payloadUpdate2.put("chunk13", "item3-changed");
payloadUpdate2.put("chunk14", "item4-changed");
int oldRev2 = iuIn.getRevision();
long oldRev2 = iuIn.getRevision();
iuIn.getPayload().merge(payloadUpdate2);
Thread.sleep(200);
assertEquals(oldRev2 + 1, localIU.getRevision());
......
......@@ -54,7 +54,7 @@ public class InputBufferTest
iu.setOwnerName("owner");
iu.setReadOnly(false);
iu.setRevision(1);
informer.send(iu);
informer.publish(iu);
Thread.sleep(1000);
AbstractIU iuIn = inBuffer.getIU("uid1");
......
......@@ -205,7 +205,7 @@ public class JavaPythonTest
String pypr = PYTHON_PREAMBLE
+"ob = ipaaca.OutputBuffer('pythonside')\n"
+"iu = ipaaca.Message('JavaPythonTest')\n"
+"iu.payload['data'] = 'Hello from Python!'\n"
+"iu.payload = {'data':'Hello from Python!'}\n"
+"time.sleep(0.1)\n"
+"ob.add(iu)\n";
runPythonProgram(pypr);
......
package ipaaca.util;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import ipaaca.Initializer;
import org.junit.After;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
/**
* Integration tests for the blackboard
* @author hvanwelbergen
*/
public class BlackboardIntegrationTest
{
static
{
Initializer.initializeIpaacaRsb();
}
private Blackboard bb = new Blackboard("myblackboard","blackboardx");
private BlackboardClient bbc;
@After
public void after()
{
bb.close();
if(bbc!=null)
{
bbc.close();
}
}
@Test
public void testGetValueFromBlackboardBeforeConnection()
{
bb.put("key1","value1");
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
assertEquals("value1", bbc.get("key1"));
}
@Test
public void testGetValueFromBlackboardAfterConnection() throws InterruptedException
{
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
bb.put("key1","value1");
Thread.sleep(200);
assertEquals("value1", bbc.get("key1"));
}
@Test
public void testSetValueOnBlackboard() throws InterruptedException
{
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
bbc.put("key2","value2");
Thread.sleep(300);
assertEquals("value2", bb.get("key2"));
}
@Test
public void testBlackboardUpdateHandler()throws InterruptedException
{
BlackboardUpdateListener mockListener = mock(BlackboardUpdateListener.class);
bb.addUpdateListener(mockListener);
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
bbc.put("key2","value2");
Thread.sleep(200);
bb.put("key2","value3");
verify(mockListener,times(1)).update();
}
@Test
public void testBlackboardClientUpdateHandler() throws InterruptedException
{
BlackboardUpdateListener mockListener = mock(BlackboardUpdateListener.class);
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
bbc.addUpdateListener(mockListener);
bb.put("key3","value3");
Thread.sleep(200);
bbc.put("key3","value4");
verify(mockListener,times(2)).update();
}
@Test
public void testSetManyValuesOnBlackboard() throws InterruptedException
{
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
for(int i=0;i<100;i++)
{
bbc.put("key"+i,"value"+i);
bb.put("key"+i,"value"+i);
}
Thread.sleep(300);
assertEquals("value2", bb.get("key2"));
assertEquals("value3", bb.get("key3"));
}
@Test
public void testSetValuesOnClient() throws InterruptedException
{
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
bbc.putAll(ImmutableMap.of("key1","value1","key2","value2"));
Thread.sleep(200);
assertEquals("value1", bb.get("key1"));
assertEquals("value2", bb.get("key2"));
}
@Test
public void testSetValuesOnBlackBoard() throws InterruptedException
{
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
bb.putAll(ImmutableMap.of("key1","value1","key2","value2"));
Thread.sleep(200);
assertEquals("value1", bbc.get("key1"));
assertEquals("value2", bbc.get("key2"));
}
}
package ipaaca.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.Initializer;
import ipaaca.InputBuffer;
import ipaaca.LocalIU;
import ipaaca.OutputBuffer;
import ipaaca.util.ComponentNotifier;
import java.util.Set;
import lombok.Getter;
import org.junit.After;
import org.junit.Test;
import lombok.Getter;
import com.google.common.collect.ImmutableSet;
/**
......@@ -29,7 +30,7 @@ public class ComponentNotifierIntegrationTest
private ComponentNotifier notifier2;
private InputBuffer inBuffer;
private OutputBuffer outBuffer;
private static final String OTHER_CATEGORY="OTHER";
static
{
Initializer.initializeIpaacaRsb();
......@@ -67,6 +68,13 @@ public class ComponentNotifierIntegrationTest
return new ComponentNotifier(id, "test", ImmutableSet.copyOf(sendList), ImmutableSet.copyOf(recvList), outBuffer, inBuffer);
}
private ComponentNotifier setupCompNotifierWithOtherCategoryInputBuffer(String id, Set<String> sendList, Set<String> recvList)
{
inBuffer = new InputBuffer(id + "in", ImmutableSet.of(ComponentNotifier.NOTIFY_CATEGORY, OTHER_CATEGORY));
outBuffer = new OutputBuffer(id + "out");
return new ComponentNotifier(id, "test", ImmutableSet.copyOf(sendList), ImmutableSet.copyOf(recvList), outBuffer, inBuffer);
}
@Test
public void testSelf() throws InterruptedException
{
......@@ -97,4 +105,19 @@ public class ComponentNotifierIntegrationTest
assertEquals(1, h1.getNumCalled());
assertEquals(1, h2.getNumCalled());
}
@Test
public void testOtherCategoryInInputBuffer() throws InterruptedException
{
notifier1 = setupCompNotifierWithOtherCategoryInputBuffer("not1", ImmutableSet.of("a1", "b1"), ImmutableSet.of("a3", "b1"));
MyHandlerFunctor h1 = new MyHandlerFunctor();
notifier1.addNotificationHandler(h1);
OutputBuffer out = new OutputBuffer("out");
LocalIU iu = new LocalIU(OTHER_CATEGORY);
out.add(iu);
Thread.sleep(500);
assertEquals(0, h1.getNumCalled());
assertNotNull(inBuffer.getIU(iu.getUid()));
}
}
package ipaaca.util.communication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Unit tests for the FutureIU
* @author hvanwelbergen
*
*/
public class FutureIUTest
{
private final OutputBuffer outBuffer = new OutputBuffer("component1");
@Test(timeout = 2000)
public void testSendBeforeTake() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "started");
outBuffer.add(message);
assertEquals(message.getPayload(), fu.take().getPayload());
}
@Test(timeout = 2000)
public void testSendAfterTake() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "started");
Runnable send = () -> {
try
{
Thread.sleep(1000);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
outBuffer.add(message);
};
new Thread(send).start();
assertEquals(message.getPayload(), fu.take().getPayload());
}
@Test
public void testInvalidKeyValue() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "cancelled");
assertNull(fu.take(1,TimeUnit.SECONDS));
}
}
package ipaaca.util.communication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Unit tests for FutureIUs
* @author hvanwelbergen
*
*/
public class FutureIUsTest
{
private FutureIUs fus = new FutureIUs("cat1","id");
private final OutputBuffer outBuffer = new OutputBuffer("component1");
@After
public void cleanup()
{
fus.close();
}
@Test(timeout = 2000)
public void testSendBeforeTake() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id1");
outBuffer.add(message);
assertEquals(message.getPayload(), fus.take("id1").getPayload());
}
@Test(timeout = 2000)
public void testSendAfterTake() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id1");
Runnable send = () -> {
try
{
Thread.sleep(1000);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
outBuffer.add(message);
};
new Thread(send).start();
assertEquals(message.getPayload(), fus.take("id1").getPayload());
}
@Test
public void testNonMatchingKeyValue() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id2");
outBuffer.add(message);
assertNull(fus.take("id1", 1,TimeUnit.SECONDS));
}
@Test
public void testMultipleKeyValues() throws InterruptedException
{
LocalMessageIU message1 = new LocalMessageIU("cat1");
message1.getPayload().put("id", "id1");
LocalMessageIU message2 = new LocalMessageIU("cat1");
message2.getPayload().put("id", "id2");
outBuffer.add(message2);
outBuffer.add(message1);
assertEquals(message1.getPayload(), fus.take("id1").getPayload());
assertEquals(message2.getPayload(), fus.take("id2").getPayload());
}
}
......@@ -2,7 +2,7 @@
// "Incremental Processing Architecture
// for Artificial Conversational Agents".
//
// Copyright (c) 2009-2014 Social Cognitive Systems Group
// Copyright (c) 2009-2022 Social Cognitive Systems Group
// CITEC, Bielefeld University
//
// http://opensource.cit-ec.de/projects/ipaaca/
......@@ -28,72 +28,150 @@
// Forschungsgemeinschaft (DFG) in the context of the German
// Excellence Initiative.
syntax = "proto2";
package ipaaca.protobuf;
enum TransportMessageType {
WireTypeRESERVED = 0;
WireTypeIntMessage = 1;
WireTypeRemoteRequestResult = 2;
WireTypeIU = 3;
WireTypeMessageIU = 4; // special case on the wire (use other converter)
WireTypeIUPayloadUpdate = 5;
WireTypeIULinkUpdate = 6;
WireTypeIURetraction = 7;
WireTypeIUCommission = 8;
WireTypeIUResendRequest = 9;
WireTypeIUPayloadUpdateRequest = 100;
WireTypeIUCommissionRequest = 101;
WireTypeIULinkUpdateRequest = 102;
}
message TransportLevelWrapper {
required TransportMessageType transport_message_type = 1;
required bytes raw_message = 2;
}
message IntMessage {
required sint32 value = 1;
required sint32 value = 1;
}
message LinkSet {
required string type = 1;
repeated string targets = 2;
required string type = 1;
repeated string targets = 2;
}
message PayloadItem {
required string key = 1;
required string value = 2;
required string type = 3 [default = "str"];
required string key = 1;
required string value = 2;
required string type = 3 [default = "str"];
}
message IU {
enum AccessMode {
PUSH = 0;
REMOTE = 1;
MESSAGE = 2;
}
required string uid = 1;
required uint32 revision = 2;
required string category = 3 [default = "undef"];
required string payload_type = 4 [default = "MAP"];
required string owner_name = 5;
required bool committed = 6 [default = false];
required AccessMode access_mode = 7 [default = PUSH];
required bool read_only = 8 [default = false];
repeated PayloadItem payload = 9;
repeated LinkSet links = 10;
enum AccessMode {
PUSH = 0;
REMOTE = 1;
MESSAGE = 2;
}
required string uid = 1;
required uint32 revision = 2;
required string category = 3 [default = "undef"];
required string payload_type = 4 [default = "MAP"];
required string owner_name = 5;
required bool committed = 6 [default = false];
required AccessMode access_mode = 7 [default = PUSH];
required bool read_only = 8 [default = false];
repeated PayloadItem payload = 9;
repeated LinkSet links = 10;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUPayloadUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IURetraction {
required string uid = 1;
required uint32 revision = 2;
required string uid = 1;
required uint32 revision = 2;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUCommission {
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUResendRequest {
required string uid = 1;
required string hidden_scope_name = 2;
required string uid = 1;
required string hidden_scope_name = 2;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
// Result for remote operations (below).
// Used to send a raw int, which was problematic.
// Usually: 0 = Failed, >0 = new revision of successfully modified resource.
message RemoteRequestResult {
required uint32 result = 1;
optional string request_uid = 100 [default = ""];
//optional string request_endpoint = 101 [default = ""];
}
// Remote / request versions of buffer setters:
// they just go with a dedicated ID
message IUPayloadUpdateRequest {
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUCommissionRequest {
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdateRequest {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
......@@ -7,7 +7,7 @@
<exec executable="protoc">
<arg value="--proto_path=../proto" />
<arg value="../proto/ipaaca.proto" />
<arg value="--python_out=build/" />
<arg value="--python_out=src/ipaaca/" />
</exec>
</target>
</project>
......
......@@ -5,8 +5,6 @@
</publications>
<dependencies>
<dependency org="google" name="protobuf-python" rev="latest.release"/>
<dependency org="rsb" name="rsb-python" rev="latest.release"/>
<dependency org="spread" name="spread" rev="latest.release"/>
</dependencies>
</ivy-module>
......
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 9 14:12:05 2016
@author: jpoeppel
"""
from setuptools import setup
import os
import sys
import subprocess
from os import path as op
from distutils.spawn import find_executable
from setuptools.command.build_py import build_py
from setuptools.command.bdist_egg import bdist_egg
from distutils.command.build import build
from distutils.command.sdist import sdist
class ProtoBuild(build_py):
"""
This command automatically compiles all .proto files with `protoc` compiler
and places generated files near them -- i.e. in the same directory.
"""
def find_protoc(self):
"Locates protoc executable"
if 'PROTOC' in os.environ and os.path.exists(os.environ['PROTOC']):
protoc = os.environ['PROTOC']
else:
protoc = find_executable('protoc')
if protoc is None:
sys.stderr.write('protoc not found. Is protobuf-compiler installed? \n'
'Alternatively, you can point the PROTOC environment variable at a local version.')
sys.exit(1)
return protoc
def run(self):
#TODO determine path automaticall
packagedir = "../proto"
print("running build proto")
for protofile in filter(lambda x: x.endswith('.proto'), os.listdir(packagedir)):
source = op.join(packagedir, protofile)
output = source.replace('.proto', '_pb2.py')
if (not op.exists(output) or (op.getmtime(source) > op.getmtime(output))):
sys.stderr.write('Protobuf-compiling ' + source + '\n')
subprocess.check_call([self.find_protoc(), "-I={}".format(packagedir),'--python_out=./src/ipaaca', source])
class BDist_egg(bdist_egg):
'''
Simple wrapper around the normal bdist_egg command to require
protobuf build before normal build.
.. codeauthor:: jwienke
'''
def run(self):
self.run_command('build_proto')
bdist_egg.run(self)
class Build(build):
'''
Simple wrapper around the normal build command to require protobuf build
before normal build.
.. codeauthor:: jwienke
'''
def run(self):
self.run_command('build_proto')
build.run(self)
class Sdist(sdist):
'''
Simple wrapper around the normal sdist command to require protobuf build
before generating the source distribution..
.. codeauthor:: jwienke
'''
def run(self):
# fetch the protocol before building the source distribution so that
# we have a cached version and each user can rebuild the protocol
# with his own protobuf version
self.run_command('build_proto')
sdist.run(self)
version = "0.1.3" #TODO determine correct version! ideally from git, maybe do something similar to rsb/setup.py
setup(name="ipaaca",
version=version,
author="Hendrik Buschmeier, Ramin Yaghoubzadeh, Sören Klett",
author_email="hbuschme@uni-bielefeld.de,ryaghoubzadeh@uni-bielefeld.de,sklett@techfak.uni-bielefeld.de",
license='LGPLv3+',
url='https://opensource.cit-ec.de/projects/ipaaca',
install_requires=["paho-mqtt", "six", "protobuf"],
packages=["ipaaca", "ipaaca.util"],
package_dir={"ipaaca":"src/ipaaca"},
# TODO Do we want to add ipaaca_pb2.py to the egg or as separate package?
# data_files=[("./ipaaca", ["ipaaca_pb2.py"])],
# dependency_links=[
# 'http://www.spread.org/files/'
# 'SpreadModule-1.5spread4.tgz#egg=SpreadModule-1.5spread4'],
cmdclass ={
"build_proto": ProtoBuild,
"sdist": Sdist,
"build": Build,
"bdist_egg":BDist_egg
}
)
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -36,7 +36,7 @@ import ipaaca
def remote_change_dumper(iu, event_type, local):
if local:
print 'remote side '+event_type+': '+str(iu)
print('remote side '+event_type+': '+str(iu))
ob = ipaaca.OutputBuffer('CoolInformerOut')
......
*_pb2.py
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2014 Social Cognitive Systems Group
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -32,61 +32,91 @@
from __future__ import division, print_function
import rsb
import rsb.converter
import os
import threading
from ipaaca.misc import logger, IpaacaArgumentParser
#import rsb
#import rsb.converter
import ipaaca_pb2
import ipaaca.ipaaca_pb2
import ipaaca.converter
from ipaaca.buffer import InputBuffer, OutputBuffer
from ipaaca.exception import *
from ipaaca.iu import IU, Message, IUAccessMode, IUEventType
from ipaaca.misc import enable_logging, IpaacaArgumentParser
from ipaaca.payload import Payload
import ipaaca.backend
def initialize_ipaaca_rsb():
rsb.converter.registerGlobalConverter(
ipaaca.converter.IntConverter(
wireSchema="int32",
dataType=int))
rsb.converter.registerGlobalConverter(
ipaaca.converter.IUConverter(
wireSchema="ipaaca-iu",
dataType=IU))
rsb.converter.registerGlobalConverter(
ipaaca.converter.MessageConverter(
wireSchema="ipaaca-messageiu",
dataType=Message))
rsb.converter.registerGlobalConverter(
ipaaca.converter.IULinkUpdateConverter(
wireSchema="ipaaca-iu-link-update",
dataType=converter.IULinkUpdate))
rsb.converter.registerGlobalConverter(
ipaaca.converter.IUPayloadUpdateConverter(
wireSchema="ipaaca-iu-payload-update",
dataType=converter.IUPayloadUpdate))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IUCommission))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IUResendRequest))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IURetraction))
rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
## --- Module initialisation -------------------------------------------------
# register our own RSB Converters
initialize_ipaaca_rsb()
#
# ipaaca.exit(int_retval)
#
from ipaaca.buffer import atexit_cleanup_function
def exit(int_retval=0):
'''For the time being, this function can be used to
circumvent any sys.exit blocks, while at the same time
cleaning up the buffers (e.g. retracting IUs).
Call once at the end of any python script (or anywhere
in lieu of sys.exit() / os._exit(). '''
print('ipaaca: cleaning up and exiting with code '+str(int_retval))
atexit_cleanup_function()
os._exit(int_retval)
__RSB_INITIALIZER_LOCK = threading.Lock()
__RSB_INITIALIZED = False
def initialize_ipaaca_rsb_if_needed():
"""Initialise rsb if not yet initialise.
* Register own RSB converters.
* Initialise RSB from enviroment variables, rsb config file, or
from default values for RSB trnasport, host, and port (via
ipaaca.defaults or ipaaca.misc.IpaacaArgumentParser).
"""
global __RSB_INITIALIZED
with __RSB_INITIALIZER_LOCK:
if __RSB_INITIALIZED:
return
else:
ipaaca.converter.register_global_converter(
ipaaca.converter.IUConverter(
wireSchema="ipaaca-iu",
dataType=IU))
ipaaca.converter.register_global_converter(
ipaaca.converter.MessageConverter(
wireSchema="ipaaca-messageiu",
dataType=Message))
ipaaca.converter.register_global_converter(
ipaaca.converter.IULinkUpdateConverter(
wireSchema="ipaaca-iu-link-update",
dataType=converter.IULinkUpdate))
ipaaca.converter.register_global_converter(
ipaaca.converter.IUPayloadUpdateConverter(
wireSchema="ipaaca-iu-payload-update",
dataType=converter.IUPayloadUpdate))
if ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT is not None:
if ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT == 'spread':
os.environ['RSB_TRANSPORT_SPREAD_ENABLED'] = str(1)
os.environ['RSB_TRANSPORT_SOCKET_ENABLED'] = str(0)
elif ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT == 'socket':
os.environ['RSB_TRANSPORT_SPREAD_ENABLED'] = str(0)
os.environ['RSB_TRANSPORT_SOCKET_ENABLED'] = str(1)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER is not None:
os.environ['RSB_TRANSPORT_SOCKET_SERVER'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST is not None:
os.environ['RSB_TRANSPORT_SPREAD_HOST'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST)
os.environ['RSB_TRANSPORT_SOCKET_HOST'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT is not None:
os.environ['RSB_TRANSPORT_SPREAD_PORT'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT)
os.environ['RSB_TRANSPORT_SOCKET_PORT'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT)
#
ipaaca.backend.register_backends()
__RSB_INITIALIZED = True
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import threading
import uuid
import os
import time
LOGGER = ipaaca.misc.get_library_logger()
__registered_backends = {}
__backend_registration_done = False
def register_backends():
global __registered_backends
global __backend_registration_done
if not __backend_registration_done:
__backend_registration_done = True
LOGGER.debug('Registering available back-ends')
# register available backends
# mqtt
import ipaaca.backend_mqtt
be = ipaaca.backend_mqtt.create_backend()
if be is not None:
__registered_backends[be.name] = be
LOGGER.debug('Back-end '+str(be.name)+' added')
# ros
import ipaaca.backend_ros
be = ipaaca.backend_ros.create_backend()
if be is not None:
__registered_backends[be.name] = be
LOGGER.debug('Back-end '+str(be.name)+' added')
def get_default_backend():
# TODO selection mechanism / config
if not __backend_registration_done:
register_backends()
if len(__registered_backends) == 0:
raise RuntimeError('No back-ends could be initialized for ipaaca-python')
cfg = ipaaca.config.get_global_config()
preferred = cfg.get_with_default('backend', None)
if preferred is None:
k = list(__registered_backends.keys())[0]
if len(__registered_backends) > 1:
LOGGER.warning('No preferred ipaaca.backend set, returning one of several (probably the first in list)')
print('Using randomly selected back-end {}!'.format(k))
else:
if preferred in __registered_backends:
k = preferred
else:
raise ipaaca.exception.BackendInitializationError(preferred)
LOGGER.info('Back-end is '+str(k))
return __registered_backends[k]
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import collections
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import ipaaca.backend
import ipaaca.config
import threading
try:
import queue
except:
import Queue as queue
import uuid
import os
import time
try:
import paho.mqtt.client as mqtt
MQTT_ENABLED = True
except:
MQTT_ENABLED = False
if not MQTT_ENABLED:
def create_backend():
return None
else:
def create_backend():
return MQTTBackend(name='mqtt')
LOGGER = ipaaca.misc.get_library_logger()
_REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited
_REMOTE_LISTENER_MAX_QUEUED_EVENTS = 1024 # 'Full' exception if exceeded
class EventWrapper(object):
def __init__(self, data):
self.data = data
class PendingRequest(object):
'''Encapsulation of a pending remote request with
a facility to keep the requesting thread locked
until the reply or a timeout unlocks it.'''
def __init__(self, request):
self._request = request
self._event = threading.Event()
self._reply = None
self._request_uid = str(uuid.uuid4())[0:8]
def wait_for_reply(self, timeout=30.0):
wr = self._event.wait(timeout)
return None if wr is False else self._reply
def reply_with_result(self, reply):
self._reply = reply
self._event.set()
class Informer(object):
'''Informer interface, wrapping an outbound port to MQTT'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._live = True
self._live_event.set()
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
pass
def publishData(self, data):
#print('Informer publishing '+str(data.__class__.__name__)+' on '+self._scope)
self._mqtt_client.publish(self._scope, ipaaca.converter.serialize(data), qos=2)
class BackgroundEventDispatcher(threading.Thread):
def __init__(self, listener):
super(BackgroundEventDispatcher, self).__init__()
self.daemon = True
self._listener = listener
def terminate(self):
self._running = False
def run(self):
self._running = True
listener = self._listener
while self._running: # auto-terminated (daemon)
event = listener._event_queue.get(block=True, timeout=None)
if event is None: return # signaled termination
#print('\033[31mDispatch '+str(event.data.__class__.__name__)+' start ...\033[m')
for handler in self._listener._handlers:
handler(event)
#print('\033[32m... dispatch '+str(event.data.__class__.__name__)+' end.\033[m')
class Listener(object):
'''Listener interface, wrapping an inbound port from MQTT'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
self._event_queue = queue.Queue(_REMOTE_LISTENER_MAX_QUEUED_EVENTS)
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_socket_open = self.mqtt_callback_on_socket_open
#self._mqtt_client.on_socket_close = self.mqtt_callback_on_socket_close
#self._mqtt_client.on_log = self.mqtt_callback_on_log
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self._dispatcher = BackgroundEventDispatcher(self)
self._dispatcher.start()
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._event_queue.put(None, block=False) # signal termination, waking queue
self._dispatcher.terminate()
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
LOGGER.debug('Connect to '+str(self._host)+':'+str(self._port))
self._mqtt_client.connect(self._host, self._port)
#def mqtt_callback_on_log(self, client, userdata, level, buf):
# print('Listener: LOG: '+str(buf))
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
event = EventWrapper(ipaaca.converter.deserialize(message.payload))
self._event_queue.put(event, block=False) # queue event for BackgroundEventDispatcher
def addHandler(self, handler):
self._handlers.append(handler)
#def publishData(self, data):
# self._mqtt_client.publish(self._
class LocalServer(object):
'''LocalServer interface, allowing for RPC requests to
IU functions, or reporting back success or failure.'''
def __init__(self, buffer_impl, scope, config=None):
self._buffer = buffer_impl
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_LocalServer_' + self._uuid # unused atm
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
req = ipaaca.converter.deserialize(message.payload)
result = None
if isinstance(req, ipaaca.converter.IUPayloadUpdate):
result = self.attempt_to_apply_remote_updatePayload(req)
elif isinstance(req, ipaaca.converter.IULinkUpdate):
result = self.attempt_to_apply_remote_updateLinks(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUCommission):
result = self.attempt_to_apply_remote_commit(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUResendRequest):
result = self.attempt_to_apply_remote_resendRequest(req)
else:
raise RuntimeError('LocalServer: got an object of wrong class '+str(req.__class__.__name__)) # TODO replace
if result is not None:
self.send_result_for_request(req, result)
#
def send_result_for_request(self, obj, result):
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
pbo.result = result
pbo.request_uid = obj.request_uid
#print('Sending result to endpoint '+str(obj.request_endpoint))
self._mqtt_client.publish(obj.request_endpoint, ipaaca.converter.serialize(pbo), qos=2)
def attempt_to_apply_remote_updateLinks(self, obj):
return self._buffer._remote_update_links(obj)
def attempt_to_apply_remote_updatePayload(self, obj):
return self._buffer._remote_update_payload(obj)
def attempt_to_apply_remote_commit(self, obj):
return self._buffer._remote_commit(obj)
def attempt_to_apply_remote_resendRequest(self, obj):
return self._buffer._remote_request_resend(obj)
class RemoteServer(object):
'''RemoteServer, connects to a LocalServer on the side
of an actual IU owner, which will process any requests.
The RemoteServer is put on hold while the owner is
processing. RemoteServer is from RSB terminology,
it might more aptly be described as an RPC client.'''
def __init__(self, remote_end_scope, config=None):
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
#queue.Queue(_REMOTE_SERVER_MAX_QUEUED_REQUESTS)
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_RemoteServer_' + self._uuid
# will RECV here:
self._scope = '/ipaaca/remotes/' + self._name
# will SEND here
self._remote_end_scope = remote_end_scope
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + remote_end_scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
reply = ipaaca.converter.deserialize(message.payload)
if isinstance(reply, ipaaca.ipaaca_pb2.RemoteRequestResult):
uid = reply.request_uid
pending_request = None
with self._pending_requests_lock:
if uid in self._pending_requests:
pending_request = self._pending_requests[uid]
del self._pending_requests[uid]
if pending_request is None:
raise RuntimeError('RemoteServer: got a reply for request uid that is not queued: '+str(uid))
else:
# provide result to other thread and unblock it
pending_request.reply_with_result(reply)
else:
raise RuntimeError('RemoteServer: got an object of wrong class '+str(reply.__class__.__name__)) # TODO replace
def queue_pending_request(self, request):
pending_request = PendingRequest(request)
with self._pending_requests_lock:
if _REMOTE_SERVER_MAX_QUEUED_REQUESTS>0 and len(self._pending_requests) >= _REMOTE_SERVER_MAX_QUEUED_REQUESTS:
raise RuntimeError('RemoteServer: maximum number of pending requests exceeded') # TODO replace?
else:
self._pending_requests[pending_request._request_uid] = pending_request
return pending_request
# impl
def blocking_call(self, request):
# Broker's queue will raise before sending anything if capacity is exceeded
pending_request = self.queue_pending_request(request)
# complete and send request
request.request_uid = pending_request._request_uid
request.request_endpoint = self._scope
self._mqtt_client.publish(self._remote_end_scope, ipaaca.converter.serialize(request), qos=2)
# wait for other end to return result
reply = pending_request.wait_for_reply()
if reply is None:
LOGGER.warning('A request timed out!')
return 0
else:
return reply.result # the actual int result
# glue that quacks like the RSB version
def resendRequest(self, req):
return self.blocking_call(req)
def commit(self, req):
return self.blocking_call(req)
def updatePayload(self, req):
return self.blocking_call(req)
def updateLinks(self, req):
return self.blocking_call(req)
class MQTTBackend(object):
def __init__(self, name='mqtt'):
# back-end initialization code
self._config = ipaaca.config.get_global_config()
self._name = name
self._participants = set()
def _get_name(self):
return self._name
name = property(_get_name)
def teardown(self):
LOGGER.info('MQTT teardown: waiting 1 sec for final deliveries')
time.sleep(1)
for p in self._participants:
p.deactivate_internal()
def Scope(self, scope_str):
'''Scope adapter (glue replacing rsb.Scope)'''
return str(scope_str)
def createLocalServer(self, buffer_impl, scope, config=None):
LOGGER.debug('Creating a LocalServer on '+str(scope))
s = LocalServer(buffer_impl, scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createRemoteServer(self, scope, config=None):
LOGGER.debug('Creating a RemoteServer on '+str(scope))
s = RemoteServer(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createInformer(self, scope, config=None, dataType="ignored in this backend"):
LOGGER.debug('Creating an Informer on '+str(scope))
s = Informer(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createListener(self, scope, config=None):
LOGGER.debug('Creating a Listener on '+str(scope))
s = Listener(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import collections
import sys
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import ipaaca.backend
import ipaaca.config as config
LOGGER = ipaaca.misc.get_library_logger()
ROS_ENABLED, __try_guessing = False, False
try:
import rospy
from std_msgs.msg import String
import base64
ROS_ENABLED = True
except:
LOGGER.debug('rospy or deps not found, ROS backend disabled')
ROS_ENABLED = False
if not ROS_ENABLED:
def create_backend():
return None
else:
def create_backend():
return ROSBackend(name='ros')
import threading
try:
import queue
except:
import Queue as queue
import uuid
import os
import time
import sys
class EventWrapper(object):
def __init__(self, data):
self.data = data
class PendingRequest(object):
'''Encapsulation of a pending remote request with
a facility to keep the requesting thread locked
until the reply or a timeout unlocks it.'''
def __init__(self, request):
self._request = request
self._event = threading.Event()
self._reply = None
self._request_uid = str(uuid.uuid4())[0:8]
def wait_for_reply(self, timeout=30.0):
wr = self._event.wait(timeout)
return None if wr is False else self._reply
def reply_with_result(self, reply):
self._reply = reply
self._event.set()
class Informer(object):
'''Informer interface, wrapping an outbound port to ROS'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._ros_pub = rospy.Publisher(self._scope, String, queue_size=100, tcp_nodelay=True, latch=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def deactivate(self):
pass
#self._ros_pub.unregister()
#self._ros_pub = None
def publishData(self, data):
self._ros_pub.publish(ROSBackend.serialize(data))
class BackgroundEventDispatcher(threading.Thread):
def __init__(self, event, handlers):
super(BackgroundEventDispatcher, self).__init__()
self.daemon = True
self._event = event
self._handlers = handlers
def run(self):
for handler in self._handlers:
handler(self._event)
class Listener(object):
'''Listener interface, wrapping an inbound port from ROS'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._ros_sub = rospy.Subscriber(self._scope, String, self.on_message, tcp_nodelay=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def deactivate(self):
pass
#self._ros_sub.unregister()
#self._ros_sub = None
def on_message(self, message):
event = EventWrapper(ROSBackend.deserialize(message.data))
## (1) with extra thread:
#dispatcher = BackgroundEventDispatcher(event, self._handlers)
#dispatcher.start()
## or (2) with no extra thread:
for handler in self._handlers:
handler(event)
def addHandler(self, handler):
self._handlers.append(handler)
class LocalServer(object):
'''LocalServer interface, allowing for RPC requests to
IU functions, or reporting back success or failure.'''
def __init__(self, buffer_impl, scope, config=None):
self._buffer = buffer_impl
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_LocalServer_' + self._uuid # unused atm
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._ros_pubs = {}
self._ros_sub = rospy.Subscriber(self._scope, String, self.on_message, tcp_nodelay=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def get_publisher(self, endpoint):
if endpoint in self._ros_pubs:
return self._ros_pubs[endpoint]
else:
p = rospy.Publisher(endpoint, String, queue_size=10, tcp_nodelay=True, latch=True)
self._ros_pubs[endpoint] = p
return p
def deactivate(self):
pass
#self._ros_sub.unregister()
#for v in self._ros_pubs.values():
# v.unregister()
#self._ros_sub = None
#self._ros_pubs = {}
def on_message(self, message):
req = ROSBackend.deserialize(message.data)
result = None
if isinstance(req, ipaaca.converter.IUPayloadUpdate):
result = self.attempt_to_apply_remote_updatePayload(req)
elif isinstance(req, ipaaca.converter.IULinkUpdate):
result = self.attempt_to_apply_remote_updateLinks(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUCommission):
result = self.attempt_to_apply_remote_commit(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUResendRequest):
result = self.attempt_to_apply_remote_resendRequest(req)
else:
raise RuntimeError('LocalServer: got an object of wrong class '+str(req.__class__.__name__)) # TODO replace
if result is not None:
self.send_result_for_request(req, result)
#
def send_result_for_request(self, obj, result):
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
pbo.result = result
pbo.request_uid = obj.request_uid
#print('Sending result to endpoint '+str(obj.request_endpoint))
pub = self.get_publisher(obj.request_endpoint)
pub.publish(ROSBackend.serialize(pbo))
def attempt_to_apply_remote_updateLinks(self, obj):
return self._buffer._remote_update_links(obj)
def attempt_to_apply_remote_updatePayload(self, obj):
return self._buffer._remote_update_payload(obj)
def attempt_to_apply_remote_commit(self, obj):
return self._buffer._remote_commit(obj)
def attempt_to_apply_remote_resendRequest(self, obj):
return self._buffer._remote_request_resend(obj)
_REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited
class RemoteServer(object):
'''RemoteServer, connects to a LocalServer on the side
of an actual IU owner, which will process any requests.
The RemoteServer is put on hold while the owner is
processing. RemoteServer is from RSB terminology,
it might more aptly be described as an RPC client.'''
def __init__(self, remote_end_scope, config=None):
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
#queue.Queue(_REMOTE_SERVER_MAX_QUEUED_REQUESTS)
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_RemoteServer_' + self._uuid
# will RECV here:
self._scope = '/ipaaca/remotes/' + self._name
# will SEND here
self._remote_end_scope = remote_end_scope
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + remote_end_scope
self._ros_pub = rospy.Publisher(self._remote_end_scope, String, queue_size=10, tcp_nodelay=True, latch=True)
self._ros_sub = rospy.Subscriber(self._scope, String, self.on_message, tcp_nodelay=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def deactivate(self):
pass
#self._ros_sub.unregister()
#self._ros_pub.unregister()
#self._ros_sub = None
#self._ros_pub = None
def on_message(self, message):
reply = ROSBackend.deserialize(message.data)
if isinstance(reply, ipaaca.ipaaca_pb2.RemoteRequestResult):
uid = reply.request_uid
pending_request = None
with self._pending_requests_lock:
if uid in self._pending_requests:
pending_request = self._pending_requests[uid]
del self._pending_requests[uid]
if pending_request is None:
raise RuntimeError('RemoteServer: got a reply for request uid that is not queued: '+str(uid))
else:
# provide result to other thread and unblock it
pending_request.reply_with_result(reply)
else:
raise RuntimeError('RemoteServer: got an object of wrong class '+str(reply.__class__.__name__)) # TODO replace
def queue_pending_request(self, request):
pending_request = PendingRequest(request)
with self._pending_requests_lock:
if _REMOTE_SERVER_MAX_QUEUED_REQUESTS>0 and len(self._pending_requests) >= _REMOTE_SERVER_MAX_QUEUED_REQUESTS:
raise RuntimeError('RemoteServer: maximum number of pending requests exceeded') # TODO replace?
else:
self._pending_requests[pending_request._request_uid] = pending_request
return pending_request
# impl
def blocking_call(self, request):
# Broker's queue will raise before sending anything if capacity is exceeded
pending_request = self.queue_pending_request(request)
# complete and send request
request.request_uid = pending_request._request_uid
request.request_endpoint = self._scope
self._ros_pub.publish(ROSBackend.serialize(request))
# wait for other end to return result
reply = pending_request.wait_for_reply()
if reply is None:
LOGGER.warning('A request timed out!')
return 0
else:
return reply.result # the actual int result
# glue that quacks like the RSB version
def resendRequest(self, req):
return self.blocking_call(req)
def commit(self, req):
return self.blocking_call(req)
def updatePayload(self, req):
return self.blocking_call(req)
def updateLinks(self, req):
return self.blocking_call(req)
class ROSBackend(object):
def __init__(self, name='ros'):
#import logging
# back-end initialization code
self._name = name
self._need_init = True
#logging.basicConfig(level=logging.DEBUG)
def init_once(self):
'''Actual back-end initialization is only done when it is used'''
if self._need_init:
self._need_init = False
self._config = config.get_global_config()
try:
# generate a ROS node prefix from the basename of argv[0]
clean_name = ''.join([c for c in sys.argv[0].rsplit('/',1)[-1].replace('.', '_').replace('-','_') if c.lower() in 'abcdefghijklmnoprqstuvwxzy0123456789_'])
except:
clean_name = ''
rospy.init_node('ipaaca_python' if len(clean_name)==0 else clean_name,
anonymous=True, disable_signals=True)
def _get_name(self):
return self._name
name = property(_get_name)
def teardown(self):
LOGGER.info('ROS teardown: waiting 1 sec for final deliveries')
time.sleep(1)
rospy.signal_shutdown('Done')
@staticmethod
def serialize(obj):
#print('object class: '+obj.__class__.__name__)
bb = ipaaca.converter.serialize(obj)
st = str(base64.b64encode(bb))
#print('serialized: '+str(st))
return st
@staticmethod
def deserialize(msg):
#print('got serialized: '+str(msg))
bb = base64.b64decode(msg)
return ipaaca.converter.deserialize(bb)
def Scope(self, scope_str):
'''Scope adapter (glue replacing rsb.Scope)'''
# ROS graph resources must not start with a slash
return str(scope_str)[1:] if scope_str.startswith('/') else str(scope_str)
def createLocalServer(self, buffer_impl, scope, config=None):
self.init_once()
LOGGER.debug('Creating a LocalServer on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = LocalServer(buffer_impl, scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
def createRemoteServer(self, scope, config=None):
self.init_once()
LOGGER.debug('Creating a RemoteServer on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = RemoteServer(scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
def createInformer(self, scope, config=None, dataType="ignored in this backend"):
self.init_once()
LOGGER.debug('Creating an Informer on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = Informer(scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
def createListener(self, scope, config=None):
self.init_once()
LOGGER.debug('Creating a Listener on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = Listener(scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s