Skip to content
Snippets Groups Projects
Commit 447c64b4 authored by hvanwelbergen's avatar hvanwelbergen
Browse files

- added removeHandler to Buffer

- FutureIU can now use an existing InputBuffer
parent 5a854976
No related branches found
No related tags found
No related merge requests found
...@@ -110,6 +110,11 @@ public abstract class Buffer ...@@ -110,6 +110,11 @@ public abstract class Buffer
{ {
eventHandlers.add(handler); eventHandlers.add(handler);
} }
public void removeHandler(IUEventHandler handler)
{
eventHandlers.remove(handler);
}
public void registerHandler(HandlerFunctor func) { public void registerHandler(HandlerFunctor func) {
IUEventHandler handler; IUEventHandler handler;
......
...@@ -8,6 +8,7 @@ import com.google.common.collect.ImmutableSet; ...@@ -8,6 +8,7 @@ import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU; import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor; import ipaaca.HandlerFunctor;
import ipaaca.IUEventHandler;
import ipaaca.IUEventType; import ipaaca.IUEventType;
import ipaaca.InputBuffer; import ipaaca.InputBuffer;
...@@ -15,18 +16,19 @@ import ipaaca.InputBuffer; ...@@ -15,18 +16,19 @@ import ipaaca.InputBuffer;
* Obtain a IU in the future. Usage:<br> * Obtain a IU in the future. Usage:<br>
* FutureIU fu = FutureIU("componentx", "status", "started"); //wait for componentx to send a message that is it fully started<br> * FutureIU fu = FutureIU("componentx", "status", "started"); //wait for componentx to send a message that is it fully started<br>
* [Start componentx, assumes that component x will send a message or other iu with status=started in the payload]<br> * [Start componentx, assumes that component x will send a message or other iu with status=started in the payload]<br>
* AbstractIU iu = fu.take(); //get the actual IU * AbstractIU iu = fu.take(); //get the actual IU
* @author hvanwelbergen * @author hvanwelbergen
*/ */
public class FutureIU public class FutureIU
{ {
private final InputBuffer inBuffer; private final InputBuffer inBuffer;
private final BlockingQueue<AbstractIU> queue = new ArrayBlockingQueue<AbstractIU>(1); private final BlockingQueue<AbstractIU> queue = new ArrayBlockingQueue<AbstractIU>(1);
private final IUEventHandler handler;
public FutureIU(String category, String idKey, String idVal)
public FutureIU(String category, String idKey, String idVal, InputBuffer inBuffer)
{ {
inBuffer = new InputBuffer("FutureIU", ImmutableSet.of(category)); this.inBuffer = inBuffer;
inBuffer.registerHandler(new HandlerFunctor() handler = new IUEventHandler(new HandlerFunctor()
{ {
@Override @Override
public void handle(AbstractIU iu, IUEventType type, boolean local) public void handle(AbstractIU iu, IUEventType type, boolean local)
...@@ -34,14 +36,32 @@ public class FutureIU ...@@ -34,14 +36,32 @@ public class FutureIU
String id = iu.getPayload().get(idKey); String id = iu.getPayload().get(idKey);
if (idVal.equals(id)) if (idVal.equals(id))
{ {
queue.offer(iu); queue.offer(iu);
} }
} }
}, ImmutableSet.of(category)); }, ImmutableSet.of(category));
inBuffer.registerHandler(handler);
}
public FutureIU(String category, String idKey, String idVal)
{
this(category, idKey, idVal, new InputBuffer("FutureIU", ImmutableSet.of(category)));
}
/**
* Closes the FutureIU, use only if get is not used.
*/
public void cleanup()
{
inBuffer.removeHandler(handler);
if (inBuffer.getOwningComponentName().equals("FutureIU"))
{
inBuffer.close();
}
} }
/** /**
* Waits (if necessary) for the IU and take it (can be done only once) * Waits (if necessary) for the IU and take it (can be done only once)
*/ */
public AbstractIU take() throws InterruptedException public AbstractIU take() throws InterruptedException
{ {
...@@ -52,13 +72,13 @@ public class FutureIU ...@@ -52,13 +72,13 @@ public class FutureIU
} }
finally finally
{ {
inBuffer.close(); cleanup();
} }
return iu; return iu;
} }
/** /**
* Wait for at most the given time for the IU and take it (can be done only once), return null on timeout * Wait for at most the given time for the IU and take it (can be done only once), return null on timeout
*/ */
public AbstractIU take(long timeout, TimeUnit unit) throws InterruptedException public AbstractIU take(long timeout, TimeUnit unit) throws InterruptedException
{ {
...@@ -69,16 +89,8 @@ public class FutureIU ...@@ -69,16 +89,8 @@ public class FutureIU
} }
finally finally
{ {
inBuffer.close(); cleanup();
} }
return iu; return iu;
} }
/**
* Closes the FutureIU, use only if get is not used.
*/
public void close()
{
inBuffer.close();
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment