diff --git a/ipaacalib/java/src/ipaaca/Initializer.java b/ipaacalib/java/src/ipaaca/Initializer.java index 4aa0d452969a5c05d9560e317df986f983001635..ed124003379a704f210bad748a59bd8bd47b9ee2 100644 --- a/ipaacalib/java/src/ipaaca/Initializer.java +++ b/ipaacalib/java/src/ipaaca/Initializer.java @@ -15,9 +15,11 @@ public final class Initializer private Initializer() { } - - public static void initializeIpaacaRsb() + private static volatile boolean initialized = false; + + public synchronized static void initializeIpaacaRsb() { + if(initialized)return; DefaultConverterRepository.getDefaultConverterRepository().addConverter(new IntConverter()); DefaultConverterRepository.getDefaultConverterRepository().addConverter( new ProtocolBufferConverter<IUCommission>(IUCommission.getDefaultInstance())); @@ -33,6 +35,6 @@ public final class Initializer DefaultConverterRepository.getDefaultConverterRepository().addConverter(new PayloadConverter()); DefaultConverterRepository.getDefaultConverterRepository().addConverter(new LinkUpdateConverter()); - + initialized = true; } } diff --git a/ipaacalib/java/src/ipaaca/LocalIU.java b/ipaacalib/java/src/ipaaca/LocalIU.java index 056a52b817a59a4ee0231c2d0033614655f1a469..3581d546d5c9566418d3505fe14d4ca359d2bfe7 100644 --- a/ipaacalib/java/src/ipaaca/LocalIU.java +++ b/ipaacalib/java/src/ipaaca/LocalIU.java @@ -239,4 +239,10 @@ public class LocalIU extends AbstractIU getOutputBuffer().sendIUPayloadUpdate(this, update); } } + + @Override + public String toString() + { + return "LocalIU with category: "+this.getCategory() + "\nowner: "+getOwnerName()+"\npayload: "+this.getPayload(); + } } diff --git a/ipaacalib/java/src/ipaaca/Payload.java b/ipaacalib/java/src/ipaaca/Payload.java index c2e3a93604a3e68817c3615c20bcc032318a4e85..eeaa8e797ed523cb21d7feb51ecc665c57314716 100644 --- a/ipaacalib/java/src/ipaaca/Payload.java +++ b/ipaacalib/java/src/ipaaca/Payload.java @@ -235,4 +235,10 @@ public class Payload implements Map<String, String> { return map.values(); } + + @Override + public String toString() + { + return map.toString(); + } } diff --git a/ipaacalib/java/src/ipaaca/util/ComponentNotifier.java b/ipaacalib/java/src/ipaaca/util/ComponentNotifier.java index 3541c53dd10435607ea3c7a5ec4406630b279bc0..a414bce794d9c5620677d1e13f0f879a3b58cfaa 100644 --- a/ipaacalib/java/src/ipaaca/util/ComponentNotifier.java +++ b/ipaacalib/java/src/ipaaca/util/ComponentNotifier.java @@ -10,10 +10,14 @@ import ipaaca.OutputBuffer; import java.util.ArrayList; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; /** @@ -29,6 +33,7 @@ public class ComponentNotifier public static final String RECEIVE_CATEGORIES = "recv_categories"; public static final String STATE = "state"; public static final String NAME = "name"; + public static final String FUNCTION = "function"; private final OutputBuffer outBuffer; private final String componentName; private final String componentFunction; @@ -36,13 +41,17 @@ public class ComponentNotifier private final ImmutableSet<String> receiveCategories; private final InputBuffer inBuffer; private List<HandlerFunctor> handlers = new ArrayList<HandlerFunctor>(); - + private volatile boolean isInitialized = false; + private final BlockingQueue<String> receiverQueue = new LinkedBlockingQueue<>(); + private class ComponentNotifyHandler implements HandlerFunctor { @Override public void handle(AbstractIU iu, IUEventType type, boolean local) { if(iu.getPayload().get(NAME).equals(componentName))return; //don't re-notify self + String receivers[] = iu.getPayload().get(RECEIVE_CATEGORIES).split("\\s*,\\s*"); + receiverQueue.addAll(ImmutableSet.copyOf(receivers)); for (HandlerFunctor h : handlers) { h.handle(iu, type, local); @@ -61,6 +70,34 @@ public class ComponentNotifier { handlers.add(h); } + + /** + * Wait until the receivers are registered for categories + */ + public void waitForReceivers(ImmutableSet<String> categories) + { + Set<String> unhandledCategories = new HashSet<>(categories); + while(!unhandledCategories.isEmpty()) + { + try + { + unhandledCategories.remove(receiverQueue.take()); + + } + catch (InterruptedException e) + { + Thread.interrupted(); + } + } + } + + /** + * Wait until receivers are registered for all categories sent by the component + */ + public void waitForReceivers() + { + waitForReceivers(sendCategories); + } private void submitNotify(boolean isNew) { @@ -87,8 +124,12 @@ public class ComponentNotifier public void initialize() { - inBuffer.registerHandler(new IUEventHandler(new ComponentNotifyHandler(), EnumSet.of(IUEventType.ADDED), ImmutableSet - .of(NOTIFY_CATEGORY))); - submitNotify(true); + if(!isInitialized) + { + inBuffer.registerHandler(new IUEventHandler(new ComponentNotifyHandler(), EnumSet.of(IUEventType.ADDED), ImmutableSet + .of(NOTIFY_CATEGORY))); + submitNotify(true); + isInitialized = true; + } } } diff --git a/ipaacalib/java/test/src/ipaaca/ComponentNotifierIntegrationTest.java b/ipaacalib/java/test/src/ipaaca/util/ComponentNotifierIntegrationTest.java similarity index 93% rename from ipaacalib/java/test/src/ipaaca/ComponentNotifierIntegrationTest.java rename to ipaacalib/java/test/src/ipaaca/util/ComponentNotifierIntegrationTest.java index 6ee8218bf3796ce3c35e9ff50670da03e7f05930..334f2ecdd61d31e053077794bf18607ea181e338 100644 --- a/ipaacalib/java/test/src/ipaaca/ComponentNotifierIntegrationTest.java +++ b/ipaacalib/java/test/src/ipaaca/util/ComponentNotifierIntegrationTest.java @@ -1,6 +1,12 @@ -package ipaaca; +package ipaaca.util; import static org.junit.Assert.assertEquals; +import ipaaca.AbstractIU; +import ipaaca.HandlerFunctor; +import ipaaca.IUEventType; +import ipaaca.Initializer; +import ipaaca.InputBuffer; +import ipaaca.OutputBuffer; import ipaaca.util.ComponentNotifier; import java.util.Set; diff --git a/ipaacalib/java/test/src/ipaaca/ComponentNotifierTest.java b/ipaacalib/java/test/src/ipaaca/util/ComponentNotifierTest.java similarity index 69% rename from ipaacalib/java/test/src/ipaaca/ComponentNotifierTest.java rename to ipaacalib/java/test/src/ipaaca/util/ComponentNotifierTest.java index d5f7434267f4201eefe7f78b7d3ae3d27c0339c9..6d06acf22a9993adf181427e45977e1cc7c24ce6 100644 --- a/ipaacalib/java/test/src/ipaaca/ComponentNotifierTest.java +++ b/ipaacalib/java/test/src/ipaaca/util/ComponentNotifierTest.java @@ -1,14 +1,22 @@ -package ipaaca; +package ipaaca.util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.doAnswer; -import ipaaca.util.ComponentNotifier; +import ipaaca.AbstractIU; +import ipaaca.IUEventHandler; +import ipaaca.IUEventType; +import ipaaca.InputBuffer; +import ipaaca.LocalIU; +import ipaaca.OutputBuffer; +import ipaaca.Payload; import java.util.Set; @@ -91,12 +99,63 @@ public class ComponentNotifierTest assertEquals("componentNotify", iu.getCategory()); assertEquals("old", iu.getPayload().get("state")); } - + @Test public void testNoNotifyAtNotifyOld() throws Exception { - sendNotify("old", ImmutableSet.of("testsnd1")); + sendNotify("old", ImmutableSet.of("testsnd1")); ArgumentCaptor<LocalIU> argument = ArgumentCaptor.forClass(LocalIU.class); - verify(mockOutBuffer,times(1)).add(argument.capture()); + verify(mockOutBuffer, times(1)).add(argument.capture()); + } + + private class WaitForFinish extends Thread + { + public volatile boolean waitFinish = false; + + public void run() + { + notifier.waitForReceivers(); + waitFinish = true; + } + + } + + @Test + public void testWait() throws InterruptedException + { + WaitForFinish wff = new WaitForFinish(); + wff.start(); + Thread.sleep(200); + assertFalse(wff.waitFinish); + sendNotify("new",SEND_CAT); + Thread.sleep(200); + assertTrue(wff.waitFinish); + } + + @Test + public void testWaitWrongCats() throws InterruptedException + { + WaitForFinish wff = new WaitForFinish(); + wff.start(); + Thread.sleep(200); + assertFalse(wff.waitFinish); + sendNotify("new",RECV_CAT); + Thread.sleep(200); + assertFalse(wff.waitFinish); + } + + @Test + public void testWaitIncremental()throws InterruptedException + { + WaitForFinish wff = new WaitForFinish(); + wff.start(); + Thread.sleep(200); + assertFalse(wff.waitFinish); + sendNotify("new",ImmutableSet.of("testsnd1", "testsnd2")); + Thread.sleep(200); + assertFalse(wff.waitFinish); + sendNotify("new",ImmutableSet.of("testsnd3")); + Thread.sleep(200); + assertTrue(wff.waitFinish); } }