From 76c26139a4bdf8837355d19a137f6608a4569f2b Mon Sep 17 00:00:00 2001
From: Herwin van Welbergen <hvanwelbergen@techfak.uni-bielefeld.de>
Date: Wed, 30 Mar 2016 16:31:39 +0200
Subject: [PATCH]  Added FutureIU, FutureIUs

---
 ipaacalib/java/.gitignore                     |  1 +
 .../src/ipaaca/util/BlackboardClient.java     | 15 ++-
 .../ipaaca/util/communication/FutureIU.java   | 91 +++++++++++++++++++
 .../ipaaca/util/communication/FutureIUs.java  | 71 +++++++++++++++
 .../util/communication/FutureIUTest.java      | 61 +++++++++++++
 .../util/communication/FutureIUsTest.java     | 81 +++++++++++++++++
 6 files changed, 312 insertions(+), 8 deletions(-)
 create mode 100644 ipaacalib/java/src/ipaaca/util/communication/FutureIU.java
 create mode 100644 ipaacalib/java/src/ipaaca/util/communication/FutureIUs.java
 create mode 100644 ipaacalib/java/test/src/ipaaca/util/communication/FutureIUTest.java
 create mode 100644 ipaacalib/java/test/src/ipaaca/util/communication/FutureIUsTest.java

diff --git a/ipaacalib/java/.gitignore b/ipaacalib/java/.gitignore
index 8ba77fe..c84f99e 100644
--- a/ipaacalib/java/.gitignore
+++ b/ipaacalib/java/.gitignore
@@ -6,3 +6,4 @@ dist
 privateprops
 .project
 .classpath
+/bin/
diff --git a/ipaacalib/java/src/ipaaca/util/BlackboardClient.java b/ipaacalib/java/src/ipaaca/util/BlackboardClient.java
index 6f70e78..ab72262 100644
--- a/ipaacalib/java/src/ipaaca/util/BlackboardClient.java
+++ b/ipaacalib/java/src/ipaaca/util/BlackboardClient.java
@@ -1,13 +1,5 @@
 package ipaaca.util;
 
-import ipaaca.AbstractIU;
-import ipaaca.HandlerFunctor;
-import ipaaca.IUEventType;
-import ipaaca.InputBuffer;
-import ipaaca.LocalMessageIU;
-import ipaaca.OutputBuffer;
-import ipaaca.protobuf.Ipaaca.IU;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -18,6 +10,13 @@ import java.util.Set;
 
 import com.google.common.collect.ImmutableSet;
 
+import ipaaca.AbstractIU;
+import ipaaca.HandlerFunctor;
+import ipaaca.IUEventType;
+import ipaaca.InputBuffer;
+import ipaaca.LocalMessageIU;
+import ipaaca.OutputBuffer;
+
 /**
  * Client to get/set key value pairs on a Blackboard
  * @author hvanwelbergen
diff --git a/ipaacalib/java/src/ipaaca/util/communication/FutureIU.java b/ipaacalib/java/src/ipaaca/util/communication/FutureIU.java
new file mode 100644
index 0000000..6d942d7
--- /dev/null
+++ b/ipaacalib/java/src/ipaaca/util/communication/FutureIU.java
@@ -0,0 +1,91 @@
+package ipaaca.util.communication;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableSet;
+
+import ipaaca.AbstractIU;
+import ipaaca.HandlerFunctor;
+import ipaaca.IUEventType;
+import ipaaca.InputBuffer;
+
+/**
+ * Obtain a IU in the future. Usage:<br>
+ * FutureIU fu = FutureIU("componentx", "status", "started"); //wait for componentx to send a message that is it fully started<br>
+ * [Start componentx, assumes that component x will send a message or other iu with status=started in the payload]<br>
+ * AbstractIU iu = fu.take();  //get the actual IU 
+ * @author hvanwelbergen
+ */
+public class FutureIU
+{
+    private final InputBuffer inBuffer;
+    private final BlockingQueue<AbstractIU> queue = new ArrayBlockingQueue<AbstractIU>(1);
+    
+    public FutureIU(String category, String idKey, String idVal)
+    {
+        inBuffer = new InputBuffer("FutureIU", ImmutableSet.of(category));
+        inBuffer.registerHandler(new HandlerFunctor()
+        {
+            @Override
+            public void handle(AbstractIU iu, IUEventType type, boolean local)
+            {
+                String id = iu.getPayload().get(idKey);
+                if (idVal.equals(id))
+                {
+                    try
+                    {
+                        queue.put(iu);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        Thread.interrupted();
+                    }
+                }
+            }
+        }, ImmutableSet.of(category));
+    }
+
+    /**
+     * Waits (if necessary) for the IU and take it (can be done only once)     
+     */
+    public AbstractIU take() throws InterruptedException
+    {
+        AbstractIU iu;
+        try
+        {
+            iu = queue.take();
+        }
+        finally
+        {
+            inBuffer.close();
+        }
+        return iu;
+    }
+
+    /**
+     * Wait for at most the given time for the IU and take it (can be done only once)    
+     */
+    public AbstractIU take(long timeout, TimeUnit unit) throws InterruptedException
+    {
+        AbstractIU iu;
+        try
+        {
+            iu = queue.poll(timeout, unit);
+        }
+        finally
+        {
+            inBuffer.close();
+        }
+        return iu;
+    }
+    
+    /**
+     * Closes the FutureIU, use only if get is not used.
+     */
+    public void close()
+    {
+        inBuffer.close();
+    }
+}
diff --git a/ipaacalib/java/src/ipaaca/util/communication/FutureIUs.java b/ipaacalib/java/src/ipaaca/util/communication/FutureIUs.java
new file mode 100644
index 0000000..298e7b9
--- /dev/null
+++ b/ipaacalib/java/src/ipaaca/util/communication/FutureIUs.java
@@ -0,0 +1,71 @@
+package ipaaca.util.communication;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import ipaaca.AbstractIU;
+import ipaaca.HandlerFunctor;
+import ipaaca.IUEventType;
+import ipaaca.InputBuffer;
+
+/**
+ * Obtain multiple future ius on an specific category. Usage:<br>
+ * FutureIUs futures = new FutureIUs(componentx, key);<br>
+ * [make componentx send a IU with key=keyvaluedesired1]<br>
+ * AbstractIU iu = futures.take(keyvaluedesired1);<br>
+ * [make componentx send a IU with key=keyvaluedesired2]
+ * AbstractIU iu = futures.take(keyvaluedesired2);<br>
+ * ...<br>
+ * futures.close();
+ * @author hvanwelbergen
+ */
+public class FutureIUs
+{
+    private final InputBuffer inBuffer;
+    private final Map<String,BlockingQueue<AbstractIU>> resultsMap = Collections.synchronizedMap(new HashMap<>());
+    
+    public FutureIUs(String category, String idKey)
+    {
+        inBuffer = new InputBuffer("FutureIUs", ImmutableSet.of(category));
+        inBuffer.registerHandler(new HandlerFunctor()
+        {
+            @Override
+            public void handle(AbstractIU iu, IUEventType type, boolean local)
+            {
+                String id = iu.getPayload().get(idKey);
+                resultsMap.putIfAbsent(id, new ArrayBlockingQueue<AbstractIU>(1));
+                resultsMap.get(id).add(iu);
+            }
+        }, ImmutableSet.of(category));
+    }
+    
+    /**
+     * Waits (if necessary) for the IU and take it (can be done only once)     
+     */
+    public AbstractIU take(String idValue) throws InterruptedException
+    {
+        resultsMap.putIfAbsent(idValue, new ArrayBlockingQueue<AbstractIU>(1));
+        return resultsMap.get(idValue).take();
+    }
+    
+    /**
+     * Wait for at most the given time for the IU and take it (can be done only once)    
+     */
+    public AbstractIU take(String idValue, long timeout, TimeUnit unit) throws InterruptedException
+    {
+        resultsMap.putIfAbsent(idValue, new ArrayBlockingQueue<AbstractIU>(1));
+        return resultsMap.get(idValue).poll(timeout, unit);
+    }
+    
+    public void close()
+    {
+        inBuffer.close();
+    }
+}
diff --git a/ipaacalib/java/test/src/ipaaca/util/communication/FutureIUTest.java b/ipaacalib/java/test/src/ipaaca/util/communication/FutureIUTest.java
new file mode 100644
index 0000000..bacf301
--- /dev/null
+++ b/ipaacalib/java/test/src/ipaaca/util/communication/FutureIUTest.java
@@ -0,0 +1,61 @@
+package ipaaca.util.communication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import ipaaca.LocalMessageIU;
+import ipaaca.OutputBuffer;
+
+/**
+ * Unit tests for the FutureIU
+ * @author hvanwelbergen
+ *
+ */
+public class FutureIUTest
+{
+    private final OutputBuffer outBuffer = new OutputBuffer("component1");
+
+    @Test(timeout = 2000)
+    public void testSendBeforeTake() throws InterruptedException
+    {
+        FutureIU fu = new FutureIU("cat1", "status", "started");
+        LocalMessageIU message = new LocalMessageIU("cat1");
+        message.getPayload().put("status", "started");
+        outBuffer.add(message);
+        assertEquals(message.getPayload(), fu.take().getPayload());
+    }
+
+    @Test(timeout = 2000)
+    public void testSendAfterTake() throws InterruptedException
+    {
+        FutureIU fu = new FutureIU("cat1", "status", "started");
+        LocalMessageIU message = new LocalMessageIU("cat1");
+        message.getPayload().put("status", "started");
+        Runnable send = () -> {
+            try
+            {
+                Thread.sleep(1000);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+            outBuffer.add(message);
+        };
+        new Thread(send).start();
+        assertEquals(message.getPayload(), fu.take().getPayload());
+    }
+    
+    @Test
+    public void testInvalidKeyValue() throws InterruptedException
+    {
+        FutureIU fu = new FutureIU("cat1", "status", "started");
+        LocalMessageIU message = new LocalMessageIU("cat1");
+        message.getPayload().put("status", "cancelled");
+        assertNull(fu.take(1,TimeUnit.SECONDS));
+    }
+}
diff --git a/ipaacalib/java/test/src/ipaaca/util/communication/FutureIUsTest.java b/ipaacalib/java/test/src/ipaaca/util/communication/FutureIUsTest.java
new file mode 100644
index 0000000..87113d0
--- /dev/null
+++ b/ipaacalib/java/test/src/ipaaca/util/communication/FutureIUsTest.java
@@ -0,0 +1,81 @@
+package ipaaca.util.communication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Test;
+
+import ipaaca.LocalMessageIU;
+import ipaaca.OutputBuffer;
+
+/**
+ * Unit tests for FutureIUs
+ * @author hvanwelbergen
+ *
+ */
+public class FutureIUsTest
+{
+    private FutureIUs fus = new FutureIUs("cat1","id");
+    private final OutputBuffer outBuffer = new OutputBuffer("component1");
+    
+    @After
+    public void cleanup()
+    {
+        fus.close();
+    }
+    
+    @Test(timeout = 2000)
+    public void testSendBeforeTake() throws InterruptedException
+    {
+        LocalMessageIU message = new LocalMessageIU("cat1");
+        message.getPayload().put("id", "id1");
+        outBuffer.add(message);
+        assertEquals(message.getPayload(), fus.take("id1").getPayload());
+    }
+    
+    @Test(timeout = 2000)
+    public void testSendAfterTake() throws InterruptedException
+    {
+        LocalMessageIU message = new LocalMessageIU("cat1");
+        message.getPayload().put("id", "id1");        
+        Runnable send = () -> {
+            try
+            {
+                Thread.sleep(1000);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+            outBuffer.add(message);
+        };
+        new Thread(send).start();
+        assertEquals(message.getPayload(), fus.take("id1").getPayload());    
+    }
+    
+    @Test
+    public void testNonMatchingKeyValue() throws InterruptedException
+    {
+        LocalMessageIU message = new LocalMessageIU("cat1");
+        message.getPayload().put("id", "id2");
+        outBuffer.add(message);
+        assertNull(fus.take("id1", 1,TimeUnit.SECONDS));
+    }
+    
+    @Test
+    public void testMultipleKeyValues() throws InterruptedException
+    {
+        LocalMessageIU message1 = new LocalMessageIU("cat1");
+        message1.getPayload().put("id", "id1");
+        LocalMessageIU message2 = new LocalMessageIU("cat1");
+        message2.getPayload().put("id", "id2");        
+        outBuffer.add(message2);
+        outBuffer.add(message1);
+        
+        assertEquals(message1.getPayload(), fus.take("id1").getPayload());
+        assertEquals(message2.getPayload(), fus.take("id2").getPayload());
+    }
+}
-- 
GitLab