From 29b59512c53d620705af526ea8994971b7481781 Mon Sep 17 00:00:00 2001 From: Herwin van Welbergen <hvanwelbergen@TechFak.Uni-Bielefeld.DE> Date: Fri, 14 Dec 2012 17:43:48 +0100 Subject: [PATCH] - Added Notifier - Minor cleanups --- ipaacalib/java/src/ipaaca/IUConverter.java | 8 +- ipaacalib/java/src/ipaaca/IntConverter.java | 1 + ipaacalib/java/src/ipaaca/LocalIU.java | 2 +- ipaacalib/java/src/ipaaca/OutputBuffer.java | 14 +-- ipaacalib/java/src/ipaaca/RemotePushIU.java | 4 +- .../src/ipaaca/util/ComponentNotifier.java | 94 ++++++++++++++++ ipaacalib/java/test/ivy.xml | 4 + .../ComponentNotifierIntegrationTest.java | 89 +++++++++++++++ .../src/ipaaca/ComponentNotifierTest.java | 102 ++++++++++++++++++ ...onentPushCommunicationIntegrationTest.java | 12 ++- .../java/test/src/ipaaca/InputBufferTest.java | 2 +- .../java/test/src/ipaaca/JavaPythonTest.java | 4 +- .../java/test/src/ipaaca/LocalIUTest.java | 10 +- 13 files changed, 324 insertions(+), 22 deletions(-) create mode 100644 ipaacalib/java/src/ipaaca/util/ComponentNotifier.java create mode 100644 ipaacalib/java/test/src/ipaaca/ComponentNotifierIntegrationTest.java create mode 100644 ipaacalib/java/test/src/ipaaca/ComponentNotifierTest.java diff --git a/ipaacalib/java/src/ipaaca/IUConverter.java b/ipaacalib/java/src/ipaaca/IUConverter.java index eebb9d7..d53cada 100644 --- a/ipaacalib/java/src/ipaaca/IUConverter.java +++ b/ipaacalib/java/src/ipaaca/IUConverter.java @@ -10,16 +10,16 @@ import java.util.Collection; import java.util.List; import java.util.Map.Entry; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.SetMultimap; -import com.google.protobuf.InvalidProtocolBufferException; - import rsb.converter.ConversionException; import rsb.converter.Converter; import rsb.converter.ConverterSignature; import rsb.converter.UserData; import rsb.converter.WireContents; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; +import com.google.protobuf.InvalidProtocolBufferException; + /** * Serializes AbstractIUs into protocolbuffer IUs and vice versa. * @author hvanwelbergen diff --git a/ipaacalib/java/src/ipaaca/IntConverter.java b/ipaacalib/java/src/ipaaca/IntConverter.java index 9ca0c60..bbab125 100644 --- a/ipaacalib/java/src/ipaaca/IntConverter.java +++ b/ipaacalib/java/src/ipaaca/IntConverter.java @@ -3,6 +3,7 @@ package ipaaca; import ipaaca.protobuf.Ipaaca.IntMessage; import java.nio.ByteBuffer; + import rsb.converter.ConversionException; import rsb.converter.Converter; import rsb.converter.ConverterSignature; diff --git a/ipaacalib/java/src/ipaaca/LocalIU.java b/ipaacalib/java/src/ipaaca/LocalIU.java index b73fdff..056a52b 100644 --- a/ipaacalib/java/src/ipaaca/LocalIU.java +++ b/ipaacalib/java/src/ipaaca/LocalIU.java @@ -5,12 +5,12 @@ import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; import ipaaca.protobuf.Ipaaca.LinkSet; import ipaaca.protobuf.Ipaaca.PayloadItem; -import java.util.UUID; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map.Entry; import java.util.Set; +import java.util.UUID; import com.google.common.collect.SetMultimap; diff --git a/ipaacalib/java/src/ipaaca/OutputBuffer.java b/ipaacalib/java/src/ipaaca/OutputBuffer.java index c976f7c..02cbd55 100644 --- a/ipaacalib/java/src/ipaaca/OutputBuffer.java +++ b/ipaacalib/java/src/ipaaca/OutputBuffer.java @@ -1,12 +1,5 @@ package ipaaca; -import rsb.Factory; -import rsb.Informer; -import rsb.InitializeException; -import rsb.RSBException; -import rsb.patterns.DataCallback; -import rsb.patterns.LocalServer; - import ipaaca.protobuf.Ipaaca; import ipaaca.protobuf.Ipaaca.IUCommission; import ipaaca.protobuf.Ipaaca.IULinkUpdate; @@ -20,6 +13,13 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rsb.Factory; +import rsb.Informer; +import rsb.InitializeException; +import rsb.RSBException; +import rsb.patterns.DataCallback; +import rsb.patterns.LocalServer; + import com.google.common.collect.HashMultimap; import com.google.common.collect.SetMultimap; diff --git a/ipaacalib/java/src/ipaaca/RemotePushIU.java b/ipaacalib/java/src/ipaaca/RemotePushIU.java index b61272a..c5d32bc 100644 --- a/ipaacalib/java/src/ipaaca/RemotePushIU.java +++ b/ipaacalib/java/src/ipaaca/RemotePushIU.java @@ -16,11 +16,11 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.SetMultimap; - import rsb.RSBException; import rsb.patterns.RemoteServer; +import com.google.common.collect.SetMultimap; + /** * A remote IU with access mode 'PUSH'. * @author hvanwelbergen diff --git a/ipaacalib/java/src/ipaaca/util/ComponentNotifier.java b/ipaacalib/java/src/ipaaca/util/ComponentNotifier.java new file mode 100644 index 0000000..3541c53 --- /dev/null +++ b/ipaacalib/java/src/ipaaca/util/ComponentNotifier.java @@ -0,0 +1,94 @@ +package ipaaca.util; + +import ipaaca.AbstractIU; +import ipaaca.HandlerFunctor; +import ipaaca.IUEventHandler; +import ipaaca.IUEventType; +import ipaaca.InputBuffer; +import ipaaca.LocalMessageIU; +import ipaaca.OutputBuffer; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSet; + +/** + * Utility class to handle component notification: a componentNotify is sent at initialization and whenever new components sent + * their componentNotify. + * @author hvanwelbergen + * + */ +public class ComponentNotifier +{ + public static final String NOTIFY_CATEGORY = "componentNotify"; + public static final String SEND_CATEGORIES = "send_categories"; + public static final String RECEIVE_CATEGORIES = "recv_categories"; + public static final String STATE = "state"; + public static final String NAME = "name"; + private final OutputBuffer outBuffer; + private final String componentName; + private final String componentFunction; + private final ImmutableSet<String> sendCategories; + private final ImmutableSet<String> receiveCategories; + private final InputBuffer inBuffer; + private List<HandlerFunctor> handlers = new ArrayList<HandlerFunctor>(); + + private class ComponentNotifyHandler implements HandlerFunctor + { + @Override + public void handle(AbstractIU iu, IUEventType type, boolean local) + { + if(iu.getPayload().get(NAME).equals(componentName))return; //don't re-notify self + for (HandlerFunctor h : handlers) + { + h.handle(iu, type, local); + } + if (iu.getPayload().get(STATE).equals("new")) + { + submitNotify(false); + } + } + } + + /** + * Register a handler that will be called whenever a new component notifies this ComponentNotifier + */ + public void addNotificationHandler(HandlerFunctor h) + { + handlers.add(h); + } + + private void submitNotify(boolean isNew) + { + LocalMessageIU notifyIU = new LocalMessageIU(); + notifyIU.setCategory(NOTIFY_CATEGORY); + notifyIU.getPayload().put(NAME, componentName); + notifyIU.getPayload().put("function", componentFunction); + notifyIU.getPayload().put(SEND_CATEGORIES, Joiner.on(",").join(sendCategories)); + notifyIU.getPayload().put(RECEIVE_CATEGORIES, Joiner.on(",").join(receiveCategories)); + notifyIU.getPayload().put(STATE, isNew ? "new" : "old"); + outBuffer.add(notifyIU); + } + + public ComponentNotifier(String componentName, String componentFunction, Set<String> sendCategories, Set<String> receiveCategories, + OutputBuffer outBuffer, InputBuffer inBuffer) + { + this.componentName = componentName; + this.componentFunction = componentFunction; + this.sendCategories = ImmutableSet.copyOf(sendCategories); + this.receiveCategories = ImmutableSet.copyOf(receiveCategories); + this.outBuffer = outBuffer; + this.inBuffer = inBuffer; + } + + public void initialize() + { + inBuffer.registerHandler(new IUEventHandler(new ComponentNotifyHandler(), EnumSet.of(IUEventType.ADDED), ImmutableSet + .of(NOTIFY_CATEGORY))); + submitNotify(true); + } +} diff --git a/ipaacalib/java/test/ivy.xml b/ipaacalib/java/test/ivy.xml index db91032..83f29f9 100644 --- a/ipaacalib/java/test/ivy.xml +++ b/ipaacalib/java/test/ivy.xml @@ -4,5 +4,9 @@ <dependency org="junit" name="junit" rev="latest.release" /> <dependency org="hamcrest" name="hamcrest-all" rev="latest.release" /> <dependency org="mockito" name="mockito-all" rev="latest.release" /> + <dependency org="jboss" name="javassist" rev="latest.release" /> + <dependency org="powermock" name="powermock-mockito" rev="latest.release" /> + <dependency org="logback" name="logback-classic" rev="latest.release" /> + <dependency org="logback" name="logback-core" rev="latest.release" /> </dependencies> </ivy-module> diff --git a/ipaacalib/java/test/src/ipaaca/ComponentNotifierIntegrationTest.java b/ipaacalib/java/test/src/ipaaca/ComponentNotifierIntegrationTest.java new file mode 100644 index 0000000..6ee8218 --- /dev/null +++ b/ipaacalib/java/test/src/ipaaca/ComponentNotifierIntegrationTest.java @@ -0,0 +1,89 @@ +package ipaaca; + +import static org.junit.Assert.assertEquals; +import ipaaca.util.ComponentNotifier; + +import java.util.Set; + +import org.junit.After; +import org.junit.Test; + +import lombok.Getter; + +import com.google.common.collect.ImmutableSet; + +/** + * Integration test for the ComponentNotifier, connects two of them. Requires a running spread daemon. + * @author hvanwelbergen + * + */ +public class ComponentNotifierIntegrationTest +{ + private ComponentNotifier notifier1; + private ComponentNotifier notifier2; + private InputBuffer inBuffer; + private OutputBuffer outBuffer; + + static + { + Initializer.initializeIpaacaRsb(); + } + + private class MyHandlerFunctor implements HandlerFunctor + { + @Getter + private volatile int numCalled = 0; + + @Override + public void handle(AbstractIU iu, IUEventType type, boolean local) + { + numCalled++; + } + } + + @After + public void after() + { + inBuffer.close(); + outBuffer.close(); + } + + private ComponentNotifier setupCompNotifier(String id, Set<String> sendList, Set<String> recvList) + { + inBuffer = new InputBuffer(id+"in", ImmutableSet.of(ComponentNotifier.NOTIFY_CATEGORY)); + outBuffer = new OutputBuffer(id+"out"); + return new ComponentNotifier(id, "test", ImmutableSet.copyOf(sendList), ImmutableSet.copyOf(recvList), outBuffer, inBuffer); + } + + + @Test + public void testSelf() throws InterruptedException + { + notifier1 = setupCompNotifier("not1", ImmutableSet.of("a1","b1"), ImmutableSet.of("a3","b1")); + MyHandlerFunctor h1 = new MyHandlerFunctor(); + notifier1.addNotificationHandler(h1); + + notifier1.initialize(); + Thread.sleep(500); + assertEquals(0, h1.getNumCalled()); + } + + @Test + public void testTwo() throws InterruptedException + { + notifier1 = setupCompNotifier("not1", ImmutableSet.of("a1","b1"), ImmutableSet.of("a3","b2")); + notifier2 = setupCompNotifier("not2", ImmutableSet.of("a2","b2"), ImmutableSet.of("a3","b1")); + MyHandlerFunctor h1 = new MyHandlerFunctor(); + MyHandlerFunctor h2 = new MyHandlerFunctor(); + notifier1.addNotificationHandler(h1); + notifier2.addNotificationHandler(h2); + + notifier1.initialize(); + Thread.sleep(500); + notifier2.initialize(); + Thread.sleep(500); + + assertEquals(1, h1.getNumCalled()); + assertEquals(1, h2.getNumCalled()); + } +} diff --git a/ipaacalib/java/test/src/ipaaca/ComponentNotifierTest.java b/ipaacalib/java/test/src/ipaaca/ComponentNotifierTest.java new file mode 100644 index 0000000..d5f7434 --- /dev/null +++ b/ipaacalib/java/test/src/ipaaca/ComponentNotifierTest.java @@ -0,0 +1,102 @@ +package ipaaca; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import ipaaca.util.ComponentNotifier; + +import java.util.Set; + +import org.hamcrest.collection.IsIterableContainingInAnyOrder; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSet; + +/** + * Unit tests for the ComponentNotifier + * @author hvanwelbergen + */ +public class ComponentNotifierTest +{ + private static final ImmutableSet<String> RECV_CAT = ImmutableSet.of("testrec1", "testrc2"); + private static final ImmutableSet<String> SEND_CAT = ImmutableSet.of("testsnd1", "testsnd2", "testsnd3"); + private OutputBuffer mockOutBuffer = mock(OutputBuffer.class); + private InputBuffer mockInBuffer = mock(InputBuffer.class); + private IUEventHandler inputHandler; + + private ComponentNotifier notifier = new ComponentNotifier("testcomp", "testfunc", SEND_CAT, RECV_CAT, mockOutBuffer, mockInBuffer); + + @Before + public void setup() + { + doAnswer(new Answer<Void>() + { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable + { + IUEventHandler handler = (IUEventHandler) (invocation.getArguments()[0]); + inputHandler = handler; + return null; + } + }).when(mockInBuffer).registerHandler(any(IUEventHandler.class)); + notifier.initialize(); + } + + @Test + public void testNotifyAtInit() + { + ArgumentCaptor<LocalIU> argument = ArgumentCaptor.forClass(LocalIU.class); + verify(mockOutBuffer).add(argument.capture()); + LocalIU iu = argument.getValue(); + assertEquals(ComponentNotifier.NOTIFY_CATEGORY, iu.getCategory()); + assertEquals("new", iu.getPayload().get(ComponentNotifier.STATE)); + assertThat(ImmutableSet.copyOf(iu.getPayload().get(ComponentNotifier.RECEIVE_CATEGORIES).split(",")), + IsIterableContainingInAnyOrder.containsInAnyOrder(RECV_CAT.toArray(new String[0]))); + assertThat(ImmutableSet.copyOf(iu.getPayload().get(ComponentNotifier.SEND_CATEGORIES).split(",")), + IsIterableContainingInAnyOrder.containsInAnyOrder(SEND_CAT.toArray(new String[0]))); + } + + private void sendNotify(String state, Set<String> receiveCats) + { + + AbstractIU mockIUNotify = mock(AbstractIU.class); + Payload mockNotifyPayload = mock(Payload.class); + when(mockIUNotify.getCategory()).thenReturn(ComponentNotifier.NOTIFY_CATEGORY); + when(mockIUNotify.getPayload()).thenReturn(mockNotifyPayload); + when(mockInBuffer.getIU("iuNotify")).thenReturn(mockIUNotify); + when(mockNotifyPayload.get(ComponentNotifier.STATE)).thenReturn(state); + when(mockNotifyPayload.get(ComponentNotifier.NAME)).thenReturn("namex"); + when(mockNotifyPayload.get(ComponentNotifier.SEND_CATEGORIES)).thenReturn(""); + when(mockNotifyPayload.get(ComponentNotifier.RECEIVE_CATEGORIES)).thenReturn(Joiner.on(",").join(receiveCats)); + inputHandler.call(mockInBuffer, "iuNotify", false, IUEventType.ADDED, ComponentNotifier.NOTIFY_CATEGORY); + } + + @Test + public void testNotifyAtNotifyNew() throws Exception + { + sendNotify("new", ImmutableSet.of("testsnd1")); + ArgumentCaptor<LocalIU> argument = ArgumentCaptor.forClass(LocalIU.class); + verify(mockOutBuffer, times(2)).add(argument.capture()); + LocalIU iu = argument.getAllValues().get(1); + assertEquals("componentNotify", iu.getCategory()); + assertEquals("old", iu.getPayload().get("state")); + } + + @Test + public void testNoNotifyAtNotifyOld() throws Exception + { + sendNotify("old", ImmutableSet.of("testsnd1")); + ArgumentCaptor<LocalIU> argument = ArgumentCaptor.forClass(LocalIU.class); + verify(mockOutBuffer,times(1)).add(argument.capture()); + } +} diff --git a/ipaacalib/java/test/src/ipaaca/ComponentPushCommunicationIntegrationTest.java b/ipaacalib/java/test/src/ipaaca/ComponentPushCommunicationIntegrationTest.java index 9097027..32b2bf4 100644 --- a/ipaacalib/java/test/src/ipaaca/ComponentPushCommunicationIntegrationTest.java +++ b/ipaacalib/java/test/src/ipaaca/ComponentPushCommunicationIntegrationTest.java @@ -1,11 +1,17 @@ package ipaaca; -import static org.junit.Assert.*; +import static ipaaca.IUTestUtil.assertEqualIU; +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import java.util.EnumSet; import java.util.Set; -import static org.hamcrest.collection.IsIterableContainingInAnyOrder.*; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -14,8 +20,6 @@ import org.junit.Test; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import static ipaaca.IUTestUtil.*; - /** * Integration test cases for IPAACA. * Requires a running spread daemon. diff --git a/ipaacalib/java/test/src/ipaaca/InputBufferTest.java b/ipaacalib/java/test/src/ipaaca/InputBufferTest.java index aa63e98..d0ff20a 100644 --- a/ipaacalib/java/test/src/ipaaca/InputBufferTest.java +++ b/ipaacalib/java/test/src/ipaaca/InputBufferTest.java @@ -1,6 +1,6 @@ package ipaaca; -import static org.junit.Assert.*; +import static org.junit.Assert.assertNotNull; import java.util.Set; diff --git a/ipaacalib/java/test/src/ipaaca/JavaPythonTest.java b/ipaacalib/java/test/src/ipaaca/JavaPythonTest.java index 201f900..95bb779 100644 --- a/ipaacalib/java/test/src/ipaaca/JavaPythonTest.java +++ b/ipaacalib/java/test/src/ipaaca/JavaPythonTest.java @@ -1,7 +1,9 @@ package ipaaca; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import java.io.BufferedInputStream; import java.io.BufferedReader; diff --git a/ipaacalib/java/test/src/ipaaca/LocalIUTest.java b/ipaacalib/java/test/src/ipaaca/LocalIUTest.java index 797859b..4ec53b5 100644 --- a/ipaacalib/java/test/src/ipaaca/LocalIUTest.java +++ b/ipaacalib/java/test/src/ipaaca/LocalIUTest.java @@ -1,11 +1,17 @@ package ipaaca; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; import org.junit.Before; import org.junit.Test; -import static org.mockito.Mockito.*; /** * Unit testcases for the LocalIU * @author hvanwelbergen -- GitLab