Skip to content
Snippets Groups Projects
Commit 76c26139 authored by Herwin van Welbergen's avatar Herwin van Welbergen
Browse files

Added FutureIU, FutureIUs

parent 33664e85
No related branches found
No related tags found
No related merge requests found
...@@ -6,3 +6,4 @@ dist ...@@ -6,3 +6,4 @@ dist
privateprops privateprops
.project .project
.classpath .classpath
/bin/
package ipaaca.util; package ipaaca.util;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
import ipaaca.protobuf.Ipaaca.IU;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
...@@ -18,6 +10,13 @@ import java.util.Set; ...@@ -18,6 +10,13 @@ import java.util.Set;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/** /**
* Client to get/set key value pairs on a Blackboard * Client to get/set key value pairs on a Blackboard
* @author hvanwelbergen * @author hvanwelbergen
......
package ipaaca.util.communication;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
/**
* Obtain a IU in the future. Usage:<br>
* FutureIU fu = FutureIU("componentx", "status", "started"); //wait for componentx to send a message that is it fully started<br>
* [Start componentx, assumes that component x will send a message or other iu with status=started in the payload]<br>
* AbstractIU iu = fu.take(); //get the actual IU
* @author hvanwelbergen
*/
public class FutureIU
{
private final InputBuffer inBuffer;
private final BlockingQueue<AbstractIU> queue = new ArrayBlockingQueue<AbstractIU>(1);
public FutureIU(String category, String idKey, String idVal)
{
inBuffer = new InputBuffer("FutureIU", ImmutableSet.of(category));
inBuffer.registerHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
String id = iu.getPayload().get(idKey);
if (idVal.equals(id))
{
try
{
queue.put(iu);
}
catch (InterruptedException e)
{
Thread.interrupted();
}
}
}
}, ImmutableSet.of(category));
}
/**
* Waits (if necessary) for the IU and take it (can be done only once)
*/
public AbstractIU take() throws InterruptedException
{
AbstractIU iu;
try
{
iu = queue.take();
}
finally
{
inBuffer.close();
}
return iu;
}
/**
* Wait for at most the given time for the IU and take it (can be done only once)
*/
public AbstractIU take(long timeout, TimeUnit unit) throws InterruptedException
{
AbstractIU iu;
try
{
iu = queue.poll(timeout, unit);
}
finally
{
inBuffer.close();
}
return iu;
}
/**
* Closes the FutureIU, use only if get is not used.
*/
public void close()
{
inBuffer.close();
}
}
package ipaaca.util.communication;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
/**
* Obtain multiple future ius on an specific category. Usage:<br>
* FutureIUs futures = new FutureIUs(componentx, key);<br>
* [make componentx send a IU with key=keyvaluedesired1]<br>
* AbstractIU iu = futures.take(keyvaluedesired1);<br>
* [make componentx send a IU with key=keyvaluedesired2]
* AbstractIU iu = futures.take(keyvaluedesired2);<br>
* ...<br>
* futures.close();
* @author hvanwelbergen
*/
public class FutureIUs
{
private final InputBuffer inBuffer;
private final Map<String,BlockingQueue<AbstractIU>> resultsMap = Collections.synchronizedMap(new HashMap<>());
public FutureIUs(String category, String idKey)
{
inBuffer = new InputBuffer("FutureIUs", ImmutableSet.of(category));
inBuffer.registerHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
String id = iu.getPayload().get(idKey);
resultsMap.putIfAbsent(id, new ArrayBlockingQueue<AbstractIU>(1));
resultsMap.get(id).add(iu);
}
}, ImmutableSet.of(category));
}
/**
* Waits (if necessary) for the IU and take it (can be done only once)
*/
public AbstractIU take(String idValue) throws InterruptedException
{
resultsMap.putIfAbsent(idValue, new ArrayBlockingQueue<AbstractIU>(1));
return resultsMap.get(idValue).take();
}
/**
* Wait for at most the given time for the IU and take it (can be done only once)
*/
public AbstractIU take(String idValue, long timeout, TimeUnit unit) throws InterruptedException
{
resultsMap.putIfAbsent(idValue, new ArrayBlockingQueue<AbstractIU>(1));
return resultsMap.get(idValue).poll(timeout, unit);
}
public void close()
{
inBuffer.close();
}
}
package ipaaca.util.communication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Unit tests for the FutureIU
* @author hvanwelbergen
*
*/
public class FutureIUTest
{
private final OutputBuffer outBuffer = new OutputBuffer("component1");
@Test(timeout = 2000)
public void testSendBeforeTake() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "started");
outBuffer.add(message);
assertEquals(message.getPayload(), fu.take().getPayload());
}
@Test(timeout = 2000)
public void testSendAfterTake() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "started");
Runnable send = () -> {
try
{
Thread.sleep(1000);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
outBuffer.add(message);
};
new Thread(send).start();
assertEquals(message.getPayload(), fu.take().getPayload());
}
@Test
public void testInvalidKeyValue() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "cancelled");
assertNull(fu.take(1,TimeUnit.SECONDS));
}
}
package ipaaca.util.communication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Unit tests for FutureIUs
* @author hvanwelbergen
*
*/
public class FutureIUsTest
{
private FutureIUs fus = new FutureIUs("cat1","id");
private final OutputBuffer outBuffer = new OutputBuffer("component1");
@After
public void cleanup()
{
fus.close();
}
@Test(timeout = 2000)
public void testSendBeforeTake() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id1");
outBuffer.add(message);
assertEquals(message.getPayload(), fus.take("id1").getPayload());
}
@Test(timeout = 2000)
public void testSendAfterTake() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id1");
Runnable send = () -> {
try
{
Thread.sleep(1000);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
outBuffer.add(message);
};
new Thread(send).start();
assertEquals(message.getPayload(), fus.take("id1").getPayload());
}
@Test
public void testNonMatchingKeyValue() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id2");
outBuffer.add(message);
assertNull(fus.take("id1", 1,TimeUnit.SECONDS));
}
@Test
public void testMultipleKeyValues() throws InterruptedException
{
LocalMessageIU message1 = new LocalMessageIU("cat1");
message1.getPayload().put("id", "id1");
LocalMessageIU message2 = new LocalMessageIU("cat1");
message2.getPayload().put("id", "id2");
outBuffer.add(message2);
outBuffer.add(message1);
assertEquals(message1.getPayload(), fus.take("id1").getPayload());
assertEquals(message2.getPayload(), fus.take("id2").getPayload());
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment