From 8b7721bd7f02458dd5ca5fbbdb5b15e69dc24ed5 Mon Sep 17 00:00:00 2001
From: Herwin van Welbergen <hvanwelbergen@TechFak.Uni-Bielefeld.DE>
Date: Thu, 20 Dec 2012 10:28:03 +0100
Subject: [PATCH] - added waitforreceivers functionality to notifier - made
 sure the initializer is only initialized once - toString methods for payload
 and localIU

---
 ipaacalib/java/src/ipaaca/Initializer.java    |  8 ++-
 ipaacalib/java/src/ipaaca/LocalIU.java        |  6 ++
 ipaacalib/java/src/ipaaca/Payload.java        |  6 ++
 .../src/ipaaca/util/ComponentNotifier.java    | 49 +++++++++++--
 .../ComponentNotifierIntegrationTest.java     |  8 ++-
 .../{ => util}/ComponentNotifierTest.java     | 69 +++++++++++++++++--
 6 files changed, 133 insertions(+), 13 deletions(-)
 rename ipaacalib/java/test/src/ipaaca/{ => util}/ComponentNotifierIntegrationTest.java (93%)
 rename ipaacalib/java/test/src/ipaaca/{ => util}/ComponentNotifierTest.java (69%)

diff --git a/ipaacalib/java/src/ipaaca/Initializer.java b/ipaacalib/java/src/ipaaca/Initializer.java
index 4aa0d45..ed12400 100644
--- a/ipaacalib/java/src/ipaaca/Initializer.java
+++ b/ipaacalib/java/src/ipaaca/Initializer.java
@@ -15,9 +15,11 @@ public final class Initializer
     private Initializer()
     {
     }
-
-    public static void initializeIpaacaRsb()
+    private static volatile boolean initialized = false;
+    
+    public synchronized static void initializeIpaacaRsb()
     {
+        if(initialized)return;
         DefaultConverterRepository.getDefaultConverterRepository().addConverter(new IntConverter());
         DefaultConverterRepository.getDefaultConverterRepository().addConverter(
                 new ProtocolBufferConverter<IUCommission>(IUCommission.getDefaultInstance()));
@@ -33,6 +35,6 @@ public final class Initializer
          
         DefaultConverterRepository.getDefaultConverterRepository().addConverter(new PayloadConverter());
         DefaultConverterRepository.getDefaultConverterRepository().addConverter(new LinkUpdateConverter());
-
+        initialized = true;
     }
 }
diff --git a/ipaacalib/java/src/ipaaca/LocalIU.java b/ipaacalib/java/src/ipaaca/LocalIU.java
index 056a52b..3581d54 100644
--- a/ipaacalib/java/src/ipaaca/LocalIU.java
+++ b/ipaacalib/java/src/ipaaca/LocalIU.java
@@ -239,4 +239,10 @@ public class LocalIU extends AbstractIU
             getOutputBuffer().sendIUPayloadUpdate(this, update);
         }
     }
+    
+    @Override
+    public String toString()
+    {
+        return "LocalIU with category: "+this.getCategory() + "\nowner: "+getOwnerName()+"\npayload: "+this.getPayload();
+    }
 }
diff --git a/ipaacalib/java/src/ipaaca/Payload.java b/ipaacalib/java/src/ipaaca/Payload.java
index c2e3a93..eeaa8e7 100644
--- a/ipaacalib/java/src/ipaaca/Payload.java
+++ b/ipaacalib/java/src/ipaaca/Payload.java
@@ -235,4 +235,10 @@ public class Payload implements Map<String, String>
     {
         return map.values();
     }
+    
+    @Override
+    public String toString()
+    {
+        return map.toString();
+    }
 }
diff --git a/ipaacalib/java/src/ipaaca/util/ComponentNotifier.java b/ipaacalib/java/src/ipaaca/util/ComponentNotifier.java
index 3541c53..a414bce 100644
--- a/ipaacalib/java/src/ipaaca/util/ComponentNotifier.java
+++ b/ipaacalib/java/src/ipaaca/util/ComponentNotifier.java
@@ -10,10 +10,14 @@ import ipaaca.OutputBuffer;
 
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 /**
@@ -29,6 +33,7 @@ public class ComponentNotifier
     public static final String RECEIVE_CATEGORIES = "recv_categories";
     public static final String STATE = "state";
     public static final String NAME = "name";
+    public static final String FUNCTION = "function";
     private final OutputBuffer outBuffer;
     private final String componentName;
     private final String componentFunction;
@@ -36,13 +41,17 @@ public class ComponentNotifier
     private final ImmutableSet<String> receiveCategories;
     private final InputBuffer inBuffer;
     private List<HandlerFunctor> handlers = new ArrayList<HandlerFunctor>();
-
+    private volatile boolean isInitialized = false;
+    private final BlockingQueue<String> receiverQueue = new LinkedBlockingQueue<>();
+    
     private class ComponentNotifyHandler implements HandlerFunctor
     {
         @Override
         public void handle(AbstractIU iu, IUEventType type, boolean local)
         {
             if(iu.getPayload().get(NAME).equals(componentName))return; //don't re-notify self
+            String receivers[] = iu.getPayload().get(RECEIVE_CATEGORIES).split("\\s*,\\s*");
+            receiverQueue.addAll(ImmutableSet.copyOf(receivers));
             for (HandlerFunctor h : handlers)
             {
                 h.handle(iu, type, local);
@@ -61,6 +70,34 @@ public class ComponentNotifier
     {
         handlers.add(h);
     }
+    
+    /**
+     * Wait until the receivers are registered for categories
+     */
+    public void waitForReceivers(ImmutableSet<String> categories)
+    {
+        Set<String> unhandledCategories = new HashSet<>(categories);        
+        while(!unhandledCategories.isEmpty())
+        {
+            try
+            {
+                unhandledCategories.remove(receiverQueue.take());
+                
+            }
+            catch (InterruptedException e)
+            {
+                Thread.interrupted();
+            }            
+        }
+    }
+    
+    /**
+     * Wait until receivers are registered for all categories sent by the component
+     */
+    public void waitForReceivers()
+    {
+        waitForReceivers(sendCategories);
+    }
 
     private void submitNotify(boolean isNew)
     {
@@ -87,8 +124,12 @@ public class ComponentNotifier
 
     public void initialize()
     {
-        inBuffer.registerHandler(new IUEventHandler(new ComponentNotifyHandler(), EnumSet.of(IUEventType.ADDED), ImmutableSet
-                .of(NOTIFY_CATEGORY)));
-        submitNotify(true);
+        if(!isInitialized)
+        {
+            inBuffer.registerHandler(new IUEventHandler(new ComponentNotifyHandler(), EnumSet.of(IUEventType.ADDED), ImmutableSet
+                    .of(NOTIFY_CATEGORY)));
+            submitNotify(true);
+            isInitialized = true;
+        }
     }
 }
diff --git a/ipaacalib/java/test/src/ipaaca/ComponentNotifierIntegrationTest.java b/ipaacalib/java/test/src/ipaaca/util/ComponentNotifierIntegrationTest.java
similarity index 93%
rename from ipaacalib/java/test/src/ipaaca/ComponentNotifierIntegrationTest.java
rename to ipaacalib/java/test/src/ipaaca/util/ComponentNotifierIntegrationTest.java
index 6ee8218..334f2ec 100644
--- a/ipaacalib/java/test/src/ipaaca/ComponentNotifierIntegrationTest.java
+++ b/ipaacalib/java/test/src/ipaaca/util/ComponentNotifierIntegrationTest.java
@@ -1,6 +1,12 @@
-package ipaaca;
+package ipaaca.util;
 
 import static org.junit.Assert.assertEquals;
+import ipaaca.AbstractIU;
+import ipaaca.HandlerFunctor;
+import ipaaca.IUEventType;
+import ipaaca.Initializer;
+import ipaaca.InputBuffer;
+import ipaaca.OutputBuffer;
 import ipaaca.util.ComponentNotifier;
 
 import java.util.Set;
diff --git a/ipaacalib/java/test/src/ipaaca/ComponentNotifierTest.java b/ipaacalib/java/test/src/ipaaca/util/ComponentNotifierTest.java
similarity index 69%
rename from ipaacalib/java/test/src/ipaaca/ComponentNotifierTest.java
rename to ipaacalib/java/test/src/ipaaca/util/ComponentNotifierTest.java
index d5f7434..6d06acf 100644
--- a/ipaacalib/java/test/src/ipaaca/ComponentNotifierTest.java
+++ b/ipaacalib/java/test/src/ipaaca/util/ComponentNotifierTest.java
@@ -1,14 +1,22 @@
-package ipaaca;
+package ipaaca.util;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import ipaaca.util.ComponentNotifier;
+import ipaaca.AbstractIU;
+import ipaaca.IUEventHandler;
+import ipaaca.IUEventType;
+import ipaaca.InputBuffer;
+import ipaaca.LocalIU;
+import ipaaca.OutputBuffer;
+import ipaaca.Payload;
 
 import java.util.Set;
 
@@ -91,12 +99,63 @@ public class ComponentNotifierTest
         assertEquals("componentNotify", iu.getCategory());
         assertEquals("old", iu.getPayload().get("state"));
     }
-    
+
     @Test
     public void testNoNotifyAtNotifyOld() throws Exception
     {
-        sendNotify("old", ImmutableSet.of("testsnd1"));        
+        sendNotify("old", ImmutableSet.of("testsnd1"));
         ArgumentCaptor<LocalIU> argument = ArgumentCaptor.forClass(LocalIU.class);
-        verify(mockOutBuffer,times(1)).add(argument.capture());        
+        verify(mockOutBuffer, times(1)).add(argument.capture());
+    }
+
+    private class WaitForFinish extends Thread
+    {
+        public volatile boolean waitFinish = false;
+
+        public void run()
+        {
+            notifier.waitForReceivers();
+            waitFinish = true;
+        }
+
+    }
+
+    @Test
+    public void testWait() throws InterruptedException
+    {
+        WaitForFinish wff = new WaitForFinish();
+        wff.start();
+        Thread.sleep(200);
+        assertFalse(wff.waitFinish);        
+        sendNotify("new",SEND_CAT);
+        Thread.sleep(200);
+        assertTrue(wff.waitFinish);
+    }
+    
+    @Test
+    public void testWaitWrongCats() throws InterruptedException
+    {
+        WaitForFinish wff = new WaitForFinish();
+        wff.start();
+        Thread.sleep(200);
+        assertFalse(wff.waitFinish);        
+        sendNotify("new",RECV_CAT);
+        Thread.sleep(200);
+        assertFalse(wff.waitFinish);
+    }
+    
+    @Test
+    public void testWaitIncremental()throws InterruptedException
+    {
+        WaitForFinish wff = new WaitForFinish();
+        wff.start();
+        Thread.sleep(200);
+        assertFalse(wff.waitFinish);        
+        sendNotify("new",ImmutableSet.of("testsnd1", "testsnd2"));
+        Thread.sleep(200);
+        assertFalse(wff.waitFinish);
+        sendNotify("new",ImmutableSet.of("testsnd3"));
+        Thread.sleep(200);
+        assertTrue(wff.waitFinish);        
     }
 }
-- 
GitLab