From 922f3458e9828405a3b96e2c49ca48f58de41bed Mon Sep 17 00:00:00 2001 From: Ramin Yaghoubzadeh <ryaghoubzadeh@uni-bielefeld.de> Date: Fri, 13 Nov 2015 14:20:37 +0100 Subject: [PATCH] First attempt to merge accumulated changes from legacy master branch Manually integrated the following changes from b8466a (master 2015-11-13) - Blackboard classes by hvanwelbergen - Blackboard test by hvanwelbergen - Payload merge fix by hvanwelbergen - rsb Buffer fix (by myself) copied but inactive (not needed atm?) Please refer to the logs of the legacy-str branch for a full history of the respective changes. --- ipaacalib/java/build.properties | 4 +- ipaacalib/java/src/ipaaca/IUConverter.java | 6 + ipaacalib/java/src/ipaaca/Payload.java | 2 +- .../java/src/ipaaca/util/Blackboard.java | 129 ++++++++++++++++++ .../src/ipaaca/util/BlackboardClient.java | 129 ++++++++++++++++++ .../ipaaca/util/BlackboardUpdateListener.java | 6 + .../java/test/src/ipaaca/JavaPythonTest.java | 2 +- .../util/BlackboardIntegrationTest.java | 129 ++++++++++++++++++ .../ComponentNotifierIntegrationTest.java | 31 ++++- ipaacalib/python/src/ipaaca/buffer.py | 2 + ipaacatools/scripts/ipaaca-iu-injector | 6 +- 11 files changed, 435 insertions(+), 11 deletions(-) create mode 100644 ipaacalib/java/src/ipaaca/util/Blackboard.java create mode 100644 ipaacalib/java/src/ipaaca/util/BlackboardClient.java create mode 100644 ipaacalib/java/src/ipaaca/util/BlackboardUpdateListener.java create mode 100644 ipaacalib/java/test/src/ipaaca/util/BlackboardIntegrationTest.java diff --git a/ipaacalib/java/build.properties b/ipaacalib/java/build.properties index be895bc..9549960 100644 --- a/ipaacalib/java/build.properties +++ b/ipaacalib/java/build.properties @@ -6,5 +6,5 @@ run.jvmargs= -Xms128m -Xmx512m -Xss5M rebuild.list= publish.resolver=asap.sftp.publish dist.dir=../../dist -javac.source=1.6 -javac.target=1.6 +#javac.source=1.6 +#javac.target=1.6 diff --git a/ipaacalib/java/src/ipaaca/IUConverter.java b/ipaacalib/java/src/ipaaca/IUConverter.java index d03660f..19f86de 100644 --- a/ipaacalib/java/src/ipaaca/IUConverter.java +++ b/ipaacalib/java/src/ipaaca/IUConverter.java @@ -108,6 +108,12 @@ public class IUConverter implements Converter<ByteBuffer> try { iu = IU.newBuilder().mergeFrom(buffer.array()).build(); + // If there are rsb buffer read-only issues in some build, use this code instead of the above line: + //int size = buffer.capacity(); + //byte[] array = new byte[size]; + //buffer.get(array, 0, size); + //iu = IU.newBuilder().mergeFrom(array).build(); + } catch (InvalidProtocolBufferException e) { diff --git a/ipaacalib/java/src/ipaaca/Payload.java b/ipaacalib/java/src/ipaaca/Payload.java index f2d0833..5d1ed1d 100644 --- a/ipaacalib/java/src/ipaaca/Payload.java +++ b/ipaacalib/java/src/ipaaca/Payload.java @@ -198,7 +198,7 @@ public class Payload implements Map<String, String> public void putAll(Map<? extends String, ? extends String> newItems) { - putAll(newItems); + putAll(newItems, null); } public void putAll(Map<? extends String, ? extends String> newItems, String writer) diff --git a/ipaacalib/java/src/ipaaca/util/Blackboard.java b/ipaacalib/java/src/ipaaca/util/Blackboard.java new file mode 100644 index 0000000..789182b --- /dev/null +++ b/ipaacalib/java/src/ipaaca/util/Blackboard.java @@ -0,0 +1,129 @@ +package ipaaca.util; + +import ipaaca.AbstractIU; +import ipaaca.HandlerFunctor; +import ipaaca.IUEventType; +import ipaaca.InputBuffer; +import ipaaca.LocalIU; +import ipaaca.OutputBuffer; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + +/** + * A simple key-value blackboard + * @author hvanwelbergen + */ +public class Blackboard +{ + private final OutputBuffer outBuffer; + private final InputBuffer inBuffer; + private final LocalIU iu; + private final ComponentNotifier notifier; + private static final String DUMMY_KEY = "DUMMY_KEY"; + public static final String MESSAGE_SUFFIX = "MESSAGE"; + private int dummyValue = 0; + private List<BlackboardUpdateListener> listeners = Collections.synchronizedList(new ArrayList<BlackboardUpdateListener>()); + + public Blackboard(String id, String category) + { + this(id, category, "default"); + } + + private void updateListeners() + { + synchronized (listeners) + { + for (BlackboardUpdateListener listener : listeners) + { + listener.update(); + } + } + } + public Blackboard(String id, String category, String channel) + { + outBuffer = new OutputBuffer(id, channel); + iu = new LocalIU(category); + outBuffer.add(iu); + outBuffer.registerHandler(new HandlerFunctor() + { + @Override + public void handle(AbstractIU iu, IUEventType type, boolean local) + { + updateListeners(); + } + }); + inBuffer = new InputBuffer(id, ImmutableSet.of(ComponentNotifier.NOTIFY_CATEGORY, category + MESSAGE_SUFFIX), channel); + notifier = new ComponentNotifier(id, category, ImmutableSet.of(category), new HashSet<String>(), outBuffer, inBuffer); + notifier.addNotificationHandler(new HandlerFunctor() + { + @Override + public void handle(AbstractIU iuNotify, IUEventType type, boolean local) + { + dummyValue++; + iu.getPayload().put(DUMMY_KEY, "" + dummyValue); + } + }); + notifier.initialize(); + inBuffer.registerHandler(new HandlerFunctor() + { + @Override + public void handle(AbstractIU iuMessage, IUEventType type, boolean local) + { + iu.getPayload().putAll(iuMessage.getPayload()); + updateListeners(); + } + }, ImmutableSet.of(category + MESSAGE_SUFFIX)); + } + + public String put(String key, String value) + { + return iu.getPayload().put(key, value); + } + + public void putAll(Map<String, String> newItems) + { + iu.getPayload().putAll(newItems); + } + + /** + * Get the value corresponding to the key, or null if it is not available + */ + public String get(String key) + { + return iu.getPayload().get(key); + } + + public void addUpdateListener(BlackboardUpdateListener listener) + { + listeners.add(listener); + } + + public Set<String> keySet() + { + return iu.getPayload().keySet(); + } + + public Set<Map.Entry<String, String>> entrySet() + { + return iu.getPayload().entrySet(); + } + + public Collection<String> values() + { + return iu.getPayload().values(); + } + + public void close() + { + outBuffer.close(); + inBuffer.close(); + } +} diff --git a/ipaacalib/java/src/ipaaca/util/BlackboardClient.java b/ipaacalib/java/src/ipaaca/util/BlackboardClient.java new file mode 100644 index 0000000..6f70e78 --- /dev/null +++ b/ipaacalib/java/src/ipaaca/util/BlackboardClient.java @@ -0,0 +1,129 @@ +package ipaaca.util; + +import ipaaca.AbstractIU; +import ipaaca.HandlerFunctor; +import ipaaca.IUEventType; +import ipaaca.InputBuffer; +import ipaaca.LocalMessageIU; +import ipaaca.OutputBuffer; +import ipaaca.protobuf.Ipaaca.IU; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + +/** + * Client to get/set key value pairs on a Blackboard + * @author hvanwelbergen + * + */ +public class BlackboardClient +{ + private final InputBuffer inBuffer; + private final OutputBuffer outBuffer; + private List<BlackboardUpdateListener> listeners = Collections.synchronizedList(new ArrayList<BlackboardUpdateListener>()); + private final String category; + + public BlackboardClient(String id, String category) + { + this(id, category, "default"); + } + + public BlackboardClient(String id, String category, String channel) + { + this.category = category; + inBuffer = new InputBuffer(id, ImmutableSet.of(category, ComponentNotifier.NOTIFY_CATEGORY), channel); + inBuffer.setResendActive(true); + inBuffer.registerHandler(new HandlerFunctor() + { + @Override + public void handle(AbstractIU iu, IUEventType type, boolean local) + { + synchronized (listeners) + { + for (BlackboardUpdateListener listener : listeners) + { + listener.update(); + } + } + } + }, ImmutableSet.of(category)); + outBuffer = new OutputBuffer(id); + ComponentNotifier notifier = new ComponentNotifier(id, category, new HashSet<String>(), ImmutableSet.of(category), + outBuffer, inBuffer); + notifier.initialize(); + } + + public void close() + { + inBuffer.close(); + outBuffer.close(); + } + + public void waitForBlackboardConnection() + { + while(inBuffer.getIUs().isEmpty()); + } + + public String get(String key) + { + if (inBuffer.getIUs().isEmpty()) + { + return null; + } + return inBuffer.getIUs().iterator().next().getPayload().get(key); + } + + public void put(String key, String value) + { + LocalMessageIU iu = new LocalMessageIU(category+Blackboard.MESSAGE_SUFFIX); + iu.getPayload().put(key,value); + outBuffer.add(iu); + } + + public void putAll(Map<String,String> values) + { + LocalMessageIU iu = new LocalMessageIU(category+Blackboard.MESSAGE_SUFFIX); + iu.getPayload().putAll(values); + outBuffer.add(iu); + } + + private boolean hasIU() + { + return !inBuffer.getIUs().isEmpty(); + } + + private AbstractIU getIU() + { + return inBuffer.getIUs().iterator().next(); + } + + public Set<String> keySet() + { + if(!hasIU())return new HashSet<>(); + return getIU().getPayload().keySet(); + } + + public Set<Map.Entry<String, String>> entrySet() + { + if(!hasIU())return new HashSet<>(); + return getIU().getPayload().entrySet(); + } + + public Collection<String> values() + { + if(!hasIU())return new HashSet<>(); + return getIU().getPayload().values(); + } + + public void addUpdateListener(BlackboardUpdateListener listener) + { + listeners.add(listener); + } +} diff --git a/ipaacalib/java/src/ipaaca/util/BlackboardUpdateListener.java b/ipaacalib/java/src/ipaaca/util/BlackboardUpdateListener.java new file mode 100644 index 0000000..e1ebdde --- /dev/null +++ b/ipaacalib/java/src/ipaaca/util/BlackboardUpdateListener.java @@ -0,0 +1,6 @@ +package ipaaca.util; + +public interface BlackboardUpdateListener +{ + void update(); +} diff --git a/ipaacalib/java/test/src/ipaaca/JavaPythonTest.java b/ipaacalib/java/test/src/ipaaca/JavaPythonTest.java index 6c194a5..92f0b67 100644 --- a/ipaacalib/java/test/src/ipaaca/JavaPythonTest.java +++ b/ipaacalib/java/test/src/ipaaca/JavaPythonTest.java @@ -205,7 +205,7 @@ public class JavaPythonTest String pypr = PYTHON_PREAMBLE +"ob = ipaaca.OutputBuffer('pythonside')\n" +"iu = ipaaca.Message('JavaPythonTest')\n" - +"iu.payload['data'] = 'Hello from Python!'\n" + +"iu.payload = {'data':'Hello from Python!'}\n" +"time.sleep(0.1)\n" +"ob.add(iu)\n"; runPythonProgram(pypr); diff --git a/ipaacalib/java/test/src/ipaaca/util/BlackboardIntegrationTest.java b/ipaacalib/java/test/src/ipaaca/util/BlackboardIntegrationTest.java new file mode 100644 index 0000000..afd10b0 --- /dev/null +++ b/ipaacalib/java/test/src/ipaaca/util/BlackboardIntegrationTest.java @@ -0,0 +1,129 @@ +package ipaaca.util; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import ipaaca.Initializer; + +import org.junit.After; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; + +/** + * Integration tests for the blackboard + * @author hvanwelbergen + */ +public class BlackboardIntegrationTest +{ + static + { + Initializer.initializeIpaacaRsb(); + } + + private Blackboard bb = new Blackboard("myblackboard","blackboardx"); + private BlackboardClient bbc; + + @After + public void after() + { + bb.close(); + if(bbc!=null) + { + bbc.close(); + } + } + + @Test + public void testGetValueFromBlackboardBeforeConnection() + { + bb.put("key1","value1"); + bbc = new BlackboardClient("myblackboardclient","blackboardx"); + bbc.waitForBlackboardConnection(); + assertEquals("value1", bbc.get("key1")); + } + + @Test + public void testGetValueFromBlackboardAfterConnection() throws InterruptedException + { + bbc = new BlackboardClient("myblackboardclient","blackboardx"); + bbc.waitForBlackboardConnection(); + bb.put("key1","value1"); + Thread.sleep(200); + assertEquals("value1", bbc.get("key1")); + } + + @Test + public void testSetValueOnBlackboard() throws InterruptedException + { + bbc = new BlackboardClient("myblackboardclient","blackboardx"); + bbc.waitForBlackboardConnection(); + bbc.put("key2","value2"); + Thread.sleep(300); + assertEquals("value2", bb.get("key2")); + } + + @Test + public void testBlackboardUpdateHandler()throws InterruptedException + { + BlackboardUpdateListener mockListener = mock(BlackboardUpdateListener.class); + bb.addUpdateListener(mockListener); + bbc = new BlackboardClient("myblackboardclient","blackboardx"); + bbc.waitForBlackboardConnection(); + bbc.put("key2","value2"); + Thread.sleep(200); + bb.put("key2","value3"); + verify(mockListener,times(1)).update(); + } + + @Test + public void testBlackboardClientUpdateHandler() throws InterruptedException + { + BlackboardUpdateListener mockListener = mock(BlackboardUpdateListener.class); + bbc = new BlackboardClient("myblackboardclient","blackboardx"); + bbc.waitForBlackboardConnection(); + bbc.addUpdateListener(mockListener); + bb.put("key3","value3"); + Thread.sleep(200); + bbc.put("key3","value4"); + verify(mockListener,times(2)).update(); + } + + @Test + public void testSetManyValuesOnBlackboard() throws InterruptedException + { + bbc = new BlackboardClient("myblackboardclient","blackboardx"); + bbc.waitForBlackboardConnection(); + for(int i=0;i<100;i++) + { + bbc.put("key"+i,"value"+i); + bb.put("key"+i,"value"+i); + } + Thread.sleep(300); + assertEquals("value2", bb.get("key2")); + assertEquals("value3", bb.get("key3")); + } + + @Test + public void testSetValuesOnClient() throws InterruptedException + { + bbc = new BlackboardClient("myblackboardclient","blackboardx"); + bbc.waitForBlackboardConnection(); + bbc.putAll(ImmutableMap.of("key1","value1","key2","value2")); + Thread.sleep(200); + assertEquals("value1", bb.get("key1")); + assertEquals("value2", bb.get("key2")); + } + + @Test + public void testSetValuesOnBlackBoard() throws InterruptedException + { + bbc = new BlackboardClient("myblackboardclient","blackboardx"); + bbc.waitForBlackboardConnection(); + bb.putAll(ImmutableMap.of("key1","value1","key2","value2")); + Thread.sleep(200); + assertEquals("value1", bbc.get("key1")); + assertEquals("value2", bbc.get("key2")); + } +} diff --git a/ipaacalib/java/test/src/ipaaca/util/ComponentNotifierIntegrationTest.java b/ipaacalib/java/test/src/ipaaca/util/ComponentNotifierIntegrationTest.java index aef0e5a..5acbed6 100644 --- a/ipaacalib/java/test/src/ipaaca/util/ComponentNotifierIntegrationTest.java +++ b/ipaacalib/java/test/src/ipaaca/util/ComponentNotifierIntegrationTest.java @@ -1,21 +1,22 @@ package ipaaca.util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import ipaaca.AbstractIU; import ipaaca.HandlerFunctor; import ipaaca.IUEventType; import ipaaca.Initializer; import ipaaca.InputBuffer; +import ipaaca.LocalIU; import ipaaca.OutputBuffer; -import ipaaca.util.ComponentNotifier; import java.util.Set; +import lombok.Getter; + import org.junit.After; import org.junit.Test; -import lombok.Getter; - import com.google.common.collect.ImmutableSet; /** @@ -29,7 +30,7 @@ public class ComponentNotifierIntegrationTest private ComponentNotifier notifier2; private InputBuffer inBuffer; private OutputBuffer outBuffer; - + private static final String OTHER_CATEGORY="OTHER"; static { Initializer.initializeIpaacaRsb(); @@ -67,6 +68,13 @@ public class ComponentNotifierIntegrationTest return new ComponentNotifier(id, "test", ImmutableSet.copyOf(sendList), ImmutableSet.copyOf(recvList), outBuffer, inBuffer); } + private ComponentNotifier setupCompNotifierWithOtherCategoryInputBuffer(String id, Set<String> sendList, Set<String> recvList) + { + inBuffer = new InputBuffer(id + "in", ImmutableSet.of(ComponentNotifier.NOTIFY_CATEGORY, OTHER_CATEGORY)); + outBuffer = new OutputBuffer(id + "out"); + return new ComponentNotifier(id, "test", ImmutableSet.copyOf(sendList), ImmutableSet.copyOf(recvList), outBuffer, inBuffer); + } + @Test public void testSelf() throws InterruptedException { @@ -97,4 +105,19 @@ public class ComponentNotifierIntegrationTest assertEquals(1, h1.getNumCalled()); assertEquals(1, h2.getNumCalled()); } + + @Test + public void testOtherCategoryInInputBuffer() throws InterruptedException + { + notifier1 = setupCompNotifierWithOtherCategoryInputBuffer("not1", ImmutableSet.of("a1", "b1"), ImmutableSet.of("a3", "b1")); + MyHandlerFunctor h1 = new MyHandlerFunctor(); + notifier1.addNotificationHandler(h1); + + OutputBuffer out = new OutputBuffer("out"); + LocalIU iu = new LocalIU(OTHER_CATEGORY); + out.add(iu); + Thread.sleep(500); + assertEquals(0, h1.getNumCalled()); + assertNotNull(inBuffer.getIU(iu.getUid())); + } } diff --git a/ipaacalib/python/src/ipaaca/buffer.py b/ipaacalib/python/src/ipaaca/buffer.py index fe7e8b6..c4b75a4 100644 --- a/ipaacalib/python/src/ipaaca/buffer.py +++ b/ipaacalib/python/src/ipaaca/buffer.py @@ -35,6 +35,7 @@ from __future__ import division, print_function import threading import uuid +import traceback import rsb @@ -170,6 +171,7 @@ class Buffer(object): if local: LOGGER.error('Local IU handler raised an exception upon remote write.' + unicode(e)) else: + print(unicode(traceback.format_exc())) raise e def _get_owning_component_name(self): diff --git a/ipaacatools/scripts/ipaaca-iu-injector b/ipaacatools/scripts/ipaaca-iu-injector index b785969..5f464da 100755 --- a/ipaacatools/scripts/ipaaca-iu-injector +++ b/ipaacatools/scripts/ipaaca-iu-injector @@ -97,11 +97,11 @@ if __name__ == '__main__': ob.add(iu) print( - 'Sent {iu_type} with category "{category}" and payload {{'.format(**vars(arguments)), + u'Sent {iu_type} with category "{category}" and payload {{'.format(**vars(arguments)), end='\n' if len(iu.payload) > 0 else '') for k, v in iu.payload.items(): - print(" '{key}': {value},".format(key=k, value=v)) - print('}.') + print(u" '{key}': {value},".format(key=k, value=v)) + print(u'}.') # Wait for updates to the IU try: -- GitLab