Skip to content
Snippets Groups Projects
Commit 8b7721bd authored by Herwin van Welbergen's avatar Herwin van Welbergen
Browse files

- added waitforreceivers functionality to notifier

- made sure the initializer is only initialized once
- toString methods for payload and localIU
parent b85b0aa4
No related branches found
No related tags found
No related merge requests found
......@@ -15,9 +15,11 @@ public final class Initializer
private Initializer()
{
}
public static void initializeIpaacaRsb()
private static volatile boolean initialized = false;
public synchronized static void initializeIpaacaRsb()
{
if(initialized)return;
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new IntConverter());
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new ProtocolBufferConverter<IUCommission>(IUCommission.getDefaultInstance()));
......@@ -33,6 +35,6 @@ public final class Initializer
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new PayloadConverter());
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new LinkUpdateConverter());
initialized = true;
}
}
......@@ -239,4 +239,10 @@ public class LocalIU extends AbstractIU
getOutputBuffer().sendIUPayloadUpdate(this, update);
}
}
@Override
public String toString()
{
return "LocalIU with category: "+this.getCategory() + "\nowner: "+getOwnerName()+"\npayload: "+this.getPayload();
}
}
......@@ -235,4 +235,10 @@ public class Payload implements Map<String, String>
{
return map.values();
}
@Override
public String toString()
{
return map.toString();
}
}
......@@ -10,10 +10,14 @@ import ipaaca.OutputBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
/**
......@@ -29,6 +33,7 @@ public class ComponentNotifier
public static final String RECEIVE_CATEGORIES = "recv_categories";
public static final String STATE = "state";
public static final String NAME = "name";
public static final String FUNCTION = "function";
private final OutputBuffer outBuffer;
private final String componentName;
private final String componentFunction;
......@@ -36,13 +41,17 @@ public class ComponentNotifier
private final ImmutableSet<String> receiveCategories;
private final InputBuffer inBuffer;
private List<HandlerFunctor> handlers = new ArrayList<HandlerFunctor>();
private volatile boolean isInitialized = false;
private final BlockingQueue<String> receiverQueue = new LinkedBlockingQueue<>();
private class ComponentNotifyHandler implements HandlerFunctor
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
if(iu.getPayload().get(NAME).equals(componentName))return; //don't re-notify self
String receivers[] = iu.getPayload().get(RECEIVE_CATEGORIES).split("\\s*,\\s*");
receiverQueue.addAll(ImmutableSet.copyOf(receivers));
for (HandlerFunctor h : handlers)
{
h.handle(iu, type, local);
......@@ -61,6 +70,34 @@ public class ComponentNotifier
{
handlers.add(h);
}
/**
* Wait until the receivers are registered for categories
*/
public void waitForReceivers(ImmutableSet<String> categories)
{
Set<String> unhandledCategories = new HashSet<>(categories);
while(!unhandledCategories.isEmpty())
{
try
{
unhandledCategories.remove(receiverQueue.take());
}
catch (InterruptedException e)
{
Thread.interrupted();
}
}
}
/**
* Wait until receivers are registered for all categories sent by the component
*/
public void waitForReceivers()
{
waitForReceivers(sendCategories);
}
private void submitNotify(boolean isNew)
{
......@@ -87,8 +124,12 @@ public class ComponentNotifier
public void initialize()
{
inBuffer.registerHandler(new IUEventHandler(new ComponentNotifyHandler(), EnumSet.of(IUEventType.ADDED), ImmutableSet
.of(NOTIFY_CATEGORY)));
submitNotify(true);
if(!isInitialized)
{
inBuffer.registerHandler(new IUEventHandler(new ComponentNotifyHandler(), EnumSet.of(IUEventType.ADDED), ImmutableSet
.of(NOTIFY_CATEGORY)));
submitNotify(true);
isInitialized = true;
}
}
}
package ipaaca;
package ipaaca.util;
import static org.junit.Assert.assertEquals;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.Initializer;
import ipaaca.InputBuffer;
import ipaaca.OutputBuffer;
import ipaaca.util.ComponentNotifier;
import java.util.Set;
......
package ipaaca;
package ipaaca.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.doAnswer;
import ipaaca.util.ComponentNotifier;
import ipaaca.AbstractIU;
import ipaaca.IUEventHandler;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
import ipaaca.LocalIU;
import ipaaca.OutputBuffer;
import ipaaca.Payload;
import java.util.Set;
......@@ -91,12 +99,63 @@ public class ComponentNotifierTest
assertEquals("componentNotify", iu.getCategory());
assertEquals("old", iu.getPayload().get("state"));
}
@Test
public void testNoNotifyAtNotifyOld() throws Exception
{
sendNotify("old", ImmutableSet.of("testsnd1"));
sendNotify("old", ImmutableSet.of("testsnd1"));
ArgumentCaptor<LocalIU> argument = ArgumentCaptor.forClass(LocalIU.class);
verify(mockOutBuffer,times(1)).add(argument.capture());
verify(mockOutBuffer, times(1)).add(argument.capture());
}
private class WaitForFinish extends Thread
{
public volatile boolean waitFinish = false;
public void run()
{
notifier.waitForReceivers();
waitFinish = true;
}
}
@Test
public void testWait() throws InterruptedException
{
WaitForFinish wff = new WaitForFinish();
wff.start();
Thread.sleep(200);
assertFalse(wff.waitFinish);
sendNotify("new",SEND_CAT);
Thread.sleep(200);
assertTrue(wff.waitFinish);
}
@Test
public void testWaitWrongCats() throws InterruptedException
{
WaitForFinish wff = new WaitForFinish();
wff.start();
Thread.sleep(200);
assertFalse(wff.waitFinish);
sendNotify("new",RECV_CAT);
Thread.sleep(200);
assertFalse(wff.waitFinish);
}
@Test
public void testWaitIncremental()throws InterruptedException
{
WaitForFinish wff = new WaitForFinish();
wff.start();
Thread.sleep(200);
assertFalse(wff.waitFinish);
sendNotify("new",ImmutableSet.of("testsnd1", "testsnd2"));
Thread.sleep(200);
assertFalse(wff.waitFinish);
sendNotify("new",ImmutableSet.of("testsnd3"));
Thread.sleep(200);
assertTrue(wff.waitFinish);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment