Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Show changes
Showing
with 659 additions and 85 deletions
......@@ -6,7 +6,7 @@
<dependencies>
<dependency org="slf4j" name="slf4j-api" rev="latest.release" />
<dependency org="google" name="guava" rev="latest.release" />
<dependency org="google" name="protobuf-java" rev="latest.release" />
<dependency org="google" name="protobuf-java" rev="2.6.1" />
<dependency org="rsb" name="rsb" rev="latest.release" />
<dependency org="lombok" name="lombok" rev="latest.release" />
<dependency org="apache" name="commons-lang" rev="latest.release" />
......
[transport.spread]
host = localhost # default type is string
port = 4803 # types can be specified in angle brackets
enabled = true
......@@ -32,6 +32,7 @@
package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.ArrayList;
......@@ -59,9 +60,11 @@ public abstract class AbstractIU
protected boolean committed = false;
protected boolean retracted = false;
private String uid;
protected int revision;
protected long revision;
private boolean readOnly = false;
public abstract IU.AccessMode getAccessMode();
protected SetMultimap<String, String> links = HashMultimap.create();
private final SetMultimap<String, String> EMPTYLINKS = HashMultimap.create();
......@@ -147,7 +150,7 @@ public abstract class AbstractIU
this.readOnly = readOnly;
}
public void setRevision(int revision)
public void setRevision(long revision)
{
this.revision = revision;
}
......@@ -187,7 +190,7 @@ public abstract class AbstractIU
return buffer;
}
public int getRevision()
public long getRevision()
{
return revision;
}
......
......@@ -110,6 +110,11 @@ public abstract class Buffer
{
eventHandlers.add(handler);
}
public void removeHandler(IUEventHandler handler)
{
eventHandlers.remove(handler);
}
public void registerHandler(HandlerFunctor func) {
IUEventHandler handler;
......
......@@ -88,12 +88,8 @@ public class IUConverter implements Converter<ByteBuffer>
links.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
}
IU.AccessMode accessMode = IU.AccessMode.PUSH;
if(iua instanceof RemoteMessageIU || iua instanceof LocalMessageIU)
{
accessMode = IU.AccessMode.MESSAGE;
}
IU iu = IU.newBuilder().setUid(iua.getUid()).setRevision(iua.getRevision()).setCategory(iua.getCategory())
IU.AccessMode accessMode = iua.getAccessMode();
IU iu = IU.newBuilder().setUid(iua.getUid()).setRevision((int) iua.getRevision()).setCategory(iua.getCategory())
.setOwnerName(iua.getOwnerName()).setCommitted(iua.isCommitted()).setAccessMode(accessMode)
.setReadOnly(iua.isReadOnly()).setPayloadType("STR").addAllPayload(payloadItems).addAllLinks(links).build();
String wireFormat = (accessMode == IU.AccessMode.MESSAGE) ? "ipaaca-messageiu" : "ipaaca-iu";
......
......@@ -483,10 +483,10 @@ public class InputBuffer extends Buffer
rServer = getRemoteServer(writerName);
if ((rServer != null)&&(uid != null)) {
IUResendRequest iurr = IUResendRequest.newBuilder().setUid(uid).setHiddenScopeName(hiddenScopeName).build();
int rRevision = 0;
long rRevision = 0;
try
{
rRevision = (Integer) rServer.call("resendRequest", iurr);
rRevision = (Long) rServer.call("resendRequest", iurr);
}
catch (RSBException e)
{
......@@ -500,6 +500,10 @@ public class InputBuffer extends Buffer
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (rRevision == 0)
{
//throw new IUResendFailedException(aiu); // TODO
......
......@@ -32,6 +32,7 @@
package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate.Builder;
......@@ -50,6 +51,11 @@ import com.google.common.collect.SetMultimap;
public class LocalIU extends AbstractIU
{
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.PUSH;
}
private OutputBuffer outputBuffer;
......@@ -205,7 +211,7 @@ public class LocalIU extends AbstractIU
removeSet.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
}
outputBuffer.sendIULinkUpdate(this,
IULinkUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setWriterName(wName).setIsDelta(isDelta)
IULinkUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setWriterName(wName).setIsDelta(isDelta)
.addAllNewLinks(addSet).addAllLinksToRemove(removeSet).build());
}
}
......@@ -260,7 +266,7 @@ public class LocalIU extends AbstractIU
// send update to remote holders
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR")
.build();
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true)
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer).addNewItems(newItem).build();
getOutputBuffer().sendIUPayloadUpdate(this, update);
}
......@@ -284,7 +290,7 @@ public class LocalIU extends AbstractIU
increaseRevisionNumber();
if (isPublished())
{
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true)
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer);
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
......@@ -342,7 +348,7 @@ public class LocalIU extends AbstractIU
if (isPublished())
{
// send update to remote holders
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true)
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer).addKeysToRemove((String) key).build();
getOutputBuffer().sendIUPayloadUpdate(this, update);
}
......@@ -355,7 +361,7 @@ public class LocalIU extends AbstractIU
{
if (isPublished())
{
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(false)
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(false)
.setWriterName(writerName == null ? getOwnerName() : writerName).addAllNewItems(newPayload).build();
getOutputBuffer().sendIUPayloadUpdate(this, update);
}
......
......@@ -32,6 +32,7 @@
package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
/**
* Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls.
......@@ -50,4 +51,9 @@ public class LocalMessageIU extends LocalIU
super(category);
}
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.MESSAGE;
}
}
......@@ -55,7 +55,9 @@ import rsb.Informer;
import rsb.InitializeException;
import rsb.RSBException;
import rsb.patterns.DataCallback;
import rsb.patterns.EventCallback;
import rsb.patterns.LocalServer;
import rsb.Event;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
......@@ -131,48 +133,58 @@ public class OutputBuffer extends Buffer
this.channel = ipaaca_channel;
}
private final class RemoteUpdatePayload extends DataCallback<Integer, IUPayloadUpdate>
private final class RemoteUpdatePayload extends EventCallback //DataCallback<Long, IUPayloadUpdate>
{
@Override
public Integer invoke(IUPayloadUpdate data) throws Throwable
public Event invoke(final Event request) //throws Throwable
{
logger.debug("remoteUpdate");
return remoteUpdatePayload(data);
long result = remoteUpdatePayload((IUPayloadUpdate) request.getData());
//System.out.println("remoteUpdatePayload yielded revision "+result);
return new Event(Long.class, new Long(result));
}
}
/*private final class RemoteUpdatePayload extends DataCallback<Long, IUPayloadUpdate>
{
@Override
public Long invoke(IUPayloadUpdate data) throws Throwable
{
logger.debug("remoteUpdate");
return remoteUpdatePayload(data);
}
private final class RemoteUpdateLinks extends DataCallback<Integer, IULinkUpdate>
}*/
private final class RemoteUpdateLinks extends EventCallback // DataCallback<Long, IULinkUpdate>
{
@Override
public Integer invoke(IULinkUpdate data) throws Throwable
public Event invoke(final Event request) //throws Throwable
{
logger.debug("remoteUpdateLinks");
return remoteUpdateLinks(data);
return new Event(Long.class, new Long(remoteUpdateLinks((IULinkUpdate) request.getData())));
}
}
private final class RemoteCommit extends DataCallback<Integer, IUCommission>
private final class RemoteCommit extends EventCallback //DataCallback<Long, IUCommission>
{
@Override
public Integer invoke(IUCommission data) throws Throwable
public Event invoke(final Event request) //throws Throwable
{
logger.debug("remoteCommit");
return remoteCommit(data);
return new Event(Long.class, new Long(remoteCommit((IUCommission) request.getData())));
}
}
private final class RemoteResendRequest extends DataCallback<Integer, IUResendRequest>
private final class RemoteResendRequest extends EventCallback //DataCallback<Long, IUResendRequest>
{
@Override
public Integer invoke(IUResendRequest data) throws Throwable
public Event invoke(final Event request) //throws Throwable
{
logger.debug("remoteResendRequest");
return remoteResendRequest(data);
return new Event(Long.class, new Long(remoteResendRequest((IUResendRequest) request.getData())));
}
}
// def _remote_update_payload(self, update):
......@@ -199,7 +211,7 @@ public class OutputBuffer extends Buffer
* Apply a remotely requested update to one of the stored IUs.
* @return 0 if not updated, IU version number otherwise
*/
int remoteUpdatePayload(IUPayloadUpdate update)
long remoteUpdatePayload(IUPayloadUpdate update)
{
if (!iuStore.containsKey(update.getUid()))
{
......@@ -244,7 +256,7 @@ public class OutputBuffer extends Buffer
* Apply a remotely requested update to one of the stored IUs.
* @return 0 if not updated, IU version number otherwise
*/
int remoteUpdateLinks(IULinkUpdate update)
long remoteUpdateLinks(IULinkUpdate update)
{
if (!iuStore.containsKey(update.getUid()))
{
......@@ -307,7 +319,7 @@ public class OutputBuffer extends Buffer
/**
* Apply a remotely requested commit to one of the stored IUs.
*/
private int remoteCommit(IUCommission iuc)
private long remoteCommit(IUCommission iuc)
{
if (!iuStore.containsKey(iuc.getUid()))
{
......@@ -336,7 +348,7 @@ public class OutputBuffer extends Buffer
/*
* Resend an requested iu over the specific hidden channel. (dlw) TODO
*/
private int remoteResendRequest(IUResendRequest iu_resend_request_pack)
private long remoteResendRequest(IUResendRequest iu_resend_request_pack)
{
if (!iuStore.containsKey(iu_resend_request_pack.getUid()))
{
......@@ -349,7 +361,7 @@ public class OutputBuffer extends Buffer
Informer<Object> informer = getInformer(iu_resend_request_pack.getHiddenScopeName());
try
{
informer.send(iu);
informer.publish(iu);
}
catch (RSBException e)
{
......@@ -404,6 +416,10 @@ public class OutputBuffer extends Buffer
{
throw new RuntimeException(e);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
return informer;
}
......@@ -441,7 +457,7 @@ public class OutputBuffer extends Buffer
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.send(iu);
informer.publish(iu);
}
catch (RSBException e)
{
......@@ -475,12 +491,12 @@ public class OutputBuffer extends Buffer
*/
protected void sendIUCommission(AbstractIU iu, String writerName)
{
IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(iu.getUid()).setRevision(iu.getRevision())
IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(iu.getUid()).setRevision((int) iu.getRevision())
.setWriterName(writerName == null ? iu.getOwnerName() : writerName).build();
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.send(iuc);
informer.publish(iuc);
}
catch (RSBException e)
{
......@@ -490,11 +506,11 @@ public class OutputBuffer extends Buffer
protected void sendIURetraction(AbstractIU iu)
{
IURetraction iuc = Ipaaca.IURetraction.newBuilder().setUid(iu.getUid()).setRevision(iu.getRevision()).build();
IURetraction iuc = Ipaaca.IURetraction.newBuilder().setUid(iu.getUid()).setRevision((int) iu.getRevision()).build();
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.send(iuc);
informer.publish(iuc);
}
catch (RSBException e)
{
......@@ -534,7 +550,7 @@ public class OutputBuffer extends Buffer
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.send(update);
informer.publish(update);
}
catch (RSBException e)
{
......@@ -547,7 +563,7 @@ public class OutputBuffer extends Buffer
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.send(update);
informer.publish(update);
}
catch (RSBException e)
{
......
......@@ -36,7 +36,10 @@ import ipaaca.protobuf.Ipaaca.PayloadItem;
import org.apache.commons.lang.StringEscapeUtils;
import com.google.common.collect.ImmutableSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -49,7 +52,7 @@ import java.util.Set;
*/
public class Payload implements Map<String, String>
{
private Map<String, String> map = new HashMap<String, String>();
private Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());
private final AbstractIU iu;
public Payload(AbstractIU iu)
......@@ -82,17 +85,23 @@ public class Payload implements Map<String, String>
public void set(Map<String, String> newPayload, String writerName)
{
iu.setPayload(newPayload, writerName);
map.clear();
map.putAll(newPayload);
synchronized(map)
{
map.clear();
map.putAll(newPayload);
}
}
public void set(List<PayloadItem> newPayload, String writerName)
{
iu.handlePayloadSetting(newPayload, writerName);
map.clear();
for (PayloadItem item : newPayload)
synchronized(map)
{
map.put(item.getKey(), pseudoConvertFromJSON(item.getValue(), item.getType()));
map.clear();
for (PayloadItem item : newPayload)
{
map.put(item.getKey(), pseudoConvertFromJSON(item.getValue(), item.getType()));
}
}
}
......@@ -136,9 +145,12 @@ public class Payload implements Map<String, String>
return map.containsValue(value);
}
public Set<java.util.Map.Entry<String, String>> entrySet()
/**
* Provides an immutable copy of the entryset of the Payload
*/
public ImmutableSet<java.util.Map.Entry<String, String>> entrySet()
{
return map.entrySet();
return ImmutableSet.copyOf(map.entrySet());
}
public boolean equals(Object o)
......
......@@ -32,6 +32,7 @@
package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.List;
......@@ -45,6 +46,11 @@ import com.google.common.collect.SetMultimap;
public class RemoteMessageIU extends AbstractIU
{
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.MESSAGE;
}
public RemoteMessageIU(String uid)
{
super(uid);
......
......@@ -33,6 +33,7 @@
package ipaaca;
import ipaaca.protobuf.Ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.IUCommission;
import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
......@@ -66,6 +67,11 @@ public class RemotePushIU extends AbstractIU
private final static Logger logger = LoggerFactory.getLogger(RemotePushIU.class.getName());
private InputBuffer inputBuffer;
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.PUSH;
}
public InputBuffer getInputBuffer()
{
return inputBuffer;
......@@ -104,15 +110,17 @@ public class RemotePushIU extends AbstractIU
throw new IUReadOnlyException(this);
}
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR").build();
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision(getRevision())
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision((int) getRevision())
.setWriterName(getBuffer().getUniqueName()).addNewItems(newItem).build();
RemoteServer server = getInputBuffer().getRemoteServer(this);
logger.debug("Remote server has methods {}", server.getMethods());
int newRevision;
long newRevision;
try
{
newRevision = (Integer) server.call("updatePayload", update);
//System.out.println("calling remote updatePayload ...");
newRevision = (Long) server.call("updatePayload", update);
//System.out.println(" ... done");
}
catch (RSBException e)
{
......@@ -126,6 +134,10 @@ public class RemotePushIU extends AbstractIU
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......@@ -148,7 +160,7 @@ public class RemotePushIU extends AbstractIU
{
throw new IUReadOnlyException(this);
}
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true)
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(getBuffer().getUniqueName());
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
......@@ -161,10 +173,10 @@ public class RemotePushIU extends AbstractIU
RemoteServer server = getInputBuffer().getRemoteServer(this);
logger.debug("Remote server has methods {}", server.getMethods());
int newRevision;
long newRevision;
try
{
newRevision = (Integer) server.call("updatePayload", update);
newRevision = (Long) server.call("updatePayload", update);
}
catch (RSBException e)
{
......@@ -178,6 +190,10 @@ public class RemotePushIU extends AbstractIU
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......@@ -210,13 +226,13 @@ public class RemotePushIU extends AbstractIU
}
else
{
IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(getUid()).setRevision(getRevision())
IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(getUid()).setRevision((int) getRevision())
.setWriterName(getBuffer().getUniqueName()).build();
RemoteServer server = inputBuffer.getRemoteServer(this);
int newRevision;
long newRevision;
try
{
newRevision = (Integer) server.call("commit", iuc);
newRevision = (Long) server.call("commit", iuc);
}
catch (RSBException e)
{
......@@ -230,6 +246,10 @@ public class RemotePushIU extends AbstractIU
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUCommittedException(this);
......@@ -281,13 +301,13 @@ public class RemotePushIU extends AbstractIU
throw new IUReadOnlyException(this);
}
IUPayloadUpdate iuu = IUPayloadUpdate.newBuilder().setRevision(getRevision()).setIsDelta(false).setUid(getUid())
IUPayloadUpdate iuu = IUPayloadUpdate.newBuilder().setRevision((int) getRevision()).setIsDelta(false).setUid(getUid())
.addAllNewItems(newItems).setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").build();
RemoteServer server = inputBuffer.getRemoteServer(this);
int newRevision;
long newRevision;
try
{
newRevision = (Integer) server.call("updatePayload", iuu);
newRevision = (Long) server.call("updatePayload", iuu);
}
catch (RSBException e)
{
......@@ -301,6 +321,10 @@ public class RemotePushIU extends AbstractIU
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......@@ -399,13 +423,13 @@ public class RemotePushIU extends AbstractIU
{
throw new IUReadOnlyException(this);
}
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision(getRevision())
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision((int) getRevision())
.setWriterName(getBuffer().getUniqueName()).addKeysToRemove((String) key).build();
RemoteServer server = getInputBuffer().getRemoteServer(this);
int newRevision;
long newRevision;
try
{
newRevision = (Integer) server.call("updatePayload", update);
newRevision = (Long) server.call("updatePayload", update);
}
catch (RSBException e)
{
......@@ -419,6 +443,10 @@ public class RemotePushIU extends AbstractIU
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......@@ -455,11 +483,11 @@ public class RemotePushIU extends AbstractIU
}
IULinkUpdate update = IULinkUpdate.newBuilder().addAllLinksToRemove(removeLinkSet).addAllNewLinks(newLinkSet).setIsDelta(isDelta)
.setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").setUid(getUid()).setRevision(getRevision()).build();
int newRevision;
.setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").setUid(getUid()).setRevision((int) getRevision()).build();
long newRevision;
try
{
newRevision = (Integer) server.call("updateLinks", update);
newRevision = (Long) server.call("updateLinks", update);
}
catch (RSBException e)
{
......@@ -473,6 +501,10 @@ public class RemotePushIU extends AbstractIU
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......
package ipaaca.util;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
import ipaaca.protobuf.Ipaaca.IU;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
......@@ -18,6 +10,13 @@ import java.util.Set;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Client to get/set key value pairs on a Blackboard
* @author hvanwelbergen
......
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2016 Social Cognitive Systems Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca.util;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
import java.util.HashMap;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
public class IpaacaLogger {
private static OutputBuffer ob;
private static final Object lock = new Object();
private static boolean SEND_IPAACA_LOGS = true;
private static String MODULE_NAME = "???";
private static void initializeOutBuffer() {
synchronized (lock) {
if (ob == null) {
ob = new OutputBuffer("LogSender");
}
}
}
public static void setModuleName(String name) {
synchronized (lock) {
MODULE_NAME = name;
}
}
public static void setLogFileName(String fileName, String logMode) {
initializeOutBuffer();
LocalMessageIU msg = new LocalMessageIU("log");
HashMap<String, String> pl = new HashMap<String, String>();
pl.put("cmd", "open_log_file");
pl.put("filename", fileName);
if (logMode != null) {
if (logMode.equals("append") ||
logMode.equals("overwrite") ||
logMode.equals("timestamp")) {
pl.put("existing", logMode);
} else {
return;
}
}
ob.add(msg);
}
public static void sendIpaacaLogs(boolean flag) {
synchronized (lock) {
SEND_IPAACA_LOGS = flag;
}
}
private static void logConsole(String level, String text, float now, String function, String thread) {
for(String line: text.split("\n")) {
System.out.println("[" + level + "] " + thread + " " + function + " " + line);
function = StringUtils.leftPad("", function.length(), ' ');
thread = StringUtils.leftPad("", thread.length(), ' ');
}
}
private static void logIpaaca(String level, String text, float now, String function, String thread) {
initializeOutBuffer();
LocalMessageIU msg = new LocalMessageIU("log");
HashMap<String, String> pl = new HashMap<String, String>();
pl.put("module", MODULE_NAME);
pl.put("function", function);
pl.put("level", level);
pl.put("time", String.format("%.3f", now));
pl.put("thread", thread);
pl.put("uuid", UUID.randomUUID().toString());
pl.put("text", text);
msg.setPayload(pl);
ob.add(msg);
}
private static String getCallerName() {
String function = Thread.currentThread().getStackTrace()[3].getClassName();
function += "." + Thread.currentThread().getStackTrace()[3].getMethodName();
return function;
}
public static void logError(String msg) {
logError(msg, System.currentTimeMillis(), getCallerName());
}
public static void logError(String msg, float now) {
logError(msg, now, getCallerName());
}
private static void logError(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("ERROR", msg, now, callerName, thread);
}
logConsole("ERROR", msg, now, callerName, thread);
}
public static void logWarn(String msg) {
logWarn(msg, System.currentTimeMillis(), getCallerName());
}
public static void logWarn(String msg, float now) {
logWarn(msg, now, getCallerName());
}
private static void logWarn(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("WARN", msg, now, callerName, thread);
}
logConsole("WARN", msg, now, callerName, thread);
}
public static void logInfo(String msg) {
logInfo(msg, System.currentTimeMillis(), getCallerName());
}
public static void logInfo(String msg, float now) {
logInfo(msg, now, getCallerName());
}
private static void logInfo(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("INFO", msg, now, callerName, thread);
}
logConsole("INFO", msg, now, callerName, thread);
}
public static void logDebug(String msg) {
logDebug(msg, System.currentTimeMillis(), getCallerName());
}
public static void logDebug(String msg, float now) {
logDebug(msg, now, getCallerName());
}
private static void logDebug(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("DEBUG", msg, now, callerName, thread);
}
logConsole("DEBUG", msg, now, callerName, thread);
}
}
package ipaaca.util.communication;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventHandler;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
/**
* Obtain a IU in the future. Usage:<br>
* FutureIU fu = FutureIU("componentx", "status", "started"); //wait for componentx to send a message that is it fully started<br>
* [Start componentx, assumes that component x will send a message or other iu with status=started in the payload]<br>
* AbstractIU iu = fu.take(); //get the actual IU
* @author hvanwelbergen
*/
public class FutureIU
{
private final InputBuffer inBuffer;
private final BlockingQueue<AbstractIU> queue = new ArrayBlockingQueue<AbstractIU>(1);
private final IUEventHandler handler;
public FutureIU(String category, final String idKey, final String idVal, InputBuffer inBuffer)
{
this.inBuffer = inBuffer;
handler = new IUEventHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
String id = iu.getPayload().get(idKey);
if (idVal.equals(id))
{
queue.offer(iu);
}
}
}, ImmutableSet.of(category));
inBuffer.registerHandler(handler);
}
public FutureIU(String category, String idKey, String idVal)
{
this(category, idKey, idVal, new InputBuffer("FutureIU", ImmutableSet.of(category)));
}
/**
* Closes the FutureIU, use only if get is not used.
*/
public void cleanup()
{
inBuffer.removeHandler(handler);
if (inBuffer.getOwningComponentName().equals("FutureIU"))
{
inBuffer.close();
}
}
/**
* Waits (if necessary) for the IU and take it (can be done only once)
*/
public AbstractIU take() throws InterruptedException
{
AbstractIU iu;
try
{
iu = queue.take();
}
finally
{
cleanup();
}
return iu;
}
/**
* Wait for at most the given time for the IU and take it (can be done only once), return null on timeout
*/
public AbstractIU take(long timeout, TimeUnit unit) throws InterruptedException
{
AbstractIU iu;
try
{
iu = queue.poll(timeout, unit);
}
finally
{
cleanup();
}
return iu;
}
}
package ipaaca.util.communication;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
/**
* Obtain multiple future ius on an specific category. Usage:<br>
* FutureIUs futures = new FutureIUs(componentx, key);<br>
* [make componentx send a IU with key=keyvaluedesired1]<br>
* AbstractIU iu = futures.take(keyvaluedesired1);<br>
* [make componentx send a IU with key=keyvaluedesired2]
* AbstractIU iu = futures.take(keyvaluedesired2);<br>
* ...<br>
* futures.close();
* @author hvanwelbergen
*/
public class FutureIUs
{
private final InputBuffer inBuffer;
private final Map<String,BlockingQueue<AbstractIU>> resultsMap = Collections.synchronizedMap(new HashMap<String,BlockingQueue<AbstractIU>>());
public FutureIUs(String category, final String idKey)
{
inBuffer = new InputBuffer("FutureIUs", ImmutableSet.of(category));
inBuffer.registerHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
String id = iu.getPayload().get(idKey);
resultsMap.putIfAbsent(id, new ArrayBlockingQueue<AbstractIU>(1));
resultsMap.get(id).offer(iu);
}
}, ImmutableSet.of(category));
}
/**
* Waits (if necessary) for the IU and take it (can be done only once)
*/
public AbstractIU take(String idValue) throws InterruptedException
{
resultsMap.putIfAbsent(idValue, new ArrayBlockingQueue<AbstractIU>(1));
return resultsMap.get(idValue).take();
}
/**
* Wait for at most the given time for the IU and take it (can be done only once), return null on timeout
*/
public AbstractIU take(String idValue, long timeout, TimeUnit unit) throws InterruptedException
{
resultsMap.putIfAbsent(idValue, new ArrayBlockingQueue<AbstractIU>(1));
return resultsMap.get(idValue).poll(timeout, unit);
}
public void close()
{
inBuffer.close();
}
}
......@@ -183,7 +183,7 @@ public class ComponentPushCommunicationIntegrationTest
payloadUpdate.put("chunk12", "item2");
payloadUpdate.put("chunk13", "item3");
payloadUpdate.put("chunk14", "item4");
int oldRev = iuIn.getRevision();
long oldRev = iuIn.getRevision();
localIU.getPayload().merge(payloadUpdate);
Thread.sleep(200);
assertEquals(oldRev + 1, iuIn.getRevision());
......@@ -197,7 +197,7 @@ public class ComponentPushCommunicationIntegrationTest
payloadUpdate2.put("chunk22", "item6");
payloadUpdate2.put("chunk13", "item3-changed");
payloadUpdate2.put("chunk14", "item4-changed");
int oldRev2 = iuIn.getRevision();
long oldRev2 = iuIn.getRevision();
iuIn.getPayload().merge(payloadUpdate2);
Thread.sleep(200);
assertEquals(oldRev2 + 1, localIU.getRevision());
......
......@@ -54,7 +54,7 @@ public class InputBufferTest
iu.setOwnerName("owner");
iu.setReadOnly(false);
iu.setRevision(1);
informer.send(iu);
informer.publish(iu);
Thread.sleep(1000);
AbstractIU iuIn = inBuffer.getIU("uid1");
......
package ipaaca.util.communication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Unit tests for the FutureIU
* @author hvanwelbergen
*
*/
public class FutureIUTest
{
private final OutputBuffer outBuffer = new OutputBuffer("component1");
@Test(timeout = 2000)
public void testSendBeforeTake() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "started");
outBuffer.add(message);
assertEquals(message.getPayload(), fu.take().getPayload());
}
@Test(timeout = 2000)
public void testSendAfterTake() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "started");
Runnable send = () -> {
try
{
Thread.sleep(1000);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
outBuffer.add(message);
};
new Thread(send).start();
assertEquals(message.getPayload(), fu.take().getPayload());
}
@Test
public void testInvalidKeyValue() throws InterruptedException
{
FutureIU fu = new FutureIU("cat1", "status", "started");
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("status", "cancelled");
assertNull(fu.take(1,TimeUnit.SECONDS));
}
}
package ipaaca.util.communication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Unit tests for FutureIUs
* @author hvanwelbergen
*
*/
public class FutureIUsTest
{
private FutureIUs fus = new FutureIUs("cat1","id");
private final OutputBuffer outBuffer = new OutputBuffer("component1");
@After
public void cleanup()
{
fus.close();
}
@Test(timeout = 2000)
public void testSendBeforeTake() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id1");
outBuffer.add(message);
assertEquals(message.getPayload(), fus.take("id1").getPayload());
}
@Test(timeout = 2000)
public void testSendAfterTake() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id1");
Runnable send = () -> {
try
{
Thread.sleep(1000);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
outBuffer.add(message);
};
new Thread(send).start();
assertEquals(message.getPayload(), fus.take("id1").getPayload());
}
@Test
public void testNonMatchingKeyValue() throws InterruptedException
{
LocalMessageIU message = new LocalMessageIU("cat1");
message.getPayload().put("id", "id2");
outBuffer.add(message);
assertNull(fus.take("id1", 1,TimeUnit.SECONDS));
}
@Test
public void testMultipleKeyValues() throws InterruptedException
{
LocalMessageIU message1 = new LocalMessageIU("cat1");
message1.getPayload().put("id", "id1");
LocalMessageIU message2 = new LocalMessageIU("cat1");
message2.getPayload().put("id", "id2");
outBuffer.add(message2);
outBuffer.add(message1);
assertEquals(message1.getPayload(), fus.take("id1").getPayload());
assertEquals(message2.getPayload(), fus.take("id2").getPayload());
}
}