Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Show changes
Showing
with 695 additions and 108 deletions
...@@ -6,5 +6,6 @@ run.jvmargs= -Xms128m -Xmx512m -Xss5M ...@@ -6,5 +6,6 @@ run.jvmargs= -Xms128m -Xmx512m -Xss5M
rebuild.list= rebuild.list=
publish.resolver=asap.sftp.publish publish.resolver=asap.sftp.publish
dist.dir=../../dist dist.dir=../../dist
javac.source=1.6 javac.source=1.8
javac.target=1.6 javac.target=1.8
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
<dependencies> <dependencies>
<dependency org="slf4j" name="slf4j-api" rev="latest.release" /> <dependency org="slf4j" name="slf4j-api" rev="latest.release" />
<dependency org="google" name="guava" rev="latest.release" /> <dependency org="google" name="guava" rev="latest.release" />
<dependency org="google" name="protobuf-java" rev="latest.release" /> <dependency org="google" name="protobuf-java" rev="2.6.1" />
<dependency org="rsb" name="rsb" rev="latest.release" /> <dependency org="rsb" name="rsb" rev="latest.release" />
<dependency org="lombok" name="lombok" rev="latest.release" /> <dependency org="lombok" name="lombok" rev="latest.release" />
<dependency org="apache" name="commons-lang" rev="latest.release" /> <dependency org="apache" name="commons-lang" rev="latest.release" />
......
[transport.spread]
host = localhost # default type is string
port = 4803 # types can be specified in angle brackets
enabled = true
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
package ipaaca; package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.PayloadItem; import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -57,10 +58,13 @@ public abstract class AbstractIU ...@@ -57,10 +58,13 @@ public abstract class AbstractIU
protected Payload payload; protected Payload payload;
protected String category; protected String category;
protected boolean committed = false; protected boolean committed = false;
protected boolean retracted = false;
private String uid; private String uid;
protected int revision; protected long revision;
private boolean readOnly = false; private boolean readOnly = false;
public abstract IU.AccessMode getAccessMode();
protected SetMultimap<String, String> links = HashMultimap.create(); protected SetMultimap<String, String> links = HashMultimap.create();
private final SetMultimap<String, String> EMPTYLINKS = HashMultimap.create(); private final SetMultimap<String, String> EMPTYLINKS = HashMultimap.create();
...@@ -146,7 +150,7 @@ public abstract class AbstractIU ...@@ -146,7 +150,7 @@ public abstract class AbstractIU
this.readOnly = readOnly; this.readOnly = readOnly;
} }
public void setRevision(int revision) public void setRevision(long revision)
{ {
this.revision = revision; this.revision = revision;
} }
...@@ -186,7 +190,7 @@ public abstract class AbstractIU ...@@ -186,7 +190,7 @@ public abstract class AbstractIU
return buffer; return buffer;
} }
public int getRevision() public long getRevision()
{ {
return revision; return revision;
} }
...@@ -201,6 +205,11 @@ public abstract class AbstractIU ...@@ -201,6 +205,11 @@ public abstract class AbstractIU
return committed; return committed;
} }
public boolean isRetracted()
{
return retracted;
}
public void setBuffer(Buffer buffer) public void setBuffer(Buffer buffer)
{ {
this.buffer = buffer; this.buffer = buffer;
...@@ -218,6 +227,8 @@ public abstract class AbstractIU ...@@ -218,6 +227,8 @@ public abstract class AbstractIU
public abstract void commit(); public abstract void commit();
public abstract void retract();
// XXX: might not be valid for all types of IUs // XXX: might not be valid for all types of IUs
public abstract void commit(String writerName); public abstract void commit(String writerName);
......
...@@ -38,6 +38,8 @@ import java.util.List; ...@@ -38,6 +38,8 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import ipaaca.Initializer;
/** /**
* Base class for InputBuffer and OutputBuffer. * Base class for InputBuffer and OutputBuffer.
*/ */
...@@ -85,6 +87,7 @@ public abstract class Buffer ...@@ -85,6 +87,7 @@ public abstract class Buffer
*/ */
public Buffer(String owningComponentName) public Buffer(String owningComponentName)
{ {
Initializer.initializeIpaacaRsb();
this.owningComponentName = owningComponentName; this.owningComponentName = owningComponentName;
uniqueName = "undef-" + uuid; uniqueName = "undef-" + uuid;
} }
...@@ -107,6 +110,11 @@ public abstract class Buffer ...@@ -107,6 +110,11 @@ public abstract class Buffer
{ {
eventHandlers.add(handler); eventHandlers.add(handler);
} }
public void removeHandler(IUEventHandler handler)
{
eventHandlers.remove(handler);
}
public void registerHandler(HandlerFunctor func) { public void registerHandler(HandlerFunctor func) {
IUEventHandler handler; IUEventHandler handler;
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
package ipaaca; package ipaaca;
import java.util.HashSet;
import java.util.Set; import java.util.Set;
...@@ -50,6 +51,7 @@ class BufferConfiguration { ...@@ -50,6 +51,7 @@ class BufferConfiguration {
this._owningComponentName = owningComponentName; this._owningComponentName = owningComponentName;
this._channel = "default"; this._channel = "default";
this._resendActive = false; this._resendActive = false;
this._category_interests = new HashSet<String>();
} }
public String getOwningComponentName() { public String getOwningComponentName() {
......
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
package ipaaca; package ipaaca;
class BufferConfigurationBuilder extends BufferConfiguration { public class BufferConfigurationBuilder extends BufferConfiguration {
public BufferConfigurationBuilder(String owningComponentName) { public BufferConfigurationBuilder(String owningComponentName) {
super(owningComponentName); super(owningComponentName);
......
...@@ -88,12 +88,8 @@ public class IUConverter implements Converter<ByteBuffer> ...@@ -88,12 +88,8 @@ public class IUConverter implements Converter<ByteBuffer>
links.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build()); links.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
} }
IU.AccessMode accessMode = IU.AccessMode.PUSH; IU.AccessMode accessMode = iua.getAccessMode();
if(iua instanceof RemoteMessageIU || iua instanceof LocalMessageIU) IU iu = IU.newBuilder().setUid(iua.getUid()).setRevision((int) iua.getRevision()).setCategory(iua.getCategory())
{
accessMode = IU.AccessMode.MESSAGE;
}
IU iu = IU.newBuilder().setUid(iua.getUid()).setRevision(iua.getRevision()).setCategory(iua.getCategory())
.setOwnerName(iua.getOwnerName()).setCommitted(iua.isCommitted()).setAccessMode(accessMode) .setOwnerName(iua.getOwnerName()).setCommitted(iua.isCommitted()).setAccessMode(accessMode)
.setReadOnly(iua.isReadOnly()).setPayloadType("STR").addAllPayload(payloadItems).addAllLinks(links).build(); .setReadOnly(iua.isReadOnly()).setPayloadType("STR").addAllPayload(payloadItems).addAllLinks(links).build();
String wireFormat = (accessMode == IU.AccessMode.MESSAGE) ? "ipaaca-messageiu" : "ipaaca-iu"; String wireFormat = (accessMode == IU.AccessMode.MESSAGE) ? "ipaaca-messageiu" : "ipaaca-iu";
...@@ -108,6 +104,12 @@ public class IUConverter implements Converter<ByteBuffer> ...@@ -108,6 +104,12 @@ public class IUConverter implements Converter<ByteBuffer>
try try
{ {
iu = IU.newBuilder().mergeFrom(buffer.array()).build(); iu = IU.newBuilder().mergeFrom(buffer.array()).build();
// If there are rsb buffer read-only issues in some build, use this code instead of the above line:
//int size = buffer.capacity();
//byte[] array = new byte[size];
//buffer.get(array, 0, size);
//iu = IU.newBuilder().mergeFrom(array).build();
} }
catch (InvalidProtocolBufferException e) catch (InvalidProtocolBufferException e)
{ {
......
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
/**
* Error indicating that an IU is immutable because it has been retracted.
*
*/
public class IURetractedException extends RuntimeException
{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU()
{
return iu;
}
public IURetractedException(AbstractIU iu)
{
super("Writing to IU " + iu.getUid() + " failed -- it has been retracted.");
this.iu = iu;
}
}
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2013 Sociable Agents Group * Copyright (c) 2009-2015 Social Cognitive Systems Group
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
* http://opensource.cit-ec.de/projects/ipaaca/ * http://opensource.cit-ec.de/projects/ipaaca/
...@@ -32,9 +32,12 @@ ...@@ -32,9 +32,12 @@
package ipaaca; package ipaaca;
import java.nio.ByteBuffer;
import ipaaca.protobuf.Ipaaca.IUCommission; import ipaaca.protobuf.Ipaaca.IUCommission;
import ipaaca.protobuf.Ipaaca.IUResendRequest; import ipaaca.protobuf.Ipaaca.IUResendRequest;
import ipaaca.protobuf.Ipaaca.IURetraction;
import rsb.converter.ConverterSignature; import rsb.converter.ConverterSignature;
import rsb.converter.ConverterRepository;
import rsb.converter.DefaultConverterRepository; import rsb.converter.DefaultConverterRepository;
import rsb.converter.ProtocolBufferConverter; import rsb.converter.ProtocolBufferConverter;
...@@ -43,34 +46,74 @@ import rsb.converter.ProtocolBufferConverter; ...@@ -43,34 +46,74 @@ import rsb.converter.ProtocolBufferConverter;
* @author hvanwelbergen * @author hvanwelbergen
* *
*/ */
public final class Initializer public final class Initializer {
{
private Initializer() private Initializer() {}
{
}
private static volatile boolean initialized = false; private static volatile boolean initialized = false;
public synchronized static void initializeIpaacaRsb() public synchronized static void initializeIpaacaRsb() {
{ if (initialized)
if(initialized)return; return;
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new IntConverter());
DefaultConverterRepository.getDefaultConverterRepository().addConverter( ConverterRepository<ByteBuffer> dcr =
new ProtocolBufferConverter<IUCommission>(IUCommission.getDefaultInstance())); DefaultConverterRepository.getDefaultConverterRepository();
// dlw
DefaultConverterRepository.getDefaultConverterRepository().addConverter( // for IU revision numbers
new ProtocolBufferConverter<IUResendRequest>(IUResendRequest.getDefaultInstance())); dcr.addConverter(
new IntConverter());
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new IUConverter(new ConverterSignature("ipaaca-iu", RemotePushIU.class))); // IU commit messages
DefaultConverterRepository.getDefaultConverterRepository().addConverter( dcr.addConverter(
new IUConverter(new ConverterSignature("ipaaca-localiu", LocalIU.class))); new ProtocolBufferConverter<IUCommission>(
DefaultConverterRepository.getDefaultConverterRepository().addConverter( IUCommission.getDefaultInstance()));
new IUConverter(new ConverterSignature("ipaaca-messageiu", RemoteMessageIU.class)));
DefaultConverterRepository.getDefaultConverterRepository().addConverter( // IU commit messages
new IUConverter(new ConverterSignature("ipaaca-localmessageiu", LocalMessageIU.class))); dcr.addConverter(
new ProtocolBufferConverter<IURetraction>(
IURetraction.getDefaultInstance()));
// IU resend request messages
dcr.addConverter(
new ProtocolBufferConverter<IUResendRequest>(
IUResendRequest.getDefaultInstance()));
// IUs
dcr.addConverter(
new IUConverter(
new ConverterSignature(
"ipaaca-iu",
RemotePushIU.class)));
// Local IUs
dcr.addConverter(
new IUConverter(
new ConverterSignature(
"ipaaca-localiu",
LocalIU.class)));
// Messages
dcr.addConverter(
new IUConverter(
new ConverterSignature(
"ipaaca-messageiu",
RemoteMessageIU.class)));
// LocalMessages
dcr.addConverter(
new IUConverter(
new ConverterSignature(
"ipaaca-localmessageiu",
LocalMessageIU.class)));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new PayloadConverter()); // Payloads
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new LinkUpdateConverter()); dcr.addConverter(
new PayloadConverter());
// LinkUpdates
dcr.addConverter(
new LinkUpdateConverter());
initialized = true; initialized = true;
} }
} }
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
package ipaaca; package ipaaca;
import ipaaca.protobuf.Ipaaca.IUCommission; import ipaaca.protobuf.Ipaaca.IUCommission;
import ipaaca.protobuf.Ipaaca.IURetraction;
import ipaaca.protobuf.Ipaaca.IUResendRequest; import ipaaca.protobuf.Ipaaca.IUResendRequest;
import ipaaca.protobuf.Ipaaca.IULinkUpdate; import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
...@@ -441,6 +442,22 @@ public class InputBuffer extends Buffer ...@@ -441,6 +442,22 @@ public class InputBuffer extends Buffer
iu.setRevision(iuc.getRevision()); iu.setRevision(iuc.getRevision());
callIuEventHandlers(iuc.getUid(), false, IUEventType.COMMITTED, iu.getCategory()); callIuEventHandlers(iuc.getUid(), false, IUEventType.COMMITTED, iu.getCategory());
} }
if (event.getData() instanceof IURetraction)
{
IURetraction iuc = (IURetraction) event.getData();
logger.debug("handleIUEvents invoked with an IURetraction: {}", iuc);
logger.debug("{}", this.getUniqueName());
if (!iuStore.containsKey(iuc.getUid()))
{
logger.warn("Update message for IU which we did not fully receive before.");
}
RemotePushIU iu = this.iuStore.get(iuc.getUid());
if (iu != null) {
iu.applyRetraction();
callIuEventHandlers(iuc.getUid(), false, IUEventType.RETRACTED, iu.getCategory());
}
}
} }
} }
...@@ -466,10 +483,10 @@ public class InputBuffer extends Buffer ...@@ -466,10 +483,10 @@ public class InputBuffer extends Buffer
rServer = getRemoteServer(writerName); rServer = getRemoteServer(writerName);
if ((rServer != null)&&(uid != null)) { if ((rServer != null)&&(uid != null)) {
IUResendRequest iurr = IUResendRequest.newBuilder().setUid(uid).setHiddenScopeName(hiddenScopeName).build(); IUResendRequest iurr = IUResendRequest.newBuilder().setUid(uid).setHiddenScopeName(hiddenScopeName).build();
int rRevision = 0; long rRevision = 0;
try try
{ {
rRevision = (Integer) rServer.call("resendRequest", iurr); rRevision = (Long) rServer.call("resendRequest", iurr);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -483,6 +500,10 @@ public class InputBuffer extends Buffer ...@@ -483,6 +500,10 @@ public class InputBuffer extends Buffer
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (rRevision == 0) if (rRevision == 0)
{ {
//throw new IUResendFailedException(aiu); // TODO //throw new IUResendFailedException(aiu); // TODO
...@@ -508,6 +529,14 @@ public class InputBuffer extends Buffer ...@@ -508,6 +529,14 @@ public class InputBuffer extends Buffer
} }
} }
public void addCategoryInterest(String... categories)
{
for(String cat:categories)
{
createCategoryListenerIfNeeded(cat);
}
}
public Collection<RemotePushIU> getIUs() public Collection<RemotePushIU> getIUs()
{ {
return iuStore.values(); return iuStore.values();
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
package ipaaca; package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.IULinkUpdate; import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate.Builder; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate.Builder;
...@@ -50,6 +51,11 @@ import com.google.common.collect.SetMultimap; ...@@ -50,6 +51,11 @@ import com.google.common.collect.SetMultimap;
public class LocalIU extends AbstractIU public class LocalIU extends AbstractIU
{ {
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.PUSH;
}
private OutputBuffer outputBuffer; private OutputBuffer outputBuffer;
...@@ -105,6 +111,10 @@ public class LocalIU extends AbstractIU ...@@ -105,6 +111,10 @@ public class LocalIU extends AbstractIU
synchronized (revisionLock) synchronized (revisionLock)
{ {
if (isRetracted())
{
throw new IURetractedException(this);
}
if (committed) if (committed)
{ {
throw new IUCommittedException(this); throw new IUCommittedException(this);
...@@ -121,6 +131,22 @@ public class LocalIU extends AbstractIU ...@@ -121,6 +131,22 @@ public class LocalIU extends AbstractIU
} }
} }
private void internalRetract()
{
synchronized (revisionLock)
{
if (isRetracted())
return;
increaseRevisionNumber();
retracted = true;
if (outputBuffer != null)
{
outputBuffer.sendIURetraction(this);
}
}
}
private void increaseRevisionNumber() private void increaseRevisionNumber()
{ {
revision++; revision++;
...@@ -148,6 +174,10 @@ public class LocalIU extends AbstractIU ...@@ -148,6 +174,10 @@ public class LocalIU extends AbstractIU
@Override @Override
void modifyLinks(boolean isDelta, SetMultimap<String, String> linksToAdd, SetMultimap<String, String> linksToRemove, String writerName) void modifyLinks(boolean isDelta, SetMultimap<String, String> linksToAdd, SetMultimap<String, String> linksToRemove, String writerName)
{ {
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isCommitted()) if (isCommitted())
{ {
throw new IUCommittedException(this); throw new IUCommittedException(this);
...@@ -181,7 +211,7 @@ public class LocalIU extends AbstractIU ...@@ -181,7 +211,7 @@ public class LocalIU extends AbstractIU
removeSet.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build()); removeSet.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
} }
outputBuffer.sendIULinkUpdate(this, outputBuffer.sendIULinkUpdate(this,
IULinkUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setWriterName(wName).setIsDelta(isDelta) IULinkUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setWriterName(wName).setIsDelta(isDelta)
.addAllNewLinks(addSet).addAllLinksToRemove(removeSet).build()); .addAllNewLinks(addSet).addAllLinksToRemove(removeSet).build());
} }
} }
...@@ -226,13 +256,17 @@ public class LocalIU extends AbstractIU ...@@ -226,13 +256,17 @@ public class LocalIU extends AbstractIU
{ {
throw new IUCommittedException(this); throw new IUCommittedException(this);
} }
if (isRetracted())
{
throw new IURetractedException(this);
}
increaseRevisionNumber(); increaseRevisionNumber();
if (isPublished()) if (isPublished())
{ {
// send update to remote holders // send update to remote holders
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR") PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR")
.build(); .build();
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true) IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer).addNewItems(newItem).build(); .setWriterName(writer == null ? getOwnerName() : writer).addNewItems(newItem).build();
getOutputBuffer().sendIUPayloadUpdate(this, update); getOutputBuffer().sendIUPayloadUpdate(this, update);
} }
...@@ -249,10 +283,14 @@ public class LocalIU extends AbstractIU ...@@ -249,10 +283,14 @@ public class LocalIU extends AbstractIU
{ {
throw new IUCommittedException(this); throw new IUCommittedException(this);
} }
if (isRetracted())
{
throw new IURetractedException(this);
}
increaseRevisionNumber(); increaseRevisionNumber();
if (isPublished()) if (isPublished())
{ {
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true) Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer); .setWriterName(writer == null ? getOwnerName() : writer);
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet()) for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{ {
...@@ -270,15 +308,29 @@ public class LocalIU extends AbstractIU ...@@ -270,15 +308,29 @@ public class LocalIU extends AbstractIU
@Override @Override
public void commit() public void commit()
{ {
if (isRetracted())
{
throw new IURetractedException(this);
}
internalCommit(null); internalCommit(null);
} }
@Override @Override
public void commit(String writerName) public void commit(String writerName)
{ {
if (isRetracted())
{
throw new IURetractedException(this);
}
internalCommit(writerName); internalCommit(writerName);
} }
@Override
public void retract()
{
internalRetract();
}
@Override @Override
void removeFromPayload(Object key, String writer) void removeFromPayload(Object key, String writer)
{ {
...@@ -288,11 +340,15 @@ public class LocalIU extends AbstractIU ...@@ -288,11 +340,15 @@ public class LocalIU extends AbstractIU
{ {
throw new IUCommittedException(this); throw new IUCommittedException(this);
} }
if (isRetracted())
{
throw new IURetractedException(this);
}
increaseRevisionNumber(); increaseRevisionNumber();
if (isPublished()) if (isPublished())
{ {
// send update to remote holders // send update to remote holders
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true) IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer).addKeysToRemove((String) key).build(); .setWriterName(writer == null ? getOwnerName() : writer).addKeysToRemove((String) key).build();
getOutputBuffer().sendIUPayloadUpdate(this, update); getOutputBuffer().sendIUPayloadUpdate(this, update);
} }
...@@ -305,7 +361,7 @@ public class LocalIU extends AbstractIU ...@@ -305,7 +361,7 @@ public class LocalIU extends AbstractIU
{ {
if (isPublished()) if (isPublished())
{ {
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(false) IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(false)
.setWriterName(writerName == null ? getOwnerName() : writerName).addAllNewItems(newPayload).build(); .setWriterName(writerName == null ? getOwnerName() : writerName).addAllNewItems(newPayload).build();
getOutputBuffer().sendIUPayloadUpdate(this, update); getOutputBuffer().sendIUPayloadUpdate(this, update);
} }
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2013 Sociable Agents Group * Copyright (c) 2009-2015 Social Cognitive Systems Group
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
* http://opensource.cit-ec.de/projects/ipaaca/ * http://opensource.cit-ec.de/projects/ipaaca/
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
package ipaaca; package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
/** /**
* Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls. * Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls.
...@@ -50,4 +51,9 @@ public class LocalMessageIU extends LocalIU ...@@ -50,4 +51,9 @@ public class LocalMessageIU extends LocalIU
super(category); super(category);
} }
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.MESSAGE;
}
} }
/* /*
* This file is part of IPAACA, the * This file is part of IPAACA, the
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2013 Sociable Agents Group * Copyright (c) 2009-2015 Social Cognitive Systems Group
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
* http://opensource.cit-ec.de/projects/ipaaca/ * http://opensource.cit-ec.de/projects/ipaaca/
...@@ -34,6 +35,7 @@ package ipaaca; ...@@ -34,6 +35,7 @@ package ipaaca;
import ipaaca.protobuf.Ipaaca; import ipaaca.protobuf.Ipaaca;
import ipaaca.protobuf.Ipaaca.IUCommission; import ipaaca.protobuf.Ipaaca.IUCommission;
import ipaaca.protobuf.Ipaaca.IURetraction;
import ipaaca.protobuf.Ipaaca.IUResendRequest; import ipaaca.protobuf.Ipaaca.IUResendRequest;
import ipaaca.protobuf.Ipaaca.IULinkUpdate; import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
...@@ -53,7 +55,9 @@ import rsb.Informer; ...@@ -53,7 +55,9 @@ import rsb.Informer;
import rsb.InitializeException; import rsb.InitializeException;
import rsb.RSBException; import rsb.RSBException;
import rsb.patterns.DataCallback; import rsb.patterns.DataCallback;
import rsb.patterns.EventCallback;
import rsb.patterns.LocalServer; import rsb.patterns.LocalServer;
import rsb.Event;
import com.google.common.collect.HashMultimap; import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap; import com.google.common.collect.SetMultimap;
...@@ -129,48 +133,58 @@ public class OutputBuffer extends Buffer ...@@ -129,48 +133,58 @@ public class OutputBuffer extends Buffer
this.channel = ipaaca_channel; this.channel = ipaaca_channel;
} }
private final class RemoteUpdatePayload extends DataCallback<Integer, IUPayloadUpdate> private final class RemoteUpdatePayload extends EventCallback //DataCallback<Long, IUPayloadUpdate>
{ {
@Override @Override
public Integer invoke(IUPayloadUpdate data) throws Throwable public Event invoke(final Event request) //throws Throwable
{ {
logger.debug("remoteUpdate"); logger.debug("remoteUpdate");
return remoteUpdatePayload(data); long result = remoteUpdatePayload((IUPayloadUpdate) request.getData());
//System.out.println("remoteUpdatePayload yielded revision "+result);
return new Event(Long.class, new Long(result));
} }
} }
/*private final class RemoteUpdatePayload extends DataCallback<Long, IUPayloadUpdate>
{
@Override
public Long invoke(IUPayloadUpdate data) throws Throwable
{
logger.debug("remoteUpdate");
return remoteUpdatePayload(data);
}
}*/
private final class RemoteUpdateLinks extends DataCallback<Integer, IULinkUpdate> private final class RemoteUpdateLinks extends EventCallback // DataCallback<Long, IULinkUpdate>
{ {
@Override @Override
public Integer invoke(IULinkUpdate data) throws Throwable public Event invoke(final Event request) //throws Throwable
{ {
logger.debug("remoteUpdateLinks"); logger.debug("remoteUpdateLinks");
return remoteUpdateLinks(data); return new Event(Long.class, new Long(remoteUpdateLinks((IULinkUpdate) request.getData())));
} }
} }
private final class RemoteCommit extends DataCallback<Integer, IUCommission> private final class RemoteCommit extends EventCallback //DataCallback<Long, IUCommission>
{ {
@Override @Override
public Integer invoke(IUCommission data) throws Throwable public Event invoke(final Event request) //throws Throwable
{ {
logger.debug("remoteCommit"); logger.debug("remoteCommit");
return remoteCommit(data); return new Event(Long.class, new Long(remoteCommit((IUCommission) request.getData())));
} }
} }
private final class RemoteResendRequest extends DataCallback<Integer, IUResendRequest> private final class RemoteResendRequest extends EventCallback //DataCallback<Long, IUResendRequest>
{ {
@Override @Override
public Integer invoke(IUResendRequest data) throws Throwable public Event invoke(final Event request) //throws Throwable
{ {
logger.debug("remoteResendRequest"); logger.debug("remoteResendRequest");
return remoteResendRequest(data); return new Event(Long.class, new Long(remoteResendRequest((IUResendRequest) request.getData())));
} }
} }
// def _remote_update_payload(self, update): // def _remote_update_payload(self, update):
...@@ -197,7 +211,7 @@ public class OutputBuffer extends Buffer ...@@ -197,7 +211,7 @@ public class OutputBuffer extends Buffer
* Apply a remotely requested update to one of the stored IUs. * Apply a remotely requested update to one of the stored IUs.
* @return 0 if not updated, IU version number otherwise * @return 0 if not updated, IU version number otherwise
*/ */
int remoteUpdatePayload(IUPayloadUpdate update) long remoteUpdatePayload(IUPayloadUpdate update)
{ {
if (!iuStore.containsKey(update.getUid())) if (!iuStore.containsKey(update.getUid()))
{ {
...@@ -242,7 +256,7 @@ public class OutputBuffer extends Buffer ...@@ -242,7 +256,7 @@ public class OutputBuffer extends Buffer
* Apply a remotely requested update to one of the stored IUs. * Apply a remotely requested update to one of the stored IUs.
* @return 0 if not updated, IU version number otherwise * @return 0 if not updated, IU version number otherwise
*/ */
int remoteUpdateLinks(IULinkUpdate update) long remoteUpdateLinks(IULinkUpdate update)
{ {
if (!iuStore.containsKey(update.getUid())) if (!iuStore.containsKey(update.getUid()))
{ {
...@@ -305,7 +319,7 @@ public class OutputBuffer extends Buffer ...@@ -305,7 +319,7 @@ public class OutputBuffer extends Buffer
/** /**
* Apply a remotely requested commit to one of the stored IUs. * Apply a remotely requested commit to one of the stored IUs.
*/ */
private int remoteCommit(IUCommission iuc) private long remoteCommit(IUCommission iuc)
{ {
if (!iuStore.containsKey(iuc.getUid())) if (!iuStore.containsKey(iuc.getUid()))
{ {
...@@ -334,7 +348,7 @@ public class OutputBuffer extends Buffer ...@@ -334,7 +348,7 @@ public class OutputBuffer extends Buffer
/* /*
* Resend an requested iu over the specific hidden channel. (dlw) TODO * Resend an requested iu over the specific hidden channel. (dlw) TODO
*/ */
private int remoteResendRequest(IUResendRequest iu_resend_request_pack) private long remoteResendRequest(IUResendRequest iu_resend_request_pack)
{ {
if (!iuStore.containsKey(iu_resend_request_pack.getUid())) if (!iuStore.containsKey(iu_resend_request_pack.getUid()))
{ {
...@@ -347,7 +361,7 @@ public class OutputBuffer extends Buffer ...@@ -347,7 +361,7 @@ public class OutputBuffer extends Buffer
Informer<Object> informer = getInformer(iu_resend_request_pack.getHiddenScopeName()); Informer<Object> informer = getInformer(iu_resend_request_pack.getHiddenScopeName());
try try
{ {
informer.send(iu); informer.publish(iu);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -402,6 +416,10 @@ public class OutputBuffer extends Buffer ...@@ -402,6 +416,10 @@ public class OutputBuffer extends Buffer
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (RSBException e)
{
throw new RuntimeException(e);
}
return informer; return informer;
} }
...@@ -439,7 +457,7 @@ public class OutputBuffer extends Buffer ...@@ -439,7 +457,7 @@ public class OutputBuffer extends Buffer
Informer<Object> informer = getInformer(iu.getCategory()); Informer<Object> informer = getInformer(iu.getCategory());
try try
{ {
informer.send(iu); informer.publish(iu);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -473,12 +491,26 @@ public class OutputBuffer extends Buffer ...@@ -473,12 +491,26 @@ public class OutputBuffer extends Buffer
*/ */
protected void sendIUCommission(AbstractIU iu, String writerName) protected void sendIUCommission(AbstractIU iu, String writerName)
{ {
IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(iu.getUid()).setRevision(iu.getRevision()) IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(iu.getUid()).setRevision((int) iu.getRevision())
.setWriterName(iu.getOwnerName() != null ? iu.getOwnerName() : writerName).build(); .setWriterName(writerName == null ? iu.getOwnerName() : writerName).build();
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.publish(iuc);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
}
protected void sendIURetraction(AbstractIU iu)
{
IURetraction iuc = Ipaaca.IURetraction.newBuilder().setUid(iu.getUid()).setRevision((int) iu.getRevision()).build();
Informer<Object> informer = getInformer(iu.getCategory()); Informer<Object> informer = getInformer(iu.getCategory());
try try
{ {
informer.send(iuc); informer.publish(iuc);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -518,7 +550,7 @@ public class OutputBuffer extends Buffer ...@@ -518,7 +550,7 @@ public class OutputBuffer extends Buffer
Informer<Object> informer = getInformer(iu.getCategory()); Informer<Object> informer = getInformer(iu.getCategory());
try try
{ {
informer.send(update); informer.publish(update);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -531,7 +563,7 @@ public class OutputBuffer extends Buffer ...@@ -531,7 +563,7 @@ public class OutputBuffer extends Buffer
Informer<Object> informer = getInformer(iu.getCategory()); Informer<Object> informer = getInformer(iu.getCategory());
try try
{ {
informer.send(update); informer.publish(update);
} }
catch (RSBException e) catch (RSBException e)
{ {
......
...@@ -36,7 +36,10 @@ import ipaaca.protobuf.Ipaaca.PayloadItem; ...@@ -36,7 +36,10 @@ import ipaaca.protobuf.Ipaaca.PayloadItem;
import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.lang.StringEscapeUtils;
import com.google.common.collect.ImmutableSet;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -49,7 +52,7 @@ import java.util.Set; ...@@ -49,7 +52,7 @@ import java.util.Set;
*/ */
public class Payload implements Map<String, String> public class Payload implements Map<String, String>
{ {
private Map<String, String> map = new HashMap<String, String>(); private Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());
private final AbstractIU iu; private final AbstractIU iu;
public Payload(AbstractIU iu) public Payload(AbstractIU iu)
...@@ -82,17 +85,23 @@ public class Payload implements Map<String, String> ...@@ -82,17 +85,23 @@ public class Payload implements Map<String, String>
public void set(Map<String, String> newPayload, String writerName) public void set(Map<String, String> newPayload, String writerName)
{ {
iu.setPayload(newPayload, writerName); iu.setPayload(newPayload, writerName);
map.clear(); synchronized(map)
map.putAll(newPayload); {
map.clear();
map.putAll(newPayload);
}
} }
public void set(List<PayloadItem> newPayload, String writerName) public void set(List<PayloadItem> newPayload, String writerName)
{ {
iu.handlePayloadSetting(newPayload, writerName); iu.handlePayloadSetting(newPayload, writerName);
map.clear(); synchronized(map)
for (PayloadItem item : newPayload)
{ {
map.put(item.getKey(), pseudoConvertFromJSON(item.getValue(), item.getType())); map.clear();
for (PayloadItem item : newPayload)
{
map.put(item.getKey(), pseudoConvertFromJSON(item.getValue(), item.getType()));
}
} }
} }
...@@ -136,9 +145,12 @@ public class Payload implements Map<String, String> ...@@ -136,9 +145,12 @@ public class Payload implements Map<String, String>
return map.containsValue(value); return map.containsValue(value);
} }
public Set<java.util.Map.Entry<String, String>> entrySet() /**
* Provides an immutable copy of the entryset of the Payload
*/
public ImmutableSet<java.util.Map.Entry<String, String>> entrySet()
{ {
return map.entrySet(); return ImmutableSet.copyOf(map.entrySet());
} }
public boolean equals(Object o) public boolean equals(Object o)
...@@ -198,7 +210,7 @@ public class Payload implements Map<String, String> ...@@ -198,7 +210,7 @@ public class Payload implements Map<String, String>
public void putAll(Map<? extends String, ? extends String> newItems) public void putAll(Map<? extends String, ? extends String> newItems)
{ {
putAll(newItems); putAll(newItems, null);
} }
public void putAll(Map<? extends String, ? extends String> newItems, String writer) public void putAll(Map<? extends String, ? extends String> newItems, String writer)
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2013 Sociable Agents Group * Copyright (c) 2009-2015 Social Cognitive Systems Group
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
* http://opensource.cit-ec.de/projects/ipaaca/ * http://opensource.cit-ec.de/projects/ipaaca/
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
package ipaaca; package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.PayloadItem; import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.List; import java.util.List;
...@@ -45,6 +46,11 @@ import com.google.common.collect.SetMultimap; ...@@ -45,6 +46,11 @@ import com.google.common.collect.SetMultimap;
public class RemoteMessageIU extends AbstractIU public class RemoteMessageIU extends AbstractIU
{ {
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.MESSAGE;
}
public RemoteMessageIU(String uid) public RemoteMessageIU(String uid)
{ {
super(uid); super(uid);
...@@ -58,6 +64,12 @@ public class RemoteMessageIU extends AbstractIU ...@@ -58,6 +64,12 @@ public class RemoteMessageIU extends AbstractIU
committed = true; committed = true;
} }
@Override
public void retract()
{
log.info("Retracting a RemoteMessage has no effect.");
}
@Override @Override
public void commit(String writerName) public void commit(String writerName)
{ {
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
package ipaaca; package ipaaca;
import ipaaca.protobuf.Ipaaca; import ipaaca.protobuf.Ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.IUCommission; import ipaaca.protobuf.Ipaaca.IUCommission;
import ipaaca.protobuf.Ipaaca.IULinkUpdate; import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
...@@ -66,6 +67,11 @@ public class RemotePushIU extends AbstractIU ...@@ -66,6 +67,11 @@ public class RemotePushIU extends AbstractIU
private final static Logger logger = LoggerFactory.getLogger(RemotePushIU.class.getName()); private final static Logger logger = LoggerFactory.getLogger(RemotePushIU.class.getName());
private InputBuffer inputBuffer; private InputBuffer inputBuffer;
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.PUSH;
}
public InputBuffer getInputBuffer() public InputBuffer getInputBuffer()
{ {
return inputBuffer; return inputBuffer;
...@@ -84,9 +90,9 @@ public class RemotePushIU extends AbstractIU ...@@ -84,9 +90,9 @@ public class RemotePushIU extends AbstractIU
} }
@Override @Override
public void commit() public void retract()
{ {
commit(null); logger.info("Retracting a RemoteIU has no effect.");
} }
void putIntoPayload(String key, String value, String writer) void putIntoPayload(String key, String value, String writer)
...@@ -95,20 +101,26 @@ public class RemotePushIU extends AbstractIU ...@@ -95,20 +101,26 @@ public class RemotePushIU extends AbstractIU
{ {
throw new IUCommittedException(this); throw new IUCommittedException(this);
} }
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isReadOnly()) if (isReadOnly())
{ {
throw new IUReadOnlyException(this); throw new IUReadOnlyException(this);
} }
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR").build(); PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR").build();
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision(getRevision()) IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision((int) getRevision())
.setWriterName(getBuffer().getUniqueName()).addNewItems(newItem).build(); .setWriterName(getBuffer().getUniqueName()).addNewItems(newItem).build();
RemoteServer server = getInputBuffer().getRemoteServer(this); RemoteServer server = getInputBuffer().getRemoteServer(this);
logger.debug("Remote server has methods {}", server.getMethods()); logger.debug("Remote server has methods {}", server.getMethods());
int newRevision; long newRevision;
try try
{ {
newRevision = (Integer) server.call("updatePayload", update); //System.out.println("calling remote updatePayload ...");
newRevision = (Long) server.call("updatePayload", update);
//System.out.println(" ... done");
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -122,6 +134,10 @@ public class RemotePushIU extends AbstractIU ...@@ -122,6 +134,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0) if (newRevision == 0)
{ {
throw new IUUpdateFailedException(this); throw new IUUpdateFailedException(this);
...@@ -136,11 +152,15 @@ public class RemotePushIU extends AbstractIU ...@@ -136,11 +152,15 @@ public class RemotePushIU extends AbstractIU
{ {
throw new IUCommittedException(this); throw new IUCommittedException(this);
} }
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isReadOnly()) if (isReadOnly())
{ {
throw new IUReadOnlyException(this); throw new IUReadOnlyException(this);
} }
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true) Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(getBuffer().getUniqueName()); .setWriterName(getBuffer().getUniqueName());
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet()) for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{ {
...@@ -153,10 +173,10 @@ public class RemotePushIU extends AbstractIU ...@@ -153,10 +173,10 @@ public class RemotePushIU extends AbstractIU
RemoteServer server = getInputBuffer().getRemoteServer(this); RemoteServer server = getInputBuffer().getRemoteServer(this);
logger.debug("Remote server has methods {}", server.getMethods()); logger.debug("Remote server has methods {}", server.getMethods());
int newRevision; long newRevision;
try try
{ {
newRevision = (Integer) server.call("updatePayload", update); newRevision = (Long) server.call("updatePayload", update);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -170,6 +190,10 @@ public class RemotePushIU extends AbstractIU ...@@ -170,6 +190,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0) if (newRevision == 0)
{ {
throw new IUUpdateFailedException(this); throw new IUUpdateFailedException(this);
...@@ -179,9 +203,19 @@ public class RemotePushIU extends AbstractIU ...@@ -179,9 +203,19 @@ public class RemotePushIU extends AbstractIU
setRevision(newRevision); setRevision(newRevision);
} }
@Override
public void commit()
{
commit(null);
}
@Override @Override
public void commit(String writerName) public void commit(String writerName)
{ {
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isReadOnly()) if (isReadOnly())
{ {
throw new IUReadOnlyException(this); throw new IUReadOnlyException(this);
...@@ -192,13 +226,13 @@ public class RemotePushIU extends AbstractIU ...@@ -192,13 +226,13 @@ public class RemotePushIU extends AbstractIU
} }
else else
{ {
IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(getUid()).setRevision(getRevision()) IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(getUid()).setRevision((int) getRevision())
.setWriterName(getBuffer().getUniqueName()).build(); .setWriterName(getBuffer().getUniqueName()).build();
RemoteServer server = inputBuffer.getRemoteServer(this); RemoteServer server = inputBuffer.getRemoteServer(this);
int newRevision; long newRevision;
try try
{ {
newRevision = (Integer) server.call("commit", iuc); newRevision = (Long) server.call("commit", iuc);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -212,6 +246,10 @@ public class RemotePushIU extends AbstractIU ...@@ -212,6 +246,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0) if (newRevision == 0)
{ {
throw new IUCommittedException(this); throw new IUCommittedException(this);
...@@ -254,18 +292,22 @@ public class RemotePushIU extends AbstractIU ...@@ -254,18 +292,22 @@ public class RemotePushIU extends AbstractIU
{ {
throw new IUCommittedException(this); throw new IUCommittedException(this);
} }
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isReadOnly()) if (isReadOnly())
{ {
throw new IUReadOnlyException(this); throw new IUReadOnlyException(this);
} }
IUPayloadUpdate iuu = IUPayloadUpdate.newBuilder().setRevision(getRevision()).setIsDelta(false).setUid(getUid()) IUPayloadUpdate iuu = IUPayloadUpdate.newBuilder().setRevision((int) getRevision()).setIsDelta(false).setUid(getUid())
.addAllNewItems(newItems).setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").build(); .addAllNewItems(newItems).setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").build();
RemoteServer server = inputBuffer.getRemoteServer(this); RemoteServer server = inputBuffer.getRemoteServer(this);
int newRevision; long newRevision;
try try
{ {
newRevision = (Integer) server.call("updatePayload", iuu); newRevision = (Long) server.call("updatePayload", iuu);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -279,6 +321,10 @@ public class RemotePushIU extends AbstractIU ...@@ -279,6 +321,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0) if (newRevision == 0)
{ {
throw new IUUpdateFailedException(this); throw new IUUpdateFailedException(this);
...@@ -357,6 +403,11 @@ public class RemotePushIU extends AbstractIU ...@@ -357,6 +403,11 @@ public class RemotePushIU extends AbstractIU
committed = true; committed = true;
} }
public void applyRetraction()
{
retracted = true;
}
@Override @Override
void removeFromPayload(Object key, String writer) void removeFromPayload(Object key, String writer)
{ {
...@@ -364,17 +415,21 @@ public class RemotePushIU extends AbstractIU ...@@ -364,17 +415,21 @@ public class RemotePushIU extends AbstractIU
{ {
throw new IUCommittedException(this); throw new IUCommittedException(this);
} }
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isReadOnly()) if (isReadOnly())
{ {
throw new IUReadOnlyException(this); throw new IUReadOnlyException(this);
} }
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision(getRevision()) IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision((int) getRevision())
.setWriterName(getBuffer().getUniqueName()).addKeysToRemove((String) key).build(); .setWriterName(getBuffer().getUniqueName()).addKeysToRemove((String) key).build();
RemoteServer server = getInputBuffer().getRemoteServer(this); RemoteServer server = getInputBuffer().getRemoteServer(this);
int newRevision; long newRevision;
try try
{ {
newRevision = (Integer) server.call("updatePayload", update); newRevision = (Long) server.call("updatePayload", update);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -388,6 +443,10 @@ public class RemotePushIU extends AbstractIU ...@@ -388,6 +443,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0) if (newRevision == 0)
{ {
throw new IUUpdateFailedException(this); throw new IUUpdateFailedException(this);
...@@ -403,6 +462,10 @@ public class RemotePushIU extends AbstractIU ...@@ -403,6 +462,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new IUCommittedException(this); throw new IUCommittedException(this);
} }
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isReadOnly()) if (isReadOnly())
{ {
throw new IUReadOnlyException(this); throw new IUReadOnlyException(this);
...@@ -420,11 +483,11 @@ public class RemotePushIU extends AbstractIU ...@@ -420,11 +483,11 @@ public class RemotePushIU extends AbstractIU
} }
IULinkUpdate update = IULinkUpdate.newBuilder().addAllLinksToRemove(removeLinkSet).addAllNewLinks(newLinkSet).setIsDelta(isDelta) IULinkUpdate update = IULinkUpdate.newBuilder().addAllLinksToRemove(removeLinkSet).addAllNewLinks(newLinkSet).setIsDelta(isDelta)
.setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").setUid(getUid()).setRevision(getRevision()).build(); .setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").setUid(getUid()).setRevision((int) getRevision()).build();
int newRevision; long newRevision;
try try
{ {
newRevision = (Integer) server.call("updateLinks", update); newRevision = (Long) server.call("updateLinks", update);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -438,6 +501,10 @@ public class RemotePushIU extends AbstractIU ...@@ -438,6 +501,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0) if (newRevision == 0)
{ {
throw new IUUpdateFailedException(this); throw new IUUpdateFailedException(this);
......
package ipaaca.util;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
import ipaaca.LocalIU;
import ipaaca.OutputBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
/**
* A simple key-value blackboard
* @author hvanwelbergen
*/
public class Blackboard
{
private final OutputBuffer outBuffer;
private final InputBuffer inBuffer;
private final LocalIU iu;
private final ComponentNotifier notifier;
private static final String DUMMY_KEY = "DUMMY_KEY";
public static final String MESSAGE_SUFFIX = "MESSAGE";
private int dummyValue = 0;
private List<BlackboardUpdateListener> listeners = Collections.synchronizedList(new ArrayList<BlackboardUpdateListener>());
public Blackboard(String id, String category)
{
this(id, category, "default");
}
private void updateListeners()
{
synchronized (listeners)
{
for (BlackboardUpdateListener listener : listeners)
{
listener.update();
}
}
}
public Blackboard(String id, String category, String channel)
{
outBuffer = new OutputBuffer(id, channel);
iu = new LocalIU(category);
outBuffer.add(iu);
outBuffer.registerHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
updateListeners();
}
});
inBuffer = new InputBuffer(id, ImmutableSet.of(ComponentNotifier.NOTIFY_CATEGORY, category + MESSAGE_SUFFIX), channel);
notifier = new ComponentNotifier(id, category, ImmutableSet.of(category), new HashSet<String>(), outBuffer, inBuffer);
notifier.addNotificationHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iuNotify, IUEventType type, boolean local)
{
dummyValue++;
iu.getPayload().put(DUMMY_KEY, "" + dummyValue);
}
});
notifier.initialize();
inBuffer.registerHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iuMessage, IUEventType type, boolean local)
{
iu.getPayload().putAll(iuMessage.getPayload());
updateListeners();
}
}, ImmutableSet.of(category + MESSAGE_SUFFIX));
}
public String put(String key, String value)
{
return iu.getPayload().put(key, value);
}
public void putAll(Map<String, String> newItems)
{
iu.getPayload().putAll(newItems);
}
/**
* Get the value corresponding to the key, or null if it is not available
*/
public String get(String key)
{
return iu.getPayload().get(key);
}
public void addUpdateListener(BlackboardUpdateListener listener)
{
listeners.add(listener);
}
public Set<String> keySet()
{
return iu.getPayload().keySet();
}
public Set<Map.Entry<String, String>> entrySet()
{
return iu.getPayload().entrySet();
}
public Collection<String> values()
{
return iu.getPayload().values();
}
public void close()
{
outBuffer.close();
inBuffer.close();
}
}
package ipaaca.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Client to get/set key value pairs on a Blackboard
* @author hvanwelbergen
*
*/
public class BlackboardClient
{
private final InputBuffer inBuffer;
private final OutputBuffer outBuffer;
private List<BlackboardUpdateListener> listeners = Collections.synchronizedList(new ArrayList<BlackboardUpdateListener>());
private final String category;
public BlackboardClient(String id, String category)
{
this(id, category, "default");
}
public BlackboardClient(String id, String category, String channel)
{
this.category = category;
inBuffer = new InputBuffer(id, ImmutableSet.of(category, ComponentNotifier.NOTIFY_CATEGORY), channel);
inBuffer.setResendActive(true);
inBuffer.registerHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
synchronized (listeners)
{
for (BlackboardUpdateListener listener : listeners)
{
listener.update();
}
}
}
}, ImmutableSet.of(category));
outBuffer = new OutputBuffer(id);
ComponentNotifier notifier = new ComponentNotifier(id, category, new HashSet<String>(), ImmutableSet.of(category),
outBuffer, inBuffer);
notifier.initialize();
}
public void close()
{
inBuffer.close();
outBuffer.close();
}
public void waitForBlackboardConnection()
{
while(inBuffer.getIUs().isEmpty());
}
public String get(String key)
{
if (inBuffer.getIUs().isEmpty())
{
return null;
}
return inBuffer.getIUs().iterator().next().getPayload().get(key);
}
public void put(String key, String value)
{
LocalMessageIU iu = new LocalMessageIU(category+Blackboard.MESSAGE_SUFFIX);
iu.getPayload().put(key,value);
outBuffer.add(iu);
}
public void putAll(Map<String,String> values)
{
LocalMessageIU iu = new LocalMessageIU(category+Blackboard.MESSAGE_SUFFIX);
iu.getPayload().putAll(values);
outBuffer.add(iu);
}
private boolean hasIU()
{
return !inBuffer.getIUs().isEmpty();
}
private AbstractIU getIU()
{
return inBuffer.getIUs().iterator().next();
}
public Set<String> keySet()
{
if(!hasIU())return new HashSet<>();
return getIU().getPayload().keySet();
}
public Set<Map.Entry<String, String>> entrySet()
{
if(!hasIU())return new HashSet<>();
return getIU().getPayload().entrySet();
}
public Collection<String> values()
{
if(!hasIU())return new HashSet<>();
return getIU().getPayload().values();
}
public void addUpdateListener(BlackboardUpdateListener listener)
{
listeners.add(listener);
}
}