Skip to content
Snippets Groups Projects
Commit 40aa2897 authored by Herwin van Welbergen's avatar Herwin van Welbergen
Browse files

made rsb 0.9.4 compliant

parent 2ee1ef77
No related branches found
No related tags found
No related merge requests found
......@@ -10,6 +10,8 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -18,6 +20,7 @@ import rsb.Factory;
import rsb.Handler;
import rsb.InitializeException;
import rsb.Listener;
import rsb.RSBException;
import rsb.Scope;
import rsb.patterns.RemoteServer;
......@@ -25,6 +28,7 @@ import rsb.patterns.RemoteServer;
* An InputBuffer that holds remote IUs.
* @author hvanwelbergen
*/
@Slf4j
public class InputBuffer extends Buffer
{
private Map<String, RemoteServer> remoteServerStore = new HashMap<String, RemoteServer>();
......@@ -38,11 +42,33 @@ public class InputBuffer extends Buffer
{
for (Listener listener : listenerStore.values())
{
listener.deactivate();
try
{
listener.deactivate();
}
catch (RSBException e)
{
log.warn("RSB Exception on deactive {}", e, listener.toString());
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
for (RemoteServer remServer : remoteServerStore.values())
{
remServer.deactivate();
try
{
remServer.deactivate();
}
catch (RSBException e)
{
log.warn("RSB Exception on RemoteServer deactivate {}", e, remServer.toString());
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
}
......@@ -96,6 +122,10 @@ public class InputBuffer extends Buffer
{
throw new RuntimeException(e);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
remoteServerStore.put(iu.getOwnerName(), remoteServer);
return remoteServer;
}
......@@ -115,10 +145,25 @@ public class InputBuffer extends Buffer
{
return listenerStore.get(category);
}
Listener listener = Factory.getInstance().createListener(new Scope("/ipaaca/category/" + category));
Listener listener;
try
{
listener = Factory.getInstance().createListener(new Scope("/ipaaca/category/" + category));
}
catch (InitializeException e1)
{
throw new RuntimeException(e1);
}
listenerStore.put(category, listener);
categoryInterests.add(category);
listener.addHandler(new InputHandler(), true);
try
{
listener.addHandler(new InputHandler(), true);
}
catch (InterruptedException e1)
{
Thread.currentThread().interrupt();
}
logger.info("Added category listener for {}", category);
try
{
......@@ -128,6 +173,10 @@ public class InputBuffer extends Buffer
{
throw new RuntimeException(e);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
return listener;
}
......@@ -185,11 +234,11 @@ public class InputBuffer extends Buffer
*/
private void handleIUEvents(Event event)
{
if(event.getData() instanceof RemoteMessageIU)
if (event.getData() instanceof RemoteMessageIU)
{
RemoteMessageIU rm = (RemoteMessageIU) event.getData();
messageStore.put(rm.getUid(), rm);
callIuEventHandlers(rm.getUid(),false, IUEventType.ADDED, rm.getCategory());
callIuEventHandlers(rm.getUid(), false, IUEventType.ADDED, rm.getCategory());
messageStore.remove(rm.getUid());
}
else if (event.getData() instanceof RemotePushIU)
......@@ -277,7 +326,7 @@ public class InputBuffer extends Buffer
@Override
public AbstractIU getIU(String iuid)
{
if(iuStore.get(iuid)!=null)
if (iuStore.get(iuid) != null)
{
return iuStore.get(iuid);
}
......
......@@ -10,6 +10,8 @@ import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -27,6 +29,7 @@ import com.google.common.collect.SetMultimap;
* An OutputBuffer that holds local IUs.
* @author hvanwelbergen
*/
@Slf4j
public class OutputBuffer extends Buffer
{
......@@ -72,10 +75,14 @@ public class OutputBuffer extends Buffer
{
throw new RuntimeException(e);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
}
private final class RemoteUpdatePayload implements DataCallback<Integer, IUPayloadUpdate>
private final class RemoteUpdatePayload extends DataCallback<Integer, IUPayloadUpdate>
{
@Override
public Integer invoke(IUPayloadUpdate data) throws Throwable
......@@ -86,7 +93,7 @@ public class OutputBuffer extends Buffer
}
private final class RemoteUpdateLinks implements DataCallback<Integer, IULinkUpdate>
private final class RemoteUpdateLinks extends DataCallback<Integer, IULinkUpdate>
{
@Override
public Integer invoke(IULinkUpdate data) throws Throwable
......@@ -97,7 +104,7 @@ public class OutputBuffer extends Buffer
}
private final class RemoteCommit implements DataCallback<Integer, IUCommission>
private final class RemoteCommit extends DataCallback<Integer, IUCommission>
{
@Override
public Integer invoke(IUCommission data) throws Throwable
......@@ -287,12 +294,19 @@ public class OutputBuffer extends Buffer
{
return informerStore.get(category);
}
Informer<Object> informer = Factory.getInstance().createInformer("/ipaaca/category/" + category);
Informer<Object> informer;
try
{
informer = Factory.getInstance().createInformer("/ipaaca/category/" + category);
}
catch (InitializeException e1)
{
throw new RuntimeException(e1);
}
informerStore.put(category, informer);
logger.info("Added informer on " + category);
// XXX new in java version, apperently informers need activation and deactivation
try
{
informer.activate();
......@@ -446,10 +460,32 @@ public class OutputBuffer extends Buffer
public void close()
{
server.deactivate();
try
{
server.deactivate();
}
catch (RSBException e)
{
log.warn("RSBException on deactivating server in close", e);
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
for (Informer<?> informer : informerStore.values())
{
informer.deactivate();
try
{
informer.deactivate();
}
catch (RSBException e)
{
log.warn("RSBException on deactivating informer {} in close", e, informer.toString());
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
}
}
......@@ -14,6 +14,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -88,6 +90,14 @@ public class RemotePushIU extends AbstractIU
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......@@ -106,17 +116,17 @@ public class RemotePushIU extends AbstractIU
{
throw new IUReadOnlyException(this);
}
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true)
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true)
.setWriterName(getBuffer().getUniqueName());
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("") // TODO: fix this, default in .proto?
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("") // TODO: fix this, default in .proto?
.build();
builder.addNewItems(newItem);
}
IUPayloadUpdate update = builder.build();
builder.addNewItems(newItem);
}
IUPayloadUpdate update = builder.build();
RemoteServer server = getInputBuffer().getRemoteServer(this);
logger.debug("Remote server has methods {}", server.getMethods());
int newRevision;
......@@ -128,12 +138,21 @@ public class RemotePushIU extends AbstractIU
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
}
System.err.print("************************ "); System.err.println(newRevision);
setRevision(newRevision);
System.err.print("************************ ");
System.err.println(newRevision);
setRevision(newRevision);
}
// def commit(self):
......@@ -180,6 +199,14 @@ public class RemotePushIU extends AbstractIU
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUCommittedException(this);
......@@ -276,6 +303,14 @@ public class RemotePushIU extends AbstractIU
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......@@ -384,6 +419,14 @@ public class RemotePushIU extends AbstractIU
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......@@ -444,6 +487,14 @@ public class RemotePushIU extends AbstractIU
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment