Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Show changes
Showing
with 1670 additions and 1641 deletions
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2016 Social Cognitive Systems Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca.util;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
import java.util.HashMap;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
public class IpaacaLogger {
private static OutputBuffer ob;
private static final Object lock = new Object();
private static boolean SEND_IPAACA_LOGS = true;
private static String MODULE_NAME = "???";
private static void initializeOutBuffer() {
synchronized (lock) {
if (ob == null) {
ob = new OutputBuffer("LogSender");
}
}
}
public static void setModuleName(String name) {
synchronized (lock) {
MODULE_NAME = name;
}
}
public static void setLogFileName(String fileName, String logMode) {
initializeOutBuffer();
LocalMessageIU msg = new LocalMessageIU("log");
HashMap<String, String> pl = new HashMap<String, String>();
pl.put("cmd", "open_log_file");
pl.put("filename", fileName);
if (logMode != null) {
if (logMode.equals("append") ||
logMode.equals("overwrite") ||
logMode.equals("timestamp")) {
pl.put("existing", logMode);
} else {
return;
}
}
ob.add(msg);
}
public static void sendIpaacaLogs(boolean flag) {
synchronized (lock) {
SEND_IPAACA_LOGS = flag;
}
}
private static void logConsole(String level, String text, float now, String function, String thread) {
for(String line: text.split("\n")) {
System.out.println("[" + level + "] " + thread + " " + function + " " + line);
function = StringUtils.leftPad("", function.length(), ' ');
thread = StringUtils.leftPad("", thread.length(), ' ');
}
}
private static void logIpaaca(String level, String text, float now, String function, String thread) {
initializeOutBuffer();
LocalMessageIU msg = new LocalMessageIU("log");
HashMap<String, String> pl = new HashMap<String, String>();
pl.put("module", MODULE_NAME);
pl.put("function", function);
pl.put("level", level);
pl.put("time", String.format("%.3f", now));
pl.put("thread", thread);
pl.put("uuid", UUID.randomUUID().toString());
pl.put("text", text);
msg.setPayload(pl);
ob.add(msg);
}
private static String getCallerName() {
String function = Thread.currentThread().getStackTrace()[3].getClassName();
function += "." + Thread.currentThread().getStackTrace()[3].getMethodName();
return function;
}
public static void logError(String msg) {
logError(msg, System.currentTimeMillis(), getCallerName());
}
public static void logError(String msg, float now) {
logError(msg, now, getCallerName());
}
private static void logError(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("ERROR", msg, now, callerName, thread);
}
logConsole("ERROR", msg, now, callerName, thread);
}
public static void logWarn(String msg) {
logWarn(msg, System.currentTimeMillis(), getCallerName());
}
public static void logWarn(String msg, float now) {
logWarn(msg, now, getCallerName());
}
private static void logWarn(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("WARN", msg, now, callerName, thread);
}
logConsole("WARN", msg, now, callerName, thread);
}
public static void logInfo(String msg) {
logInfo(msg, System.currentTimeMillis(), getCallerName());
}
public static void logInfo(String msg, float now) {
logInfo(msg, now, getCallerName());
}
private static void logInfo(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("INFO", msg, now, callerName, thread);
}
logConsole("INFO", msg, now, callerName, thread);
}
public static void logDebug(String msg) {
logDebug(msg, System.currentTimeMillis(), getCallerName());
}
public static void logDebug(String msg, float now) {
logDebug(msg, now, getCallerName());
}
private static void logDebug(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("DEBUG", msg, now, callerName, thread);
}
logConsole("DEBUG", msg, now, callerName, thread);
}
}
package ipaaca.util.communication;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventHandler;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
/**
* Obtain a IU in the future. Usage:<br>
* FutureIU fu = FutureIU("componentx", "status", "started"); //wait for componentx to send a message that is it fully started<br>
* [Start componentx, assumes that component x will send a message or other iu with status=started in the payload]<br>
* AbstractIU iu = fu.take(); //get the actual IU
* @author hvanwelbergen
*/
public class FutureIU
{
private final InputBuffer inBuffer;
private final BlockingQueue<AbstractIU> queue = new ArrayBlockingQueue<AbstractIU>(1);
private final IUEventHandler handler;
public FutureIU(String category, final String idKey, final String idVal, InputBuffer inBuffer)
{
this.inBuffer = inBuffer;
handler = new IUEventHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
String id = iu.getPayload().get(idKey);
if (idVal.equals(id))
{
queue.offer(iu);
}
}
}, ImmutableSet.of(category));
inBuffer.registerHandler(handler);
}
public FutureIU(String category, String idKey, String idVal)
{
this(category, idKey, idVal, new InputBuffer("FutureIU", ImmutableSet.of(category)));
}
/**
* Closes the FutureIU, use only if get is not used.
*/
public void cleanup()
{
inBuffer.removeHandler(handler);
if (inBuffer.getOwningComponentName().equals("FutureIU"))
{
inBuffer.close();
}
}
/**
* Waits (if necessary) for the IU and take it (can be done only once)
*/
public AbstractIU take() throws InterruptedException
{
AbstractIU iu;
try
{
iu = queue.take();
}
finally
{
cleanup();
}
return iu;
}
/**
* Wait for at most the given time for the IU and take it (can be done only once), return null on timeout
*/
public AbstractIU take(long timeout, TimeUnit unit) throws InterruptedException
{
AbstractIU iu;
try
{
iu = queue.poll(timeout, unit);
}
finally
{
cleanup();
}
return iu;
}
}
package ipaaca.util.communication;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
/**
* Obtain multiple future ius on an specific category. Usage:<br>
* FutureIUs futures = new FutureIUs(componentx, key);<br>
* [make componentx send a IU with key=keyvaluedesired1]<br>
* AbstractIU iu = futures.take(keyvaluedesired1);<br>
* [make componentx send a IU with key=keyvaluedesired2]
* AbstractIU iu = futures.take(keyvaluedesired2);<br>
* ...<br>
* futures.close();
* @author hvanwelbergen
*/
public class FutureIUs
{
private final InputBuffer inBuffer;
private final Map<String,BlockingQueue<AbstractIU>> resultsMap = Collections.synchronizedMap(new HashMap<String,BlockingQueue<AbstractIU>>());
public FutureIUs(String category, final String idKey)
{
inBuffer = new InputBuffer("FutureIUs", ImmutableSet.of(category));
inBuffer.registerHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
String id = iu.getPayload().get(idKey);
resultsMap.putIfAbsent(id, new ArrayBlockingQueue<AbstractIU>(1));
resultsMap.get(id).offer(iu);
}
}, ImmutableSet.of(category));
}
/**
* Waits (if necessary) for the IU and take it (can be done only once)
*/
public AbstractIU take(String idValue) throws InterruptedException
{
resultsMap.putIfAbsent(idValue, new ArrayBlockingQueue<AbstractIU>(1));
return resultsMap.get(idValue).take();
}
/**
* Wait for at most the given time for the IU and take it (can be done only once), return null on timeout
*/
public AbstractIU take(String idValue, long timeout, TimeUnit unit) throws InterruptedException
{
resultsMap.putIfAbsent(idValue, new ArrayBlockingQueue<AbstractIU>(1));
return resultsMap.get(idValue).poll(timeout, unit);
}
public void close()
{
inBuffer.close();
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaacademo;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventHandler;
import ipaaca.IUEventType;
import ipaaca.Initializer;
import ipaaca.InputBuffer;
import ipaaca.OutputBuffer;
import ipaaca.RemotePushIU;
import java.util.EnumSet;
import java.util.Set;
import javax.swing.JFrame;
import javax.swing.JLabel;
import com.google.common.collect.ImmutableSet;
public class TestListener
{
private static final class MyEventHandler implements HandlerFunctor
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
switch(type)
{
case ADDED: System.out.println("IU added "+iu.getPayload().get("CONTENT")); break;
case COMMITTED: System.out.println("IU committed"); break;
case UPDATED: System.out.println("IU updated "+iu.getPayload()); break;
case LINKSUPDATED: System.out.println("IU links updated"); break;
case RETRACTED: break;
case DELETED: break;
}
}
}
static
{
Initializer.initializeIpaacaRsb();
}
private static final String CATEGORY = "spam";
private static final double RATE = 0.5;
private UpdateThread updateThread;
public TestListener()
{
Set<String> categories = new ImmutableSet.Builder<String>().add(CATEGORY).build();
JLabel label = new JLabel("");
updateThread = new UpdateThread(new InputBuffer("TestListener", categories),label);
}
public void start()
{
updateThread.start();
}
private static class UpdateThread extends Thread
{
private InputBuffer inBuffer;
private JLabel label;
public UpdateThread(InputBuffer inBuffer, JLabel label)
{
this.inBuffer = inBuffer;
this.label = label;
EnumSet<IUEventType> types = EnumSet.of(IUEventType.ADDED,IUEventType.COMMITTED,IUEventType.UPDATED,IUEventType.LINKSUPDATED);
Set<String> categories = new ImmutableSet.Builder<String>().add(CATEGORY).build();
MyEventHandler printingEventHandler;
printingEventHandler = new MyEventHandler();
this.inBuffer.registerHandler(new IUEventHandler(printingEventHandler,types,categories));
}
@Override
public void run()
{
System.out.println("Starting!");
}
}
public static void main(String args[])
{
TestListener tl = new TestListener();
tl.start();
}
}
......@@ -222,7 +222,8 @@ public class TextPrinter
TextPrinter tp = new TextPrinter();
OutputBuffer outBuffer = new OutputBuffer("componentX");
//OutputBuffer outBuffer = new OutputBuffer("componentX");
/*String[] inputString = {"h","e","l","l","o"," ","w","o","r","l","d","!"};
LocalIU predIU = null;
for(String str:inputString)
......
......@@ -183,7 +183,7 @@ public class ComponentPushCommunicationIntegrationTest
payloadUpdate.put("chunk12", "item2");
payloadUpdate.put("chunk13", "item3");
payloadUpdate.put("chunk14", "item4");
int oldRev = iuIn.getRevision();
long oldRev = iuIn.getRevision();
localIU.getPayload().merge(payloadUpdate);
Thread.sleep(200);
assertEquals(oldRev + 1, iuIn.getRevision());
......@@ -197,7 +197,7 @@ public class ComponentPushCommunicationIntegrationTest
payloadUpdate2.put("chunk22", "item6");
payloadUpdate2.put("chunk13", "item3-changed");
payloadUpdate2.put("chunk14", "item4-changed");
int oldRev2 = iuIn.getRevision();
long oldRev2 = iuIn.getRevision();
iuIn.getPayload().merge(payloadUpdate2);
Thread.sleep(200);
assertEquals(oldRev2 + 1, localIU.getRevision());
......
......@@ -22,7 +22,7 @@ import com.google.common.collect.ImmutableSet;
public class InputBufferTest
{
private static final String COMPID = "Comp1";
private static final String CATEGORY = "category1";
private static final String CATEGORY = "testcat";
private InputBuffer inBuffer;
......@@ -47,14 +47,14 @@ public class InputBufferTest
@Test
public void testHandleRemotePushEvent() throws RSBException, InterruptedException
{
Informer<Object> informer = Factory.getInstance().createInformer("/ipaaca/category/"+CATEGORY);
Informer<Object> informer = Factory.getInstance().createInformer("/ipaaca/channel/default/category/"+CATEGORY);
informer.activate();
RemotePushIU iu = new RemotePushIU("uid1");
iu.setCategory("/ipaaca/category/"+CATEGORY);
iu.setCategory("/ipaaca/channel/default/category/"+CATEGORY);
iu.setOwnerName("owner");
iu.setReadOnly(false);
iu.setRevision(1);
informer.send(iu);
informer.publish(iu);
Thread.sleep(1000);
AbstractIU iuIn = inBuffer.getIU("uid1");
......
package ipaaca.util;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import ipaaca.Initializer;
import org.junit.After;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
/**
* Integration tests for the blackboard
* @author hvanwelbergen
*/
public class BlackboardIntegrationTest
{
static
{
Initializer.initializeIpaacaRsb();
}
private Blackboard bb = new Blackboard("myblackboard","blackboardx");
private BlackboardClient bbc;
@After
public void after()
{
bb.close();
if(bbc!=null)
{
bbc.close();
}
}
@Test
public void testGetValueFromBlackboardBeforeConnection()
{
bb.put("key1","value1");
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
assertEquals("value1", bbc.get("key1"));
}
@Test
public void testGetValueFromBlackboardAfterConnection() throws InterruptedException
{
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
bb.put("key1","value1");
Thread.sleep(200);
assertEquals("value1", bbc.get("key1"));
}
@Test
public void testSetValueOnBlackboard() throws InterruptedException
{
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
bbc.put("key2","value2");
Thread.sleep(300);
assertEquals("value2", bb.get("key2"));
}
@Test
public void testBlackboardUpdateHandler()throws InterruptedException
{
BlackboardUpdateListener mockListener = mock(BlackboardUpdateListener.class);
bb.addUpdateListener(mockListener);
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
bbc.put("key2","value2");
Thread.sleep(200);
bb.put("key2","value3");
verify(mockListener,times(1)).update();
}
@Test
public void testBlackboardClientUpdateHandler() throws InterruptedException
{
BlackboardUpdateListener mockListener = mock(BlackboardUpdateListener.class);
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
bbc.addUpdateListener(mockListener);
bb.put("key3","value3");
Thread.sleep(200);
bbc.put("key3","value4");
verify(mockListener,times(2)).update();
}
@Test
public void testSetManyValuesOnBlackboard() throws InterruptedException
{
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
for(int i=0;i<100;i++)
{
bbc.put("key"+i,"value"+i);
bb.put("key"+i,"value"+i);
}
Thread.sleep(300);
assertEquals("value2", bb.get("key2"));
assertEquals("value3", bb.get("key3"));
}
@Test
public void testSetValuesOnClient() throws InterruptedException
{
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
bbc.putAll(ImmutableMap.of("key1","value1","key2","value2"));
Thread.sleep(200);
assertEquals("value1", bb.get("key1"));
assertEquals("value2", bb.get("key2"));
}
@Test
public void testSetValuesOnBlackBoard() throws InterruptedException
{
bbc = new BlackboardClient("myblackboardclient","blackboardx");
bbc.waitForBlackboardConnection();
bb.putAll(ImmutableMap.of("key1","value1","key2","value2"));
Thread.sleep(200);
assertEquals("value1", bbc.get("key1"));
assertEquals("value2", bbc.get("key2"));
}
}
package ipaaca.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.Initializer;
import ipaaca.InputBuffer;
import ipaaca.LocalIU;
import ipaaca.OutputBuffer;
import ipaaca.util.ComponentNotifier;
import java.util.Set;
import lombok.Getter;
import org.junit.After;
import org.junit.Test;
import lombok.Getter;
import com.google.common.collect.ImmutableSet;
/**
......@@ -29,7 +30,7 @@ public class ComponentNotifierIntegrationTest
private ComponentNotifier notifier2;
private InputBuffer inBuffer;
private OutputBuffer outBuffer;
private static final String OTHER_CATEGORY="OTHER";
static
{
Initializer.initializeIpaacaRsb();
......@@ -67,6 +68,13 @@ public class ComponentNotifierIntegrationTest
return new ComponentNotifier(id, "test", ImmutableSet.copyOf(sendList), ImmutableSet.copyOf(recvList), outBuffer, inBuffer);
}
private ComponentNotifier setupCompNotifierWithOtherCategoryInputBuffer(String id, Set<String> sendList, Set<String> recvList)
{
inBuffer = new InputBuffer(id + "in", ImmutableSet.of(ComponentNotifier.NOTIFY_CATEGORY, OTHER_CATEGORY));
outBuffer = new OutputBuffer(id + "out");
return new ComponentNotifier(id, "test", ImmutableSet.copyOf(sendList), ImmutableSet.copyOf(recvList), outBuffer, inBuffer);
}
@Test
public void testSelf() throws InterruptedException
{
......@@ -97,4 +105,19 @@ public class ComponentNotifierIntegrationTest
assertEquals(1, h1.getNumCalled());
assertEquals(1, h2.getNumCalled());
}
@Test
public void testOtherCategoryInInputBuffer() throws InterruptedException
{
notifier1 = setupCompNotifierWithOtherCategoryInputBuffer("not1", ImmutableSet.of("a1", "b1"), ImmutableSet.of("a3", "b1"));
MyHandlerFunctor h1 = new MyHandlerFunctor();
notifier1.addNotificationHandler(h1);
OutputBuffer out = new OutputBuffer("out");
LocalIU iu = new LocalIU(OTHER_CATEGORY);
out.add(iu);
Thread.sleep(500);
assertEquals(0, h1.getNumCalled());
assertNotNull(inBuffer.getIU(iu.getUid()));
}
}
package ipaaca.util.communication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Unit tests for the FutureIU
* @author hvanwelbergen
*
*/
public class FutureIUTest
{
private final OutputBuffer outBuffer = new OutputBuffer("component1");
@Test(timeout = 2000)
public void testSendBeforeTake() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "started");
outBuffer.add(message);
assertEquals(message.getPayload(), fu.take().getPayload());
}
@Test(timeout = 2000)
public void testSendAfterTake() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "started");
Runnable send = () -> {
try
{
Thread.sleep(1000);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
outBuffer.add(message);
};
new Thread(send).start();
assertEquals(message.getPayload(), fu.take().getPayload());
}
@Test
public void testInvalidKeyValue() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "cancelled");
assertNull(fu.take(1,TimeUnit.SECONDS));
}
}
package ipaaca.util.communication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Unit tests for FutureIUs
* @author hvanwelbergen
*
*/
public class FutureIUsTest
{
private FutureIUs fus = new FutureIUs("cat1","id");
private final OutputBuffer outBuffer = new OutputBuffer("component1");
@After
public void cleanup()
{
fus.close();
}
@Test(timeout = 2000)
public void testSendBeforeTake() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id1");
outBuffer.add(message);
assertEquals(message.getPayload(), fus.take("id1").getPayload());
}
@Test(timeout = 2000)
public void testSendAfterTake() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id1");
Runnable send = () -> {
try
{
Thread.sleep(1000);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
outBuffer.add(message);
};
new Thread(send).start();
assertEquals(message.getPayload(), fus.take("id1").getPayload());
}
@Test
public void testNonMatchingKeyValue() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id2");
outBuffer.add(message);
assertNull(fus.take("id1", 1,TimeUnit.SECONDS));
}
@Test
public void testMultipleKeyValues() throws InterruptedException
{
LocalMessageIU message1 = new LocalMessageIU("cat1");
message1.getPayload().put("id", "id1");
LocalMessageIU message2 = new LocalMessageIU("cat1");
message2.getPayload().put("id", "id2");
outBuffer.add(message2);
outBuffer.add(message1);
assertEquals(message1.getPayload(), fus.take("id1").getPayload());
assertEquals(message2.getPayload(), fus.take("id2").getPayload());
}
}
......@@ -2,7 +2,7 @@
// "Incremental Processing Architecture
// for Artificial Conversational Agents".
//
// Copyright (c) 2009-2013 Sociable Agents Group
// Copyright (c) 2009-2022 Social Cognitive Systems Group
// CITEC, Bielefeld University
//
// http://opensource.cit-ec.de/projects/ipaaca/
......@@ -28,69 +28,150 @@
// Forschungsgemeinschaft (DFG) in the context of the German
// Excellence Initiative.
syntax = "proto2";
package ipaaca.protobuf;
enum TransportMessageType {
WireTypeRESERVED = 0;
WireTypeIntMessage = 1;
WireTypeRemoteRequestResult = 2;
WireTypeIU = 3;
WireTypeMessageIU = 4; // special case on the wire (use other converter)
WireTypeIUPayloadUpdate = 5;
WireTypeIULinkUpdate = 6;
WireTypeIURetraction = 7;
WireTypeIUCommission = 8;
WireTypeIUResendRequest = 9;
WireTypeIUPayloadUpdateRequest = 100;
WireTypeIUCommissionRequest = 101;
WireTypeIULinkUpdateRequest = 102;
}
message TransportLevelWrapper {
required TransportMessageType transport_message_type = 1;
required bytes raw_message = 2;
}
message IntMessage {
required sint32 value = 1;
required sint32 value = 1;
}
message LinkSet {
required string type = 1;
repeated string targets = 2;
required string type = 1;
repeated string targets = 2;
}
message PayloadItem {
required string key = 1;
required string value = 2;
required string type = 3 [default = "str"];
required string key = 1;
required string value = 2;
required string type = 3 [default = "str"];
}
message IU {
enum AccessMode {
PUSH = 0;
REMOTE = 1;
MESSAGE = 2;
}
required string uid = 1;
required uint32 revision = 2;
required string category = 3 [default = "undef"];
required string payload_type = 4 [default = "MAP"];
required string owner_name = 5;
required bool committed = 6 [default = false];
required AccessMode access_mode = 7 [default = PUSH];
required bool read_only = 8 [default = false];
repeated PayloadItem payload = 9;
repeated LinkSet links = 10;
enum AccessMode {
PUSH = 0;
REMOTE = 1;
MESSAGE = 2;
}
required string uid = 1;
required uint32 revision = 2;
required string category = 3 [default = "undef"];
required string payload_type = 4 [default = "MAP"];
required string owner_name = 5;
required bool committed = 6 [default = false];
required AccessMode access_mode = 7 [default = PUSH];
required bool read_only = 8 [default = false];
repeated PayloadItem payload = 9;
repeated LinkSet links = 10;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUPayloadUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IURetraction {
required string uid = 1;
required uint32 revision = 2;
required string uid = 1;
required uint32 revision = 2;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUCommission {
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUResendRequest {
required string uid = 1;
required string hidden_scope_name = 2;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
// Result for remote operations (below).
// Used to send a raw int, which was problematic.
// Usually: 0 = Failed, >0 = new revision of successfully modified resource.
message RemoteRequestResult {
required uint32 result = 1;
optional string request_uid = 100 [default = ""];
//optional string request_endpoint = 101 [default = ""];
}
// Remote / request versions of buffer setters:
// they just go with a dedicated ID
message IUPayloadUpdateRequest {
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUCommissionRequest {
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdateRequest {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
......@@ -7,7 +7,7 @@
<exec executable="protoc">
<arg value="--proto_path=../proto" />
<arg value="../proto/ipaaca.proto" />
<arg value="--python_out=build/" />
<arg value="--python_out=src/ipaaca/" />
</exec>
</target>
</project>
......
......@@ -5,8 +5,6 @@
</publications>
<dependencies>
<dependency org="google" name="protobuf-python" rev="latest.release"/>
<dependency org="rsb" name="rsb-python" rev="latest.release"/>
<dependency org="spread" name="spread" rev="latest.release"/>
</dependencies>
</ivy-module>
......
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 9 14:12:05 2016
@author: jpoeppel
"""
from setuptools import setup
import os
import sys
import subprocess
from os import path as op
from distutils.spawn import find_executable
from setuptools.command.build_py import build_py
from setuptools.command.bdist_egg import bdist_egg
from distutils.command.build import build
from distutils.command.sdist import sdist
class ProtoBuild(build_py):
"""
This command automatically compiles all .proto files with `protoc` compiler
and places generated files near them -- i.e. in the same directory.
"""
def find_protoc(self):
"Locates protoc executable"
if 'PROTOC' in os.environ and os.path.exists(os.environ['PROTOC']):
protoc = os.environ['PROTOC']
else:
protoc = find_executable('protoc')
if protoc is None:
sys.stderr.write('protoc not found. Is protobuf-compiler installed? \n'
'Alternatively, you can point the PROTOC environment variable at a local version.')
sys.exit(1)
return protoc
def run(self):
#TODO determine path automaticall
packagedir = "../proto"
print("running build proto")
for protofile in filter(lambda x: x.endswith('.proto'), os.listdir(packagedir)):
source = op.join(packagedir, protofile)
output = source.replace('.proto', '_pb2.py')
if (not op.exists(output) or (op.getmtime(source) > op.getmtime(output))):
sys.stderr.write('Protobuf-compiling ' + source + '\n')
subprocess.check_call([self.find_protoc(), "-I={}".format(packagedir),'--python_out=./src/ipaaca', source])
class BDist_egg(bdist_egg):
'''
Simple wrapper around the normal bdist_egg command to require
protobuf build before normal build.
.. codeauthor:: jwienke
'''
def run(self):
self.run_command('build_proto')
bdist_egg.run(self)
class Build(build):
'''
Simple wrapper around the normal build command to require protobuf build
before normal build.
.. codeauthor:: jwienke
'''
def run(self):
self.run_command('build_proto')
build.run(self)
class Sdist(sdist):
'''
Simple wrapper around the normal sdist command to require protobuf build
before generating the source distribution..
.. codeauthor:: jwienke
'''
def run(self):
# fetch the protocol before building the source distribution so that
# we have a cached version and each user can rebuild the protocol
# with his own protobuf version
self.run_command('build_proto')
sdist.run(self)
version = "0.1.3" #TODO determine correct version! ideally from git, maybe do something similar to rsb/setup.py
setup(name="ipaaca",
version=version,
author="Hendrik Buschmeier, Ramin Yaghoubzadeh, Sören Klett",
author_email="hbuschme@uni-bielefeld.de,ryaghoubzadeh@uni-bielefeld.de,sklett@techfak.uni-bielefeld.de",
license='LGPLv3+',
url='https://opensource.cit-ec.de/projects/ipaaca',
install_requires=["paho-mqtt", "six", "protobuf"],
packages=["ipaaca", "ipaaca.util"],
package_dir={"ipaaca":"src/ipaaca"},
# TODO Do we want to add ipaaca_pb2.py to the egg or as separate package?
# data_files=[("./ipaaca", ["ipaaca_pb2.py"])],
# dependency_links=[
# 'http://www.spread.org/files/'
# 'SpreadModule-1.5spread4.tgz#egg=SpreadModule-1.5spread4'],
cmdclass ={
"build_proto": ProtoBuild,
"sdist": Sdist,
"build": Build,
"bdist_egg":BDist_egg
}
)
......@@ -4,7 +4,7 @@
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
......@@ -36,7 +36,7 @@ import ipaaca
def remote_change_dumper(iu, event_type, local):
if local:
print 'remote side '+event_type+': '+str(iu)
print('remote side '+event_type+': '+str(iu))
ob = ipaaca.OutputBuffer('CoolInformerOut')
......
*_pb2.py
......@@ -2,10 +2,10 @@
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2013 Sociable Agents Group
# CITEC, Bielefeld University
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
......@@ -22,7 +22,7 @@
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
......@@ -30,1590 +30,93 @@
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import print_function, division
from __future__ import division, print_function
import logging
import sys
import os
import threading
import uuid
import collections
import copy
import time
import rsb
import rsb.converter
#import rsb
#import rsb.converter
import ipaaca_pb2
import ipaaca.ipaaca_pb2
import ipaaca.converter
from ipaaca.buffer import InputBuffer, OutputBuffer
from ipaaca.exception import *
from ipaaca.iu import IU, Message, IUAccessMode, IUEventType
from ipaaca.misc import enable_logging, IpaacaArgumentParser
from ipaaca.payload import Payload
_DEFAULT_PAYLOAD_UPDATE_TIMEOUT = 0.1
import ipaaca.backend
# IDEAS
# We should think about relaying the update event (or at least the
# affected keys in the payload / links) to the event handlers!
# THOUGHTS
# Output buffers could generate UIDs for IUs on request, without
# publishing them at that time. Then UID could then be used
# for internal links etc. The IU may be published later through
# the same buffer that allocated the UID.
# WARNINGS
# category is now the FIRST argument for IU constructors
__all__ = [
'IUEventType',
'IUAccessMode',
'InputBuffer', 'OutputBuffer',
'IU',
'IUPublishedError', 'IUUpdateFailedError', 'IUCommittedError', 'IUReadOnlyError', 'IUNotFoundError',
'logger'
]
## --- Utilities -------------------------------------------------------------
def enum(*sequential, **named):
"""Create an enum type.
Based on suggestion of Alec Thomas on stackoverflow.com:
http://stackoverflow.com/questions/36932/
whats-the-best-way-to-implement-an-enum-in-python/1695250#1695250
"""
enums = dict(zip(sequential, range(len(sequential))), **named)
return type('Enum', (), enums)
def pack_typed_payload_item(protobuf_object, key, value):
protobuf_object.key = key
protobuf_object.value = value
protobuf_object.type = 'str' # TODO: more types
def unpack_typed_payload_item(protobuf_object):
# TODO: more types
return (protobuf_object.key, protobuf_object.value)
class IpaacaLoggingHandler(logging.Handler):
def __init__(self, level=logging.DEBUG):
logging.Handler.__init__(self, level)
def emit(self, record):
meta = '[ipaaca] (' + str(record.levelname) + ') '
msg = str(record.msg.format(record.args))
print(meta + msg)
## --- Global Definitions ----------------------------------------------------
IUEventType = enum(
ADDED = 'ADDED',
COMMITTED = 'COMMITTED',
DELETED = 'DELETED',
RETRACTED = 'RETRACTED',
UPDATED = 'UPDATED',
LINKSUPDATED = 'LINKSUPDATED',
MESSAGE = 'MESSAGE'
)
IUAccessMode = enum(
"PUSH",
"REMOTE",
"MESSAGE"
)
## --- Errors and Exceptions -------------------------------------------------
class IUPublishedError(Exception):
"""Error publishing of an IU failed since it is already in the buffer."""
def __init__(self, iu):
super(IUPublishedError, self).__init__('IU ' + str(iu.uid) + ' is already present in the output buffer.')
class IUUpdateFailedError(Exception):
"""Error indicating that a remote IU update failed."""
def __init__(self, iu):
super(IUUpdateFailedError, self).__init__('Remote update failed for IU ' + str(iu.uid) + '.')
class IUCommittedError(Exception):
"""Error indicating that an IU is immutable because it has been committed to."""
def __init__(self, iu):
super(IUCommittedError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it has been committed to.')
class IUReadOnlyError(Exception):
"""Error indicating that an IU is immutable because it is 'read only'."""
def __init__(self, iu):
super(IUReadOnlyError, self).__init__('Writing to IU ' + str(iu.uid) + ' failed -- it is read-only.')
class IUNotFoundError(Exception):
"""Error indicating that an IU UID was unexpectedly not found in an internal store."""
def __init__(self, iu_uid):
super(IUNotFoundError, self).__init__('Lookup of IU ' + str(iu_uid) + ' failed.')
class IUPayloadLockTimeoutError(Exception):
"""Error indicating that exclusive access to the Payload could not be obtained in time."""
def __init__(self, iu):
super(IUPayloadLockTimeoutError, self).__init__('Timeout while accessing payload of IU ' + str(iu.uid) + '.')
class IUPayloadLockedError(Exception):
"""Error indicating that exclusive access to the Payload could not be obtained because someone actually locked it."""
def __init__(self, iu):
super(IUPayloadLockedError, self).__init__('IU '+str(iu.uid)+' was locked during access attempt.')
## --- Generation Architecture -----------------------------------------------
class Payload(dict):
def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False, update_timeout=_DEFAULT_PAYLOAD_UPDATE_TIMEOUT):
self.iu = iu
pl1 = {} if new_payload is None else new_payload
pl = {}
for k,v in pl1.items():
if type(k)==str:
k=unicode(k,'utf8')
if type(v)==str:
v=unicode(v,'utf8')
pl[k] = v
# NOTE omit_init_update_message is necessary to prevent checking for
# exceptions and sending updates in the case where we just receive
# a whole new payload from the remote side and overwrite it locally.
for k, v in pl.items():
dict.__setitem__(self, k, v)
if (not omit_init_update_message) and (self.iu.buffer is not None):
self.iu._modify_payload(is_delta=False, new_items=pl, keys_to_remove=[], writer_name=writer_name)
self._update_on_every_change = True
self._collected_modifications = {}
self._collected_removals = []
self._update_timeout = update_timeout
self._batch_update_writer_name = None # name of remote buffer or None
self._batch_update_lock = threading.RLock()
self._batch_update_cond = threading.Condition(threading.RLock())
def merge(self, payload, writer_name=None):
self._batch_update_lock.acquire(True)
#if not self._batch_update_lock.acquire(False):
# print('Someone failed a lock trying to merge '+str(payload.keys()))
# raise IUPayloadLockedError(self.iu)
#print("Payload.merge() IN, Merging "+str(payload.keys()))
for k, v in payload.items():
if type(k)==str:
k=unicode(k,'utf8')
if type(v)==str:
v=unicode(v,'utf8')
self.iu._modify_payload(is_delta=True, new_items=payload, keys_to_remove=[], writer_name=writer_name)
r = dict.update(payload) # batch update
#print("Payload.merge() OUT")
self._batch_update_lock.release()
return r
def __setitem__(self, k, v, writer_name=None):
self._batch_update_lock.acquire(True)
#if not self._batch_update_lock.acquire(False):
# print('Someone failed a lock trying to set '+k+' to '+v)
# raise IUPayloadLockedError(self.iu)
#print("Payload.__setitem__() IN, Setting "+k+' to '+v)
#print(" by writer "+str(writer_name))
if type(k)==str:
k=unicode(k,'utf8')
if type(v)==str:
v=unicode(v,'utf8')
if self._update_on_every_change:
#print(" running _modify_payload with writer name "+str(writer_name))
self.iu._modify_payload(is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name)
else: # Collect additions/modifications
self._batch_update_writer_name = writer_name
self._collected_modifications[k] = v
r = dict.__setitem__(self, k, v)
#print("Payload.__setitem__() OUT")
self._batch_update_lock.release()
return r
def __delitem__(self, k, writer_name=None):
self._batch_update_lock.acquire(True)
#if not self._batch_update_lock.acquire(False):
# print('Someone failed a lock trying to del '+k)
# raise IUPayloadLockedError(self.iu)
#print("Payload.__delitem__() IN, Deleting "+k)
if type(k)==str:
k=unicode(k,'utf8')
if self._update_on_every_change:
self.iu._modify_payload(is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name)
else: # Collect additions/modifications
self._batch_update_writer_name = writer_name
self._collected_removals.append(k)
r = dict.__delitem__(self, k)
#print("Payload.__delitem__() OUT")
self._batch_update_lock.release()
return r
# Context-manager based batch updates, not yet thread-safe (on remote updates)
def __enter__(self):
#print('running Payload.__enter__()')
self._wait_batch_update_lock(self._update_timeout)
self._update_on_every_change = False
def __exit__(self, type, value, traceback):
#print('running Payload.__exit__()')
self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=self._batch_update_writer_name)
self._collected_modifications = {}
self._collected_removals = []
self._update_on_every_change = True
self._batch_update_writer_name = None
self._batch_update_lock.release()
def _remotely_enforced_setitem(self, k, v):
"""Sets an item when requested remotely."""
return dict.__setitem__(self, k, v)
def _remotely_enforced_delitem(self, k):
"""Deletes an item when requested remotely."""
return dict.__delitem__(self, k)
def _wait_batch_update_lock(self, timeout):
# wait lock with time-out http://stackoverflow.com/a/8393033
with self._batch_update_cond:
current_time = start_time = time.time()
while current_time < start_time + timeout:
if self._batch_update_lock.acquire(False):
return True
else:
self._batch_update_cond.wait(timeout - current_time + start_time)
current_time = time.time()
raise IUPayloadLockTimeoutError(self.iu)
class IUInterface(object): #{{{
"""Base class of all specialised IU classes."""
def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False):
"""Creates an IU.
Keyword arguments:
uid -- unique ID of this IU
access_mode -- access mode of this IU
read_only -- flag indicating whether this IU is read_only or not
"""
self._uid = uid
self._revision = None
self._category = None
self._payload_type = None
self._owner_name = None
self._committed = False
self._retracted = False
self._access_mode = access_mode
self._read_only = read_only
self._buffer = None
# payload is not present here
self._links = collections.defaultdict(set)
def __str__(self):
s = unicode(self.__class__)+"{ "
s += "category="+("<None>" if self._category is None else self._category)+" "
s += "uid="+self._uid+" "
s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") "
s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " "
s += "payload={ "
for k,v in self.payload.items():
s += k+":'"+v+"', "
s += "} "
s += "links={ "
for t,ids in self.get_all_links().items():
s += t+":'"+str(ids)+"', "
s += "} "
s += "}"
return s
def _add_and_remove_links(self, add, remove):
'''Just add and remove the new links in our links set, do not send an update here'''
'''Note: Also used for remotely enforced links updates.'''
for type in remove.keys(): self._links[type] -= set(remove[type])
for type in add.keys(): self._links[type] |= set(add[type])
def _replace_links(self, links):
'''Just wipe and replace our links set, do not send an update here'''
'''Note: Also used for remotely enforced links updates.'''
self._links = collections.defaultdict(set)
for type in links.keys(): self._links[type] |= set(links[type])
def add_links(self, type, targets, writer_name=None):
'''Attempt to add links if the conditions are met
and send an update message. Then call the local setter.'''
if not hasattr(targets, '__iter__'): targets=[targets]
self._modify_links(is_delta=True, new_links={type:targets}, links_to_remove={}, writer_name=writer_name)
self._add_and_remove_links( add={type:targets}, remove={} )
def remove_links(self, type, targets, writer_name=None):
'''Attempt to remove links if the conditions are met
and send an update message. Then call the local setter.'''
if not hasattr(targets, '__iter__'): targets=[targets]
self._modify_links(is_delta=True, new_links={}, links_to_remove={type:targets}, writer_name=writer_name)
self._add_and_remove_links( add={}, remove={type:targets} )
def modify_links(self, add, remove, writer_name=None):
'''Attempt to modify links if the conditions are met
and send an update message. Then call the local setter.'''
self._modify_links(is_delta=True, new_links=add, links_to_remove=remove, writer_name=writer_name)
self._add_and_remove_links( add=add, remove=remove )
def set_links(self, links, writer_name=None):
'''Attempt to set (replace) links if the conditions are met
and send an update message. Then call the local setter.'''
self._modify_links(is_delta=False, new_links=links, links_to_remove={}, writer_name=writer_name)
self._replace_links( links=links )
def get_links(self, type):
return set(self._links[type])
def get_all_links(self):
return copy.deepcopy(self._links)
def _get_revision(self):
return self._revision
revision = property(fget=_get_revision, doc='Revision number of the IU.')
def _get_category(self):
return self._category
category = property(fget=_get_category, doc='Category of the IU.')
def _get_payload_type(self):
return self._payload_type
payload_type = property(fget=_get_payload_type, doc='Type of the IU payload')
def _get_committed(self):
return self._committed
committed = property(
fget=_get_committed,
doc='Flag indicating whether this IU has been committed to.')
def _get_retracted(self):
return self._retracted
retracted = property(
fget=_get_retracted,
doc='Flag indicating whether this IU has been retracted.')
def _get_uid(self):
return self._uid
uid = property(fget=_get_uid, doc='Unique ID of the IU.')
def _get_access_mode(self):
return self._access_mode
access_mode = property(fget=_get_access_mode, doc='Access mode of the IU.')
def _get_read_only(self):
return self._read_only
read_only = property(
fget=_get_read_only,
doc='Flag indicating whether this IU is read only.')
def _get_buffer(self):
return self._buffer
def _set_buffer(self, buffer):
if self._buffer is not None:
raise Exception('The IU is already in a buffer, cannot move it.')
self._buffer = buffer
buffer = property(
fget=_get_buffer,
fset=_set_buffer,
doc='Buffer this IU is held in.')
def _get_owner_name(self):
return self._owner_name
def _set_owner_name(self, owner_name):
if self._owner_name is not None:
raise Exception('The IU already has an owner name, cannot change it.')
self._owner_name = owner_name
owner_name = property(
fget=_get_owner_name,
fset=_set_owner_name,
doc="The IU's owner's name.")
#}}}
class IU(IUInterface):#{{{
"""A local IU."""
def __init__(self, category='undef', access_mode=IUAccessMode.PUSH, read_only=False, _payload_type='MAP'):
super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only)
self._revision = 1
self.uid = str(uuid.uuid4())
self._category = category
self._payload_type = _payload_type
self.revision_lock = threading.RLock()
self._payload = Payload(iu=self)
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
if self.committed:
raise IUCommittedError(self)
with self.revision_lock:
# modify links locally
self._increase_revision_number()
if self.is_published:
# send update to remote holders
self.buffer._send_iu_link_update(
self,
revision=self.revision,
is_delta=is_delta,
new_links=new_links,
links_to_remove=links_to_remove,
writer_name=self.owner_name if writer_name is None else writer_name)
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
"""Modify the payload: add or remove items from this payload locally and send update."""
if self.committed:
raise IUCommittedError(self)
with self.revision_lock:
# set item locally
# FIXME: Is it actually set locally?
self._increase_revision_number()
if self.is_published:
#print(' _modify_payload: running send_iu_pl_upd with writer name '+str(writer_name))
# send update to remote holders
self.buffer._send_iu_payload_update(
self,
revision=self.revision,
is_delta=is_delta,
new_items=new_items,
keys_to_remove=keys_to_remove,
writer_name=self.owner_name if writer_name is None else writer_name)
def _increase_revision_number(self):
self._revision += 1
def _internal_commit(self, writer_name=None):
if self.committed:
raise IUCommittedError(self)
with self.revision_lock:
if not self._committed:
self._increase_revision_number()
self._committed = True
if self.buffer is not None:
self.buffer._send_iu_commission(self, writer_name=writer_name)
def commit(self):
"""Commit to this IU."""
return self._internal_commit()
def _get_payload(self):
return self._payload
def _set_payload(self, new_pl, writer_name=None):
if self.committed:
raise IUCommittedError(self)
with self.revision_lock:
self._increase_revision_number()
self._payload = Payload(
iu=self,
writer_name=None if self.buffer is None else (self.buffer.unique_name if writer_name is None else writer_name),
new_payload=new_pl)
payload = property(
fget=_get_payload,
fset=_set_payload,
doc='Payload dictionary of this IU.')
def _get_is_published(self):
return self.buffer is not None
is_published = property(
fget=_get_is_published,
doc='Flag indicating whether this IU has been published or not.')
def _set_buffer(self, buffer):
if self._buffer is not None:
raise Exception('The IU is already in a buffer, cannot move it.')
self._buffer = buffer
self.owner_name = buffer.unique_name
self._payload.owner_name = buffer.unique_name
buffer = property(
fget=IUInterface._get_buffer,
fset=_set_buffer,
doc='Buffer this IU is held in.')
def _set_uid(self, uid):
if self._uid is not None:
raise AttributeError('The uid of IU ' + self.uid + ' has already been set, cannot change it.')
self._uid = uid
uid = property(
fget=IUInterface._get_uid,
fset=_set_uid,
doc='Unique ID of the IU.')
#}}}
class Message(IU):#{{{
"""Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls."""
def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, _payload_type='MAP'):
super(Message, self).__init__(category=category, access_mode=access_mode, read_only=read_only, _payload_type=_payload_type)
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
if self.is_published:
logger.info('Info: modifying a Message after sending has no global effects')
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
if self.is_published:
logger.info('Info: modifying a Message after sending has no global effects')
def _increase_revision_number(self):
self._revision += 1
def _internal_commit(self, writer_name=None):
if self.is_published:
logger.info('Info: committing to a Message after sending has no global effects')
def commit(self):
return self._internal_commit()
def _get_payload(self):
return self._payload
def _set_payload(self, new_pl, writer_name=None):
if self.is_published:
logger.info('Info: modifying a Message after sending has no global effects')
else:
if self.committed:
raise IUCommittedError(self)
with self.revision_lock:
self._increase_revision_number()
self._payload = Payload(
iu=self,
writer_name=None if self.buffer is None else (self.buffer.unique_name if writer_name is None else writer_name),
new_payload=new_pl)
payload = property(
fget=_get_payload,
fset=_set_payload,
doc='Payload dictionary of this IU.')
def _get_is_published(self):
return self.buffer is not None
is_published = property(
fget=_get_is_published,
doc='Flag indicating whether this IU has been published or not.')
def _set_buffer(self, buffer):
if self._buffer is not None:
raise Exception('The IU is already in a buffer, cannot move it.')
self._buffer = buffer
self.owner_name = buffer.unique_name
self._payload.owner_name = buffer.unique_name
buffer = property(
fget=IUInterface._get_buffer,
fset=_set_buffer,
doc='Buffer this IU is held in.')
def _set_uid(self, uid):
if self._uid is not None:
raise AttributeError('The uid of IU ' + self.uid + ' has already been set, cannot change it.')
self._uid = uid
uid = property(
fget=IUInterface._get_uid,
fset=_set_uid,
doc='Unique ID of the IU.')
#}}}
class RemoteMessage(IUInterface):#{{{
"""A remote IU with access mode 'MESSAGE'."""
def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links):
super(RemoteMessage, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
self._revision = revision
self._category = category
self.owner_name = owner_name
self._payload_type = payload_type
self._committed = committed
self._retracted = False
# NOTE Since the payload is an already-existant Payload which we didn't modify ourselves,
# don't try to invoke any modification checks or network updates ourselves either.
# We are just receiving it here and applying the new data.
self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True)
self._links = links
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
logger.info('Info: modifying a RemoteMessage only has local effects')
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
logger.info('Info: modifying a RemoteMessage only has local effects')
def commit(self):
logger.info('Info: committing to a RemoteMessage only has local effects')
def _get_payload(self):
return self._payload
def _set_payload(self, new_pl):
logger.info('Info: modifying a RemoteMessage only has local effects')
self._payload = Payload(iu=self, new_payload=new_pl, omit_init_update_message=True)
payload = property(
fget=_get_payload,
fset=_set_payload,
doc='Payload dictionary of the IU.')
def _apply_link_update(self, update):
"""Apply a IULinkUpdate to the IU."""
logger.warning('Warning: should never be called: RemoteMessage._apply_link_update')
self._revision = update.revision
if update.is_delta:
self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove)
else:
self._replace_links(links=update.new_links)
def _apply_update(self, update):
"""Apply a IUPayloadUpdate to the IU."""
logger.warning('Warning: should never be called: RemoteMessage._apply_update')
self._revision = update.revision
if update.is_delta:
for k in update.keys_to_remove: self.payload._remotely_enforced_delitem(k)
for k, v in update.new_items.items(): self.payload._remotely_enforced_setitem(k, v)
else:
# NOTE Please read the comment in the constructor
self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True)
def _apply_commission(self):
"""Apply commission to the IU"""
logger.warning('Warning: should never be called: RemoteMessage._apply_commission')
self._committed = True
def _apply_retraction(self):
"""Apply retraction to the IU"""
logger.warning('Warning: should never be called: RemoteMessage._apply_retraction')
self._retracted = True
#}}}
class RemotePushIU(IUInterface):#{{{
"""A remote IU with access mode 'PUSH'."""
def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links):
super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
self._revision = revision
self._category = category
self.owner_name = owner_name
self._payload_type = payload_type
self._committed = committed
self._retracted = False
# NOTE Since the payload is an already-existant Payload which we didn't modify ourselves,
# don't try to invoke any modification checks or network updates ourselves either.
# We are just receiving it here and applying the new data.
self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True)
self._links = links
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
"""Modify the links: add or remove item from this payload remotely and send update."""
if self.committed:
raise IUCommittedError(self)
if self.read_only:
raise IUReadOnlyError(self)
requested_update = IULinkUpdate(
uid=self.uid,
revision=self.revision,
is_delta=is_delta,
writer_name=self.buffer.unique_name,
new_links=new_links,
links_to_remove=links_to_remove)
remote_server = self.buffer._get_remote_server(self)
new_revision = remote_server.updateLinks(requested_update)
if new_revision == 0:
raise IUUpdateFailedError(self)
else:
self._revision = new_revision
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
"""Modify the payload: add or remove item from this payload remotely and send update."""
if self.committed:
raise IUCommittedError(self)
if self.read_only:
raise IUReadOnlyError(self)
requested_update = IUPayloadUpdate(
uid=self.uid,
revision=self.revision,
is_delta=is_delta,
writer_name=self.buffer.unique_name,
new_items=new_items,
keys_to_remove=keys_to_remove)
remote_server = self.buffer._get_remote_server(self)
new_revision = remote_server.updatePayload(requested_update)
if new_revision == 0:
raise IUUpdateFailedError(self)
else:
self._revision = new_revision
def commit(self):
"""Commit to this IU."""
if self.read_only:
raise IUReadOnlyError(self)
if self._committed:
# ignore commit requests when already committed
return
else:
commission_request = ipaaca_pb2.IUCommission()
commission_request.uid = self.uid
commission_request.revision = self.revision
commission_request.writer_name = self.buffer.unique_name
remote_server = self.buffer._get_remote_server(self)
new_revision = remote_server.commit(commission_request)
if new_revision == 0:
raise IUUpdateFailedError(self)
else:
self._revision = new_revision
self._committed = True
def _get_payload(self):
return self._payload
def _set_payload(self, new_pl):
if self.committed:
raise IUCommittedError(self)
if self.read_only:
raise IUReadOnlyError(self)
requested_update = IUPayloadUpdate(
uid=self.uid,
revision=self.revision,
is_delta=False,
writer_name=self.buffer.unique_name,
new_items=new_pl,
keys_to_remove=[])
remote_server = self.buffer._get_remote_server(self)
new_revision = remote_server.updatePayload(requested_update)
if new_revision == 0:
raise IUUpdateFailedError(self)
else:
self._revision = new_revision
# NOTE Please read the comment in the constructor
self._payload = Payload(iu=self, new_payload=new_pl, omit_init_update_message=True)
payload = property(
fget=_get_payload,
fset=_set_payload,
doc='Payload dictionary of the IU.')
def _apply_link_update(self, update):
"""Apply a IULinkUpdate to the IU."""
self._revision = update.revision
if update.is_delta:
self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove)
else:
self._replace_links(links=update.new_links)
def _apply_update(self, update):
"""Apply a IUPayloadUpdate to the IU."""
self._revision = update.revision
if update.is_delta:
for k in update.keys_to_remove: self.payload._remotely_enforced_delitem(k)
for k, v in update.new_items.items(): self.payload._remotely_enforced_setitem(k, v)
else:
# NOTE Please read the comment in the constructor
self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True)
def _apply_commission(self):
"""Apply commission to the IU"""
self._committed = True
def _apply_retraction(self):
"""Apply retraction to the IU"""
self._retracted = True
#}}}
class IntConverter(rsb.converter.Converter):#{{{
"""Convert Python int objects to Protobuf ints and vice versa."""
def __init__(self, wireSchema="int", dataType=int):
super(IntConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, value):
pbo = ipaaca_pb2.IntMessage()
pbo.value = value
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
pbo = ipaaca_pb2.IntMessage()
pbo.ParseFromString( str(byte_stream) )
return pbo.value
#}}}
class IUConverter(rsb.converter.Converter):#{{{
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-iu", dataType=IU):
super(IUConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu):
pbo = ipaaca_pb2.IU()
pbo.uid = iu._uid
pbo.revision = iu._revision
pbo.category = iu._category
pbo.payload_type = iu._payload_type
pbo.owner_name = iu._owner_name
pbo.committed = iu._committed
am=ipaaca_pb2.IU.PUSH #default
if iu._access_mode == IUAccessMode.MESSAGE:
am=ipaaca_pb2.IU.MESSAGE
# TODO add other types later
pbo.access_mode = am
pbo.read_only = iu._read_only
for k,v in iu._payload.items():
entry = pbo.payload.add()
pack_typed_payload_item(entry, k, v)
for type_ in iu._links.keys():
linkset = pbo.links.add()
linkset.type = type_
linkset.targets.extend(iu._links[type_])
ws = "ipaaca-messageiu" if iu._access_mode == IUAccessMode.MESSAGE else self.wireSchema
return bytearray(pbo.SerializeToString()), ws
def deserialize(self, byte_stream, ws):
type = self.getDataType()
#print('IUConverter.deserialize got a '+str(type)+' over wireSchema '+ws)
if type == IU or type == Message:
pbo = ipaaca_pb2.IU()
pbo.ParseFromString( str(byte_stream) )
if pbo.access_mode == ipaaca_pb2.IU.PUSH:
_payload = {}
for entry in pbo.payload:
k, v = unpack_typed_payload_item(entry)
_payload[k] = v
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
remote_push_iu = RemotePushIU(
uid=pbo.uid,
revision=pbo.revision,
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links
)
return remote_push_iu
elif pbo.access_mode == ipaaca_pb2.IU.MESSAGE:
_payload = {}
for entry in pbo.payload:
k, v = unpack_typed_payload_item(entry)
_payload[k] = v
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
remote_message = RemoteMessage(
uid=pbo.uid,
revision=pbo.revision,
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links
)
return remote_message
else:
raise Exception("We can only handle IUs with access mode 'PUSH' or 'MESSAGE' for now!")
else:
raise ValueError("Inacceptable dataType %s" % type)
#}}}
class MessageConverter(rsb.converter.Converter):#{{{
'''
Converter class for Full IU representations
wire:bytearray <-> wire-schema:ipaaca-full-iu <-> class ipaacaRSB.IU
'''
def __init__(self, wireSchema="ipaaca-messageiu", dataType=Message):
super(IUConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu):
pbo = ipaaca_pb2.IU()
pbo.uid = iu._uid
pbo.revision = iu._revision
pbo.category = iu._category
pbo.payload_type = iu._payload_type
pbo.owner_name = iu._owner_name
pbo.committed = iu._committed
am=ipaaca_pb2.IU.PUSH #default
if iu._access_mode == IUAccessMode.MESSAGE:
am=ipaaca_pb2.IU.MESSAGE
# TODO add other types later
pbo.access_mode = am
pbo.read_only = iu._read_only
for k,v in iu._payload.items():
entry = pbo.payload.add()
pack_typed_payload_item(entry, k, v)
for type_ in iu._links.keys():
linkset = pbo.links.add()
linkset.type = type_
linkset.targets.extend(iu._links[type_])
ws = "ipaaca-messageiu" if iu._access_mode == IUAccessMode.MESSAGE else self.wireSchema
return bytearray(pbo.SerializeToString()), ws
def deserialize(self, byte_stream, ws):
type = self.getDataType()
#print('MessageConverter.deserialize got a '+str(type)+' over wireSchema '+ws)
if type == IU or type == Message:
pbo = ipaaca_pb2.IU()
pbo.ParseFromString( str(byte_stream) )
if pbo.access_mode == ipaaca_pb2.IU.PUSH:
_payload = {}
for entry in pbo.payload:
k, v = unpack_typed_payload_item(entry)
_payload[k] = v
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
remote_push_iu = RemotePushIU(
uid=pbo.uid,
revision=pbo.revision,
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links
)
return remote_push_iu
elif pbo.access_mode == ipaaca_pb2.IU.MESSAGE:
_payload = {}
for entry in pbo.payload:
k, v = unpack_typed_payload_item(entry)
_payload[k] = v
_links = collections.defaultdict(set)
for linkset in pbo.links:
for target_uid in linkset.targets:
_links[linkset.type].add(target_uid)
remote_message = RemoteMessage(
uid=pbo.uid,
revision=pbo.revision,
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links
)
return remote_message
else:
raise Exception("We can only handle IUs with access mode 'PUSH' or 'MESSAGE' for now!")
else:
raise ValueError("Inacceptable dataType %s" % type)
#}}}
class IULinkUpdate(object):#{{{
def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None):
super(IULinkUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.writer_name = writer_name
self.is_delta = is_delta
self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links)
self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove)
def __str__(self):
s = 'LinkUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_links = '+str(self.new_links)+', '
s += 'links_to_remove = '+str(self.links_to_remove)+')'
return s
#}}}
class IUPayloadUpdate(object):#{{{
def __init__(self, uid, revision, is_delta, writer_name="undef", new_items=None, keys_to_remove=None):
super(IUPayloadUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.writer_name = writer_name
self.is_delta = is_delta
self.new_items = {} if new_items is None else new_items
self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove
def __str__(self):
s = 'PayloadUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_items = '+str(self.new_items)+', '
s += 'keys_to_remove = '+str(self.keys_to_remove)+')'
return s
#}}}
class IULinkUpdateConverter(rsb.converter.Converter):#{{{
def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate):
super(IULinkUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu_link_update):
pbo = ipaaca_pb2.IULinkUpdate()
pbo.uid = iu_link_update.uid
pbo.writer_name = iu_link_update.writer_name
pbo.revision = iu_link_update.revision
for type_ in iu_link_update.new_links.keys():
linkset = pbo.new_links.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.new_links[type_])
for type_ in iu_link_update.links_to_remove.keys():
linkset = pbo.links_to_remove.add()
linkset.type = type_
linkset.targets.extend(iu_link_update.links_to_remove[type_])
pbo.is_delta = iu_link_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IULinkUpdate:
pbo = ipaaca_pb2.IULinkUpdate()
pbo.ParseFromString( str(byte_stream) )
logger.debug('received an IULinkUpdate for revision '+str(pbo.revision))
iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_links:
iu_link_up.new_links[str(entry.type)] = set(entry.targets)
for entry in pbo.links_to_remove:
iu_link_up.links_to_remove[str(entry.type)] = set(entry.targets)
return iu_link_up
else:
raise ValueError("Inacceptable dataType %s" % type)
#}}}
class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{
def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate):
super(IUPayloadUpdateConverter, self).__init__(bytearray, dataType, wireSchema)
def serialize(self, iu_payload_update):
pbo = ipaaca_pb2.IUPayloadUpdate()
pbo.uid = iu_payload_update.uid
pbo.writer_name = iu_payload_update.writer_name
pbo.revision = iu_payload_update.revision
for k,v in iu_payload_update.new_items.items():
entry = pbo.new_items.add()
pack_typed_payload_item(entry, k, v)
pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
pbo.is_delta = iu_payload_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
def deserialize(self, byte_stream, ws):
type = self.getDataType()
if type == IUPayloadUpdate:
pbo = ipaaca_pb2.IUPayloadUpdate()
pbo.ParseFromString( str(byte_stream) )
logger.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_items:
k, v = unpack_typed_payload_item(entry)
iu_up.new_items[k] = v
iu_up.keys_to_remove = pbo.keys_to_remove[:]
return iu_up
else:
raise ValueError("Inacceptable dataType %s" % type)
#}}}
class IUStore(dict):
"""A dictionary storing IUs."""
def __init__(self):
super(IUStore, self).__init__()
class FrozenIUStore(IUStore):
"""A read-only version of a dictionary storing IUs. (TODO: might be slow)"""
def __init__(self, original_iu_store):
super(FrozenIUStore, self).__init__()
map(lambda p: super(FrozenIUStore, self).__setitem__(p[0], p[1]), original_iu_store.items())
def __delitem__(self, k):
raise AttributeError()
def __setitem__(self, k, v):
raise AttributeError()
class IUEventHandler(object):
"""Wrapper for IU event handling functions."""
def __init__(self, handler_function, for_event_types=None, for_categories=None):
"""Create an IUEventHandler.
Keyword arguments:
handler_function -- the handler function with the signature
(IU, event_type, local)
for_event_types -- a list of event types or None if handler should
be called for all event types
for_categories -- a list of category names or None if handler should
be called for all categoires
"""
super(IUEventHandler, self).__init__()
self._handler_function = handler_function
self._for_event_types = (
None if for_event_types is None else
(for_event_types[:] if hasattr(for_event_types, '__iter__') else [for_event_types]))
self._for_categories = (
None if for_categories is None else
(for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories]))
def condition_met(self, event_type, category):
"""Check whether this IUEventHandler should be called.
Keyword arguments:
event_type -- type of the IU event
category -- category of the IU which triggered the event
"""
type_condition_met = (self._for_event_types is None or event_type in self._for_event_types)
cat_condition_met = (self._for_categories is None or category in self._for_categories)
return type_condition_met and cat_condition_met
def call(self, buffer, iu_uid, local, event_type, category):
"""Call this IUEventHandler's function, if it applies.
Keyword arguments:
buffer -- the buffer in which the IU is stored
iu_uid -- the uid of the IU
local -- is the IU local or remote to this component? @RAMIN: Is this correct?
event_type -- IU event type
category -- category of the IU
"""
if self.condition_met(event_type, category):
iu = buffer._iu_store[iu_uid]
self._handler_function(iu, event_type, local)
class Buffer(object):
"""Base class for InputBuffer and OutputBuffer."""
def __init__(self, owning_component_name, participant_config=None):
'''Create a Buffer.
Keyword arguments:
owning_compontent_name -- name of the entity that owns this Buffer
participant_config -- RSB configuration
'''
super(Buffer, self).__init__()
self._owning_component_name = owning_component_name
self._participant_config = rsb.ParticipantConfig.fromDefaultSources() if participant_config is None else participant_config
self._uuid = str(uuid.uuid4())[0:8]
# Initialise with a temporary, but already unique, name
self._unique_name = "undef-"+self._uuid
self._iu_store = IUStore()
self._iu_event_handlers = []
def _get_frozen_iu_store(self):
return FrozenIUStore(original_iu_store = self._iu_store)
iu_store = property(fget=_get_frozen_iu_store, doc='Copy-on-read version of the internal IU store')
def register_handler(self, handler_function, for_event_types=None, for_categories=None):
"""Register a new IU event handler function.
Keyword arguments:
handler_function -- a function with the signature (IU, event_type, local)
for_event_types -- a list of event types or None if handler should
be called for all event types
for_categories -- a list of category names or None if handler should
be called for all categories
"""
handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories)
self._iu_event_handlers.append(handler)
def call_iu_event_handlers(self, uid, local, event_type, category):
"""Call registered IU event handler functions registered for this event_type and category."""
for h in self._iu_event_handlers:
h.call(self, uid, local=local, event_type=event_type, category=category)
def _get_owning_component_name(self):
"""Return the name of this Buffer's owning component"""
return self._owning_component_name
owning_component_name = property(_get_owning_component_name)
def _get_unique_name(self):
"""Return the Buffer's unique name."""
return self._unique_name
unique_name = property(_get_unique_name)
class InputBuffer(Buffer):
"""An InputBuffer that holds remote IUs."""
def __init__(self, owning_component_name, category_interests=None, participant_config=None):
'''Create an InputBuffer.
Keyword arguments:
owning_compontent_name -- name of the entity that owns this InputBuffer
category_interests -- list of IU categories this Buffer is interested in
participant_config = RSB configuration
'''
super(InputBuffer, self).__init__(owning_component_name, participant_config)
self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB'
self._listener_store = {} # one per IU category
self._remote_server_store = {} # one per remote-IU-owning Component
self._category_interests = []
if category_interests is not None:
for cat in category_interests:
self._add_category_listener(cat)
def _get_remote_server(self, iu):
'''Return (or create, store and return) a remote server.'''
if iu.owner_name in self._remote_server_store:
return self._remote_server_store[iu.owner_name]
# TODO remove the str() when unicode is supported (issue #490)
remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name)))
self._remote_server_store[iu.owner_name] = remote_server
return remote_server
def _add_category_listener(self, iu_category):
'''Return (or create, store and return) a category listener.'''
if iu_category not in self._listener_store:
cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category)
logger.info("Added listener in scope "+"/ipaaca/category/"+iu_category)
def _handle_iu_events(self, event):
'''Dispatch incoming IU events.
Adds incoming IU's to the store, applies payload and commit updates to
IU, calls IU event handlers.'
Keyword arguments:
event -- a converted RSB event
'''
type_ = type(event.data)
if type_ is RemotePushIU:
# a new IU
if event.data.uid in self._iu_store:
# already in our store
pass
else:
self._iu_store[ event.data.uid ] = event.data
event.data.buffer = self
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category)
elif type_ is RemoteMessage:
# a new Message, an ephemeral IU that is removed after calling handlers
self._iu_store[ event.data.uid ] = event.data
event.data.buffer = self
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.MESSAGE, category=event.data.category)
del self._iu_store[ event.data.uid ]
else:
# an update to an existing IU
if event.data.uid not in self._iu_store:
# TODO: we should request the IU's owner to send us the IU
logger.warning("Update message for IU which we did not fully receive before.")
return
if type_ is ipaaca_pb2.IURetraction:
# IU retraction (cannot be triggered remotely)
iu = self._iu_store[event.data.uid]
iu._revision = event.data.revision
iu._apply_retraction() # for now - just sets the _rectracted flag.
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.RETRACTED, category=iu.category)
# SPECIAL CASE: allow the handlers (which will need to find the IU
# in the buffer) to operate on the IU - then delete it afterwards!
# FIXME: for now: retracted == deleted! Think about this later
del(self._iu_store[iu.uid])
else:
if event.data.writer_name == self.unique_name:
# Notify only for remotely triggered events;
# Discard updates that originate from this buffer
return
#else:
# print('Got update written by buffer '+str(event.data.writer_name))
if type_ is ipaaca_pb2.IUCommission:
# IU commit
iu = self._iu_store[event.data.uid]
iu._apply_commission()
iu._revision = event.data.revision
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
elif type_ is IUPayloadUpdate:
# IU payload update
iu = self._iu_store[event.data.uid]
iu._apply_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
elif type_ is IULinkUpdate:
# IU link update
iu = self._iu_store[event.data.uid]
iu._apply_link_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category)
else:
logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
def add_category_interests(self, category_interests):
for interest in category_interests:
self._add_category_listener(interest)
class OutputBuffer(Buffer):
"""An OutputBuffer that holds local IUs."""
def __init__(self, owning_component_name, participant_config=None):
'''Create an Output Buffer.
Keyword arguments:
owning_component_name -- name of the entity that own this buffer
participant_config -- RSB configuration
'''
super(OutputBuffer, self).__init__(owning_component_name, participant_config)
self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB'
self._server = rsb.createServer(rsb.Scope(self._unique_name))
self._server.addMethod('updateLinks', self._remote_update_links, IULinkUpdate, int)
self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int)
self._server.addMethod('commit', self._remote_commit, ipaaca_pb2.IUCommission, int)
self._informer_store = {}
self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-'
self.__iu_id_counter_lock = threading.Lock()
#self.__iu_id_counter = 0 # hbuschme: IUs now have their Ids assigned on creation
def _create_own_name_listener(self, iu_category):
# FIXME replace this
'''Create an own name listener.'''
#if iu_category in self._listener_store: return self._informer_store[iu_category]
#cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
#cat_listener.addHandler(self._handle_iu_events)
#self._listener_store[iu_category] = cat_listener
#self._category_interests.append(iu_category)
#logger.info("Added category listener for "+iu_category)
#return cat_listener
pass
# hbuschme: IUs now have their Ids assigned on creation
#def _generate_iu_uid(self):
# '''Generate a unique IU id of the form ????'''
# with self.__iu_id_counter_lock:
# self.__iu_id_counter += 1
# number = self.__iu_id_counter
# return self._id_prefix + str(number)
def _remote_update_links(self, update):
'''Apply a remotely requested update to one of the stored IU's links.'''
if update.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
return 0
if update.is_delta:
iu.modify_links(add=update.new_links, remove=update.links_to_remove, writer_name=update.writer_name)
else:
iu.set_links(links=update.new_links, writer_name=update.writer_name)
self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.LINKSUPDATED, category=iu.category)
return iu.revision
def _remote_update_payload(self, update):
'''Apply a remotely requested update to one of the stored IU's payload.'''
if update.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning(u"Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
logger.warning(u" Writer was: "+update.writer_name)
logger.warning(u" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
logger.warning(u" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
return 0
if update.is_delta:
#print('Writing delta update by '+str(update.writer_name))
with iu.payload:
for k in update.keys_to_remove:
iu.payload.__delitem__(k, writer_name=update.writer_name)
for k,v in update.new_items.items():
iu.payload.__setitem__(k, v, writer_name=update.writer_name)
else:
#print('Writing non-incr update by '+str(update.writer_name))
iu._set_payload(update.new_items, writer_name=update.writer_name)
# _set_payload etc. have also incremented the revision number
self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category)
return iu.revision
def _remote_commit(self, iu_commission):
'''Apply a remotely requested commit to one of the stored IUs.'''
if iu_commission.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
return 0
iu = self._iu_store[iu_commission.uid]
with iu.revision_lock:
if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
return 0
if iu.committed:
return 0
else:
iu._internal_commit(writer_name=iu_commission.writer_name)
self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category)
return iu.revision
def _get_informer(self, iu_category):
'''Return (or create, store and return) an informer object for IUs of the specified category.'''
if iu_category in self._informer_store:
logger.info("Returning informer on scope "+"/ipaaca/category/"+str(iu_category))
return self._informer_store[iu_category]
informer_iu = rsb.createInformer(
rsb.Scope("/ipaaca/category/"+str(iu_category)),
config=self._participant_config,
dataType=object)
self._informer_store[iu_category] = informer_iu #new_tuple
logger.info("Returning NEW informer on scope "+"/ipaaca/category/"+str(iu_category))
return informer_iu #return new_tuple
def add(self, iu):
'''Add an IU to the IU store, assign an ID and publish it.'''
# hbuschme: IUs now have their Ids assigned on creation
#if iu._uid is not None:
# raise IUPublishedError(iu)
#iu.uid = self._generate_iu_uid()
if iu.uid in self._iu_store:
raise IUPublishedError(iu)
if iu.buffer is not None:
raise IUPublishedError(iu)
if iu.access_mode != IUAccessMode.MESSAGE:
# Messages are not really stored in the OutputBuffer
self._iu_store[iu.uid] = iu
iu.buffer = self
self._publish_iu(iu)
def remove(self, iu=None, iu_uid=None):
'''Remove the iu or an IU corresponding to iu_uid from the OutputBuffer, retracting it from the system.'''
if iu is None:
if iu_uid is None:
return None
else:
if iu_uid not in self. _iu_store:
raise IUNotFoundError(iu_uid)
iu = self._iu_store[iu_uid]
# unpublish the IU
self._retract_iu(iu)
del self._iu_store[iu.uid]
return iu
def _publish_iu(self, iu):
'''Publish an IU.'''
informer = self._get_informer(iu._category)
informer.publishData(iu)
def _retract_iu(self, iu):
'''Retract (unpublish) an IU.'''
iu_retraction = ipaaca_pb2.IURetraction()
iu_retraction.uid = iu.uid
iu_retraction.revision = iu.revision
informer = self._get_informer(iu._category)
informer.publishData(iu_retraction)
def _send_iu_commission(self, iu, writer_name):
'''Send IU commission.
Keyword arguments:
iu -- the IU that has been committed to
writer_name -- name of the Buffer that initiated this commit, necessary
to enable remote components to filter out updates that originated
from their own operations
'''
# a raw Protobuf object for IUCommission is produced
# (unlike updates, where we have an intermediate class)
iu_commission = ipaaca_pb2.IUCommission()
iu_commission.uid = iu.uid
iu_commission.revision = iu.revision
iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name
informer = self._get_informer(iu._category)
informer.publishData(iu_commission)
def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"):
'''Send an IU link update.
Keyword arguments:
iu -- the IU being updated
is_delta -- whether this is an incremental update or a replacement
the whole link dictionary
revision -- the new revision number
new_links -- a dictionary of new link sets
links_to_remove -- a dict of the link sets that shall be removed
writer_name -- name of the Buffer that initiated this update, necessary
to enable remote components to filter out updates that originate d
from their own operations
'''
if new_links is None:
new_links = {}
if links_to_remove is None:
links_to_remove = {}
link_update = IULinkUpdate(iu._uid, is_delta=is_delta, revision=revision)
link_update.new_links = new_links
if is_delta:
link_update.links_to_remove = links_to_remove
link_update.writer_name = writer_name
informer = self._get_informer(iu._category)
informer.publishData(link_update)
# FIXME send the notification to the target, if the target is not the writer_name
def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
'''Send an IU payload update.
Keyword arguments:
iu -- the IU being updated
is_delta -- whether this is an incremental update or a replacement
revision -- the new revision number
new_items -- a dictionary of new payload items
keys_to_remove -- a list of the keys that shall be removed from the
payload
writer_name -- name of the Buffer that initiated this update, necessary
to enable remote components to filter out updates that originate d
from their own operations
'''
if new_items is None:
new_items = {}
if keys_to_remove is None:
keys_to_remove = []
payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision)
payload_update.new_items = new_items
if is_delta:
payload_update.keys_to_remove = keys_to_remove
payload_update.writer_name = writer_name
informer = self._get_informer(iu._category)
informer.publishData(payload_update)
#print(" -- Sent update with writer name "+str(writer_name))
## --- RSB -------------------------------------------------------------------
def initialize_ipaaca_rsb():#{{{
rsb.converter.registerGlobalConverter(
IntConverter(wireSchema="int32", dataType=int))
rsb.converter.registerGlobalConverter(
IUConverter(wireSchema="ipaaca-iu", dataType=IU))
rsb.converter.registerGlobalConverter(
IUConverter(wireSchema="ipaaca-messageiu", dataType=Message))
rsb.converter.registerGlobalConverter(
IULinkUpdateConverter(
wireSchema="ipaaca-iu-link-update",
dataType=IULinkUpdate))
rsb.converter.registerGlobalConverter(
IUPayloadUpdateConverter(
wireSchema="ipaaca-iu-payload-update",
dataType=IUPayloadUpdate))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IUCommission))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca_pb2.IURetraction))
rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
#t = rsb.ParticipantConfig.Transport('spread', {'enabled':'true'})
#rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromFile('rsb.cfg')
#}}}
## --- Module initialisation -------------------------------------------------
# register our own RSB Converters
initialize_ipaaca_rsb()
# Create a global logger for this module
logger = logging.getLogger('ipaaca')
logger.addHandler(IpaacaLoggingHandler(level=logging.INFO))
#
# ipaaca.exit(int_retval)
#
from ipaaca.buffer import atexit_cleanup_function
def exit(int_retval=0):
'''For the time being, this function can be used to
circumvent any sys.exit blocks, while at the same time
cleaning up the buffers (e.g. retracting IUs).
Call once at the end of any python script (or anywhere
in lieu of sys.exit() / os._exit(). '''
print('ipaaca: cleaning up and exiting with code '+str(int_retval))
atexit_cleanup_function()
os._exit(int_retval)
__RSB_INITIALIZER_LOCK = threading.Lock()
__RSB_INITIALIZED = False
def initialize_ipaaca_rsb_if_needed():
"""Initialise rsb if not yet initialise.
* Register own RSB converters.
* Initialise RSB from enviroment variables, rsb config file, or
from default values for RSB trnasport, host, and port (via
ipaaca.defaults or ipaaca.misc.IpaacaArgumentParser).
"""
global __RSB_INITIALIZED
with __RSB_INITIALIZER_LOCK:
if __RSB_INITIALIZED:
return
else:
ipaaca.converter.register_global_converter(
ipaaca.converter.IUConverter(
wireSchema="ipaaca-iu",
dataType=IU))
ipaaca.converter.register_global_converter(
ipaaca.converter.MessageConverter(
wireSchema="ipaaca-messageiu",
dataType=Message))
ipaaca.converter.register_global_converter(
ipaaca.converter.IULinkUpdateConverter(
wireSchema="ipaaca-iu-link-update",
dataType=converter.IULinkUpdate))
ipaaca.converter.register_global_converter(
ipaaca.converter.IUPayloadUpdateConverter(
wireSchema="ipaaca-iu-payload-update",
dataType=converter.IUPayloadUpdate))
if ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT is not None:
if ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT == 'spread':
os.environ['RSB_TRANSPORT_SPREAD_ENABLED'] = str(1)
os.environ['RSB_TRANSPORT_SOCKET_ENABLED'] = str(0)
elif ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT == 'socket':
os.environ['RSB_TRANSPORT_SPREAD_ENABLED'] = str(0)
os.environ['RSB_TRANSPORT_SOCKET_ENABLED'] = str(1)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER is not None:
os.environ['RSB_TRANSPORT_SOCKET_SERVER'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST is not None:
os.environ['RSB_TRANSPORT_SPREAD_HOST'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST)
os.environ['RSB_TRANSPORT_SOCKET_HOST'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT is not None:
os.environ['RSB_TRANSPORT_SPREAD_PORT'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT)
os.environ['RSB_TRANSPORT_SOCKET_PORT'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT)
#
ipaaca.backend.register_backends()
__RSB_INITIALIZED = True
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import threading
import uuid
import os
import time
LOGGER = ipaaca.misc.get_library_logger()
__registered_backends = {}
__backend_registration_done = False
def register_backends():
global __registered_backends
global __backend_registration_done
if not __backend_registration_done:
__backend_registration_done = True
LOGGER.debug('Registering available back-ends')
# register available backends
# mqtt
import ipaaca.backend_mqtt
be = ipaaca.backend_mqtt.create_backend()
if be is not None:
__registered_backends[be.name] = be
LOGGER.debug('Back-end '+str(be.name)+' added')
# ros
import ipaaca.backend_ros
be = ipaaca.backend_ros.create_backend()
if be is not None:
__registered_backends[be.name] = be
LOGGER.debug('Back-end '+str(be.name)+' added')
def get_default_backend():
# TODO selection mechanism / config
if not __backend_registration_done:
register_backends()
if len(__registered_backends) == 0:
raise RuntimeError('No back-ends could be initialized for ipaaca-python')
cfg = ipaaca.config.get_global_config()
preferred = cfg.get_with_default('backend', None)
if preferred is None:
k = list(__registered_backends.keys())[0]
if len(__registered_backends) > 1:
LOGGER.warning('No preferred ipaaca.backend set, returning one of several (probably the first in list)')
print('Using randomly selected back-end {}!'.format(k))
else:
if preferred in __registered_backends:
k = preferred
else:
raise ipaaca.exception.BackendInitializationError(preferred)
LOGGER.info('Back-end is '+str(k))
return __registered_backends[k]
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import collections
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import ipaaca.backend
import ipaaca.config
import threading
try:
import queue
except:
import Queue as queue
import uuid
import os
import time
try:
import paho.mqtt.client as mqtt
MQTT_ENABLED = True
except:
MQTT_ENABLED = False
if not MQTT_ENABLED:
def create_backend():
return None
else:
def create_backend():
return MQTTBackend(name='mqtt')
LOGGER = ipaaca.misc.get_library_logger()
_REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited
_REMOTE_LISTENER_MAX_QUEUED_EVENTS = 1024 # 'Full' exception if exceeded
class EventWrapper(object):
def __init__(self, data):
self.data = data
class PendingRequest(object):
'''Encapsulation of a pending remote request with
a facility to keep the requesting thread locked
until the reply or a timeout unlocks it.'''
def __init__(self, request):
self._request = request
self._event = threading.Event()
self._reply = None
self._request_uid = str(uuid.uuid4())[0:8]
def wait_for_reply(self, timeout=30.0):
wr = self._event.wait(timeout)
return None if wr is False else self._reply
def reply_with_result(self, reply):
self._reply = reply
self._event.set()
class Informer(object):
'''Informer interface, wrapping an outbound port to MQTT'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._live = True
self._live_event.set()
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
pass
def publishData(self, data):
#print('Informer publishing '+str(data.__class__.__name__)+' on '+self._scope)
self._mqtt_client.publish(self._scope, ipaaca.converter.serialize(data), qos=2)
class BackgroundEventDispatcher(threading.Thread):
def __init__(self, listener):
super(BackgroundEventDispatcher, self).__init__()
self.daemon = True
self._listener = listener
def terminate(self):
self._running = False
def run(self):
self._running = True
listener = self._listener
while self._running: # auto-terminated (daemon)
event = listener._event_queue.get(block=True, timeout=None)
if event is None: return # signaled termination
#print('\033[31mDispatch '+str(event.data.__class__.__name__)+' start ...\033[m')
for handler in self._listener._handlers:
handler(event)
#print('\033[32m... dispatch '+str(event.data.__class__.__name__)+' end.\033[m')
class Listener(object):
'''Listener interface, wrapping an inbound port from MQTT'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
self._event_queue = queue.Queue(_REMOTE_LISTENER_MAX_QUEUED_EVENTS)
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_socket_open = self.mqtt_callback_on_socket_open
#self._mqtt_client.on_socket_close = self.mqtt_callback_on_socket_close
#self._mqtt_client.on_log = self.mqtt_callback_on_log
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self._dispatcher = BackgroundEventDispatcher(self)
self._dispatcher.start()
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._event_queue.put(None, block=False) # signal termination, waking queue
self._dispatcher.terminate()
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
LOGGER.debug('Connect to '+str(self._host)+':'+str(self._port))
self._mqtt_client.connect(self._host, self._port)
#def mqtt_callback_on_log(self, client, userdata, level, buf):
# print('Listener: LOG: '+str(buf))
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
event = EventWrapper(ipaaca.converter.deserialize(message.payload))
self._event_queue.put(event, block=False) # queue event for BackgroundEventDispatcher
def addHandler(self, handler):
self._handlers.append(handler)
#def publishData(self, data):
# self._mqtt_client.publish(self._
class LocalServer(object):
'''LocalServer interface, allowing for RPC requests to
IU functions, or reporting back success or failure.'''
def __init__(self, buffer_impl, scope, config=None):
self._buffer = buffer_impl
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_LocalServer_' + self._uuid # unused atm
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
req = ipaaca.converter.deserialize(message.payload)
result = None
if isinstance(req, ipaaca.converter.IUPayloadUpdate):
result = self.attempt_to_apply_remote_updatePayload(req)
elif isinstance(req, ipaaca.converter.IULinkUpdate):
result = self.attempt_to_apply_remote_updateLinks(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUCommission):
result = self.attempt_to_apply_remote_commit(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUResendRequest):
result = self.attempt_to_apply_remote_resendRequest(req)
else:
raise RuntimeError('LocalServer: got an object of wrong class '+str(req.__class__.__name__)) # TODO replace
if result is not None:
self.send_result_for_request(req, result)
#
def send_result_for_request(self, obj, result):
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
pbo.result = result
pbo.request_uid = obj.request_uid
#print('Sending result to endpoint '+str(obj.request_endpoint))
self._mqtt_client.publish(obj.request_endpoint, ipaaca.converter.serialize(pbo), qos=2)
def attempt_to_apply_remote_updateLinks(self, obj):
return self._buffer._remote_update_links(obj)
def attempt_to_apply_remote_updatePayload(self, obj):
return self._buffer._remote_update_payload(obj)
def attempt_to_apply_remote_commit(self, obj):
return self._buffer._remote_commit(obj)
def attempt_to_apply_remote_resendRequest(self, obj):
return self._buffer._remote_request_resend(obj)
class RemoteServer(object):
'''RemoteServer, connects to a LocalServer on the side
of an actual IU owner, which will process any requests.
The RemoteServer is put on hold while the owner is
processing. RemoteServer is from RSB terminology,
it might more aptly be described as an RPC client.'''
def __init__(self, remote_end_scope, config=None):
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
#queue.Queue(_REMOTE_SERVER_MAX_QUEUED_REQUESTS)
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_RemoteServer_' + self._uuid
# will RECV here:
self._scope = '/ipaaca/remotes/' + self._name
# will SEND here
self._remote_end_scope = remote_end_scope
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + remote_end_scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
reply = ipaaca.converter.deserialize(message.payload)
if isinstance(reply, ipaaca.ipaaca_pb2.RemoteRequestResult):
uid = reply.request_uid
pending_request = None
with self._pending_requests_lock:
if uid in self._pending_requests:
pending_request = self._pending_requests[uid]
del self._pending_requests[uid]
if pending_request is None:
raise RuntimeError('RemoteServer: got a reply for request uid that is not queued: '+str(uid))
else:
# provide result to other thread and unblock it
pending_request.reply_with_result(reply)
else:
raise RuntimeError('RemoteServer: got an object of wrong class '+str(reply.__class__.__name__)) # TODO replace
def queue_pending_request(self, request):
pending_request = PendingRequest(request)
with self._pending_requests_lock:
if _REMOTE_SERVER_MAX_QUEUED_REQUESTS>0 and len(self._pending_requests) >= _REMOTE_SERVER_MAX_QUEUED_REQUESTS:
raise RuntimeError('RemoteServer: maximum number of pending requests exceeded') # TODO replace?
else:
self._pending_requests[pending_request._request_uid] = pending_request
return pending_request
# impl
def blocking_call(self, request):
# Broker's queue will raise before sending anything if capacity is exceeded
pending_request = self.queue_pending_request(request)
# complete and send request
request.request_uid = pending_request._request_uid
request.request_endpoint = self._scope
self._mqtt_client.publish(self._remote_end_scope, ipaaca.converter.serialize(request), qos=2)
# wait for other end to return result
reply = pending_request.wait_for_reply()
if reply is None:
LOGGER.warning('A request timed out!')
return 0
else:
return reply.result # the actual int result
# glue that quacks like the RSB version
def resendRequest(self, req):
return self.blocking_call(req)
def commit(self, req):
return self.blocking_call(req)
def updatePayload(self, req):
return self.blocking_call(req)
def updateLinks(self, req):
return self.blocking_call(req)
class MQTTBackend(object):
def __init__(self, name='mqtt'):
# back-end initialization code
self._config = ipaaca.config.get_global_config()
self._name = name
self._participants = set()
def _get_name(self):
return self._name
name = property(_get_name)
def teardown(self):
LOGGER.info('MQTT teardown: waiting 1 sec for final deliveries')
time.sleep(1)
for p in self._participants:
p.deactivate_internal()
def Scope(self, scope_str):
'''Scope adapter (glue replacing rsb.Scope)'''
return str(scope_str)
def createLocalServer(self, buffer_impl, scope, config=None):
LOGGER.debug('Creating a LocalServer on '+str(scope))
s = LocalServer(buffer_impl, scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createRemoteServer(self, scope, config=None):
LOGGER.debug('Creating a RemoteServer on '+str(scope))
s = RemoteServer(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createInformer(self, scope, config=None, dataType="ignored in this backend"):
LOGGER.debug('Creating an Informer on '+str(scope))
s = Informer(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createListener(self, scope, config=None):
LOGGER.debug('Creating a Listener on '+str(scope))
s = Listener(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s