From 40aa2897b86ccabdfc2665398ac89f9e45390c92 Mon Sep 17 00:00:00 2001 From: Herwin van Welbergen <hvanwelbergen@TechFak.Uni-Bielefeld.DE> Date: Thu, 21 Nov 2013 11:04:09 +0100 Subject: [PATCH] made rsb 0.9.4 compliant --- ipaacalib/java/src/ipaaca/InputBuffer.java | 63 ++++++++++++++++-- ipaacalib/java/src/ipaaca/OutputBuffer.java | 50 ++++++++++++-- ipaacalib/java/src/ipaaca/RemotePushIU.java | 73 +++++++++++++++++---- 3 files changed, 161 insertions(+), 25 deletions(-) diff --git a/ipaacalib/java/src/ipaaca/InputBuffer.java b/ipaacalib/java/src/ipaaca/InputBuffer.java index 77db1c3..5647b90 100644 --- a/ipaacalib/java/src/ipaaca/InputBuffer.java +++ b/ipaacalib/java/src/ipaaca/InputBuffer.java @@ -10,6 +10,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import lombok.extern.slf4j.Slf4j; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +20,7 @@ import rsb.Factory; import rsb.Handler; import rsb.InitializeException; import rsb.Listener; +import rsb.RSBException; import rsb.Scope; import rsb.patterns.RemoteServer; @@ -25,6 +28,7 @@ import rsb.patterns.RemoteServer; * An InputBuffer that holds remote IUs. * @author hvanwelbergen */ +@Slf4j public class InputBuffer extends Buffer { private Map<String, RemoteServer> remoteServerStore = new HashMap<String, RemoteServer>(); @@ -38,11 +42,33 @@ public class InputBuffer extends Buffer { for (Listener listener : listenerStore.values()) { - listener.deactivate(); + try + { + listener.deactivate(); + } + catch (RSBException e) + { + log.warn("RSB Exception on deactive {}", e, listener.toString()); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } } for (RemoteServer remServer : remoteServerStore.values()) { - remServer.deactivate(); + try + { + remServer.deactivate(); + } + catch (RSBException e) + { + log.warn("RSB Exception on RemoteServer deactivate {}", e, remServer.toString()); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } } } @@ -96,6 +122,10 @@ public class InputBuffer extends Buffer { throw new RuntimeException(e); } + catch (RSBException e) + { + throw new RuntimeException(e); + } remoteServerStore.put(iu.getOwnerName(), remoteServer); return remoteServer; } @@ -115,10 +145,25 @@ public class InputBuffer extends Buffer { return listenerStore.get(category); } - Listener listener = Factory.getInstance().createListener(new Scope("/ipaaca/category/" + category)); + Listener listener; + try + { + listener = Factory.getInstance().createListener(new Scope("/ipaaca/category/" + category)); + } + catch (InitializeException e1) + { + throw new RuntimeException(e1); + } listenerStore.put(category, listener); categoryInterests.add(category); - listener.addHandler(new InputHandler(), true); + try + { + listener.addHandler(new InputHandler(), true); + } + catch (InterruptedException e1) + { + Thread.currentThread().interrupt(); + } logger.info("Added category listener for {}", category); try { @@ -128,6 +173,10 @@ public class InputBuffer extends Buffer { throw new RuntimeException(e); } + catch (RSBException e) + { + throw new RuntimeException(e); + } return listener; } @@ -185,11 +234,11 @@ public class InputBuffer extends Buffer */ private void handleIUEvents(Event event) { - if(event.getData() instanceof RemoteMessageIU) + if (event.getData() instanceof RemoteMessageIU) { RemoteMessageIU rm = (RemoteMessageIU) event.getData(); messageStore.put(rm.getUid(), rm); - callIuEventHandlers(rm.getUid(),false, IUEventType.ADDED, rm.getCategory()); + callIuEventHandlers(rm.getUid(), false, IUEventType.ADDED, rm.getCategory()); messageStore.remove(rm.getUid()); } else if (event.getData() instanceof RemotePushIU) @@ -277,7 +326,7 @@ public class InputBuffer extends Buffer @Override public AbstractIU getIU(String iuid) { - if(iuStore.get(iuid)!=null) + if (iuStore.get(iuid) != null) { return iuStore.get(iuid); } diff --git a/ipaacalib/java/src/ipaaca/OutputBuffer.java b/ipaacalib/java/src/ipaaca/OutputBuffer.java index 5b77e72..3bf6577 100644 --- a/ipaacalib/java/src/ipaaca/OutputBuffer.java +++ b/ipaacalib/java/src/ipaaca/OutputBuffer.java @@ -10,6 +10,8 @@ import ipaaca.protobuf.Ipaaca.PayloadItem; import java.util.HashMap; import java.util.Map; +import lombok.extern.slf4j.Slf4j; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +29,7 @@ import com.google.common.collect.SetMultimap; * An OutputBuffer that holds local IUs. * @author hvanwelbergen */ +@Slf4j public class OutputBuffer extends Buffer { @@ -72,10 +75,14 @@ public class OutputBuffer extends Buffer { throw new RuntimeException(e); } + catch (RSBException e) + { + throw new RuntimeException(e); + } } - private final class RemoteUpdatePayload implements DataCallback<Integer, IUPayloadUpdate> + private final class RemoteUpdatePayload extends DataCallback<Integer, IUPayloadUpdate> { @Override public Integer invoke(IUPayloadUpdate data) throws Throwable @@ -86,7 +93,7 @@ public class OutputBuffer extends Buffer } - private final class RemoteUpdateLinks implements DataCallback<Integer, IULinkUpdate> + private final class RemoteUpdateLinks extends DataCallback<Integer, IULinkUpdate> { @Override public Integer invoke(IULinkUpdate data) throws Throwable @@ -97,7 +104,7 @@ public class OutputBuffer extends Buffer } - private final class RemoteCommit implements DataCallback<Integer, IUCommission> + private final class RemoteCommit extends DataCallback<Integer, IUCommission> { @Override public Integer invoke(IUCommission data) throws Throwable @@ -287,12 +294,19 @@ public class OutputBuffer extends Buffer { return informerStore.get(category); } - Informer<Object> informer = Factory.getInstance().createInformer("/ipaaca/category/" + category); + Informer<Object> informer; + try + { + informer = Factory.getInstance().createInformer("/ipaaca/category/" + category); + } + catch (InitializeException e1) + { + throw new RuntimeException(e1); + } informerStore.put(category, informer); logger.info("Added informer on " + category); - // XXX new in java version, apperently informers need activation and deactivation try { informer.activate(); @@ -446,10 +460,32 @@ public class OutputBuffer extends Buffer public void close() { - server.deactivate(); + try + { + server.deactivate(); + } + catch (RSBException e) + { + log.warn("RSBException on deactivating server in close", e); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } for (Informer<?> informer : informerStore.values()) { - informer.deactivate(); + try + { + informer.deactivate(); + } + catch (RSBException e) + { + log.warn("RSBException on deactivating informer {} in close", e, informer.toString()); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } } } } diff --git a/ipaacalib/java/src/ipaaca/RemotePushIU.java b/ipaacalib/java/src/ipaaca/RemotePushIU.java index 2c82b5b..190e0a9 100644 --- a/ipaacalib/java/src/ipaaca/RemotePushIU.java +++ b/ipaacalib/java/src/ipaaca/RemotePushIU.java @@ -14,6 +14,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +90,14 @@ public class RemotePushIU extends AbstractIU { throw new RuntimeException(e); } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + catch (TimeoutException e) + { + throw new RuntimeException(e); + } if (newRevision == 0) { throw new IUUpdateFailedException(this); @@ -106,17 +116,17 @@ public class RemotePushIU extends AbstractIU { throw new IUReadOnlyException(this); } - Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true) + Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true) .setWriterName(getBuffer().getUniqueName()); - for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet()) - { - PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("") // TODO: fix this, default in .proto? + for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet()) + { + PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("") // TODO: fix this, default in .proto? .build(); - builder.addNewItems(newItem); - - } - IUPayloadUpdate update = builder.build(); - + builder.addNewItems(newItem); + + } + IUPayloadUpdate update = builder.build(); + RemoteServer server = getInputBuffer().getRemoteServer(this); logger.debug("Remote server has methods {}", server.getMethods()); int newRevision; @@ -128,12 +138,21 @@ public class RemotePushIU extends AbstractIU { throw new RuntimeException(e); } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + catch (TimeoutException e) + { + throw new RuntimeException(e); + } if (newRevision == 0) { throw new IUUpdateFailedException(this); } - System.err.print("************************ "); System.err.println(newRevision); - setRevision(newRevision); + System.err.print("************************ "); + System.err.println(newRevision); + setRevision(newRevision); } // def commit(self): @@ -180,6 +199,14 @@ public class RemotePushIU extends AbstractIU { throw new RuntimeException(e); } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + catch (TimeoutException e) + { + throw new RuntimeException(e); + } if (newRevision == 0) { throw new IUCommittedException(this); @@ -276,6 +303,14 @@ public class RemotePushIU extends AbstractIU { throw new RuntimeException(e); } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + catch (TimeoutException e) + { + throw new RuntimeException(e); + } if (newRevision == 0) { throw new IUUpdateFailedException(this); @@ -384,6 +419,14 @@ public class RemotePushIU extends AbstractIU { throw new RuntimeException(e); } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + catch (TimeoutException e) + { + throw new RuntimeException(e); + } if (newRevision == 0) { throw new IUUpdateFailedException(this); @@ -444,6 +487,14 @@ public class RemotePushIU extends AbstractIU { throw new RuntimeException(e); } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + catch (TimeoutException e) + { + throw new RuntimeException(e); + } if (newRevision == 0) { throw new IUUpdateFailedException(this); -- GitLab