diff --git a/ipaacalib/java/src/ipaaca/AbstractIU.java b/ipaacalib/java/src/ipaaca/AbstractIU.java index d79f1622871ec4f960781e6630604e9fe3054510..ba075e527c1054395ed6f965f94f0adac5e5dcc3 100644 --- a/ipaacalib/java/src/ipaaca/AbstractIU.java +++ b/ipaacalib/java/src/ipaaca/AbstractIU.java @@ -57,6 +57,7 @@ public abstract class AbstractIU protected Payload payload; protected String category; protected boolean committed = false; + protected boolean retracted = false; private String uid; protected int revision; private boolean readOnly = false; @@ -201,6 +202,11 @@ public abstract class AbstractIU return committed; } + public boolean isRetracted() + { + return retracted; + } + public void setBuffer(Buffer buffer) { this.buffer = buffer; @@ -218,6 +224,8 @@ public abstract class AbstractIU public abstract void commit(); + public abstract void retract(); + // XXX: might not be valid for all types of IUs public abstract void commit(String writerName); diff --git a/ipaacalib/java/src/ipaaca/IURetractedException.java b/ipaacalib/java/src/ipaaca/IURetractedException.java new file mode 100644 index 0000000000000000000000000000000000000000..b7d4f76c2e4809c731130c7d54103dde2caf5677 --- /dev/null +++ b/ipaacalib/java/src/ipaaca/IURetractedException.java @@ -0,0 +1,54 @@ +/* + * This file is part of IPAACA, the + * "Incremental Processing Architecture + * for Artificial Conversational Agents". + * + * Copyright (c) 2009-2015 Social Cognitive Systems Group + * CITEC, Bielefeld University + * + * http://opensource.cit-ec.de/projects/ipaaca/ + * http://purl.org/net/ipaaca + * + * This file may be licensed under the terms of of the + * GNU Lesser General Public License Version 3 (the ``LGPL''), + * or (at your option) any later version. + * + * Software distributed under the License is distributed + * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either + * express or implied. See the LGPL for the specific language + * governing rights and limitations. + * + * You should have received a copy of the LGPL along with this + * program. If not, go to http://www.gnu.org/licenses/lgpl.html + * or write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * The development of this software was supported by the + * Excellence Cluster EXC 277 Cognitive Interaction Technology. + * The Excellence Cluster EXC 277 is a grant of the Deutsche + * Forschungsgemeinschaft (DFG) in the context of the German + * Excellence Initiative. + */ + +package ipaaca; + +/** + * Error indicating that an IU is immutable because it has been retracted. + * + */ +public class IURetractedException extends RuntimeException +{ + private static final long serialVersionUID = 1L; + private final AbstractIU iu; + + public AbstractIU getIU() + { + return iu; + } + + public IURetractedException(AbstractIU iu) + { + super("Writing to IU " + iu.getUid() + " failed -- it has been retracted."); + this.iu = iu; + } +} diff --git a/ipaacalib/java/src/ipaaca/Initializer.java b/ipaacalib/java/src/ipaaca/Initializer.java index 0af784bd8e7051dd1889edb0465ef07add9de566..c2b8c72cb19e9966906008c2fd855691976c5abc 100644 --- a/ipaacalib/java/src/ipaaca/Initializer.java +++ b/ipaacalib/java/src/ipaaca/Initializer.java @@ -3,7 +3,7 @@ * "Incremental Processing Architecture * for Artificial Conversational Agents". * - * Copyright (c) 2009-2013 Sociable Agents Group + * Copyright (c) 2009-2015 Social Cognitive Systems Group * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ @@ -32,9 +32,12 @@ package ipaaca; +import java.nio.ByteBuffer; import ipaaca.protobuf.Ipaaca.IUCommission; import ipaaca.protobuf.Ipaaca.IUResendRequest; +import ipaaca.protobuf.Ipaaca.IURetraction; import rsb.converter.ConverterSignature; +import rsb.converter.ConverterRepository; import rsb.converter.DefaultConverterRepository; import rsb.converter.ProtocolBufferConverter; @@ -43,34 +46,74 @@ import rsb.converter.ProtocolBufferConverter; * @author hvanwelbergen * */ -public final class Initializer -{ - private Initializer() - { - } +public final class Initializer { + + private Initializer() {} + private static volatile boolean initialized = false; - public synchronized static void initializeIpaacaRsb() - { - if(initialized)return; - DefaultConverterRepository.getDefaultConverterRepository().addConverter(new IntConverter()); - DefaultConverterRepository.getDefaultConverterRepository().addConverter( - new ProtocolBufferConverter<IUCommission>(IUCommission.getDefaultInstance())); - // dlw - DefaultConverterRepository.getDefaultConverterRepository().addConverter( - new ProtocolBufferConverter<IUResendRequest>(IUResendRequest.getDefaultInstance())); - - DefaultConverterRepository.getDefaultConverterRepository().addConverter( - new IUConverter(new ConverterSignature("ipaaca-iu", RemotePushIU.class))); - DefaultConverterRepository.getDefaultConverterRepository().addConverter( - new IUConverter(new ConverterSignature("ipaaca-localiu", LocalIU.class))); - DefaultConverterRepository.getDefaultConverterRepository().addConverter( - new IUConverter(new ConverterSignature("ipaaca-messageiu", RemoteMessageIU.class))); - DefaultConverterRepository.getDefaultConverterRepository().addConverter( - new IUConverter(new ConverterSignature("ipaaca-localmessageiu", LocalMessageIU.class))); + public synchronized static void initializeIpaacaRsb() { + if (initialized) + return; + + ConverterRepository<ByteBuffer> dcr = + DefaultConverterRepository.getDefaultConverterRepository(); + + // for IU revision numbers + dcr.addConverter( + new IntConverter()); + + // IU commit messages + dcr.addConverter( + new ProtocolBufferConverter<IUCommission>( + IUCommission.getDefaultInstance())); + + // IU commit messages + dcr.addConverter( + new ProtocolBufferConverter<IURetraction>( + IURetraction.getDefaultInstance())); + + // IU resend request messages + dcr.addConverter( + new ProtocolBufferConverter<IUResendRequest>( + IUResendRequest.getDefaultInstance())); + + // IUs + dcr.addConverter( + new IUConverter( + new ConverterSignature( + "ipaaca-iu", + RemotePushIU.class))); + + // Local IUs + dcr.addConverter( + new IUConverter( + new ConverterSignature( + "ipaaca-localiu", + LocalIU.class))); + + // Messages + dcr.addConverter( + new IUConverter( + new ConverterSignature( + "ipaaca-messageiu", + RemoteMessageIU.class))); + + // LocalMessages + dcr.addConverter( + new IUConverter( + new ConverterSignature( + "ipaaca-localmessageiu", + LocalMessageIU.class))); - DefaultConverterRepository.getDefaultConverterRepository().addConverter(new PayloadConverter()); - DefaultConverterRepository.getDefaultConverterRepository().addConverter(new LinkUpdateConverter()); + // Payloads + dcr.addConverter( + new PayloadConverter()); + + // LinkUpdates + dcr.addConverter( + new LinkUpdateConverter()); + initialized = true; } } diff --git a/ipaacalib/java/src/ipaaca/InputBuffer.java b/ipaacalib/java/src/ipaaca/InputBuffer.java index ccaabc922935c84a6808678a10c0dad5c0608e05..723a60505140c739c9996bcd2658807b921e95d1 100644 --- a/ipaacalib/java/src/ipaaca/InputBuffer.java +++ b/ipaacalib/java/src/ipaaca/InputBuffer.java @@ -33,6 +33,7 @@ package ipaaca; import ipaaca.protobuf.Ipaaca.IUCommission; +import ipaaca.protobuf.Ipaaca.IURetraction; import ipaaca.protobuf.Ipaaca.IUResendRequest; import ipaaca.protobuf.Ipaaca.IULinkUpdate; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; @@ -441,6 +442,20 @@ public class InputBuffer extends Buffer iu.setRevision(iuc.getRevision()); callIuEventHandlers(iuc.getUid(), false, IUEventType.COMMITTED, iu.getCategory()); } + if (event.getData() instanceof IURetraction) + { + IURetraction iuc = (IURetraction) event.getData(); + logger.debug("handleIUEvents invoked with an IURetraction: {}", iuc); + logger.debug("{}", this.getUniqueName()); + + if (!iuStore.containsKey(iuc.getUid())) + { + logger.warn("Update message for IU which we did not fully receive before."); + } + RemotePushIU iu = this.iuStore.get(iuc.getUid()); + iu.applyRetraction(); + callIuEventHandlers(iuc.getUid(), false, IUEventType.RETRACTED, iu.getCategory()); + } } } diff --git a/ipaacalib/java/src/ipaaca/LocalIU.java b/ipaacalib/java/src/ipaaca/LocalIU.java index 6e87a82484f882a25dddda7e314dfd76ebdb3060..eb4ee552a4efc512992b16b9a10f5d957a422fd4 100644 --- a/ipaacalib/java/src/ipaaca/LocalIU.java +++ b/ipaacalib/java/src/ipaaca/LocalIU.java @@ -105,6 +105,10 @@ public class LocalIU extends AbstractIU synchronized (revisionLock) { + if (isRetracted()) + { + throw new IURetractedException(this); + } if (committed) { throw new IUCommittedException(this); @@ -121,6 +125,22 @@ public class LocalIU extends AbstractIU } } + private void internalRetract() + { + + synchronized (revisionLock) + { + if (isRetracted()) + return; + increaseRevisionNumber(); + retracted = true; + if (outputBuffer != null) + { + outputBuffer.sendIURetraction(this); + } + } + } + private void increaseRevisionNumber() { revision++; @@ -148,6 +168,10 @@ public class LocalIU extends AbstractIU @Override void modifyLinks(boolean isDelta, SetMultimap<String, String> linksToAdd, SetMultimap<String, String> linksToRemove, String writerName) { + if (isRetracted()) + { + throw new IURetractedException(this); + } if (isCommitted()) { throw new IUCommittedException(this); @@ -226,6 +250,10 @@ public class LocalIU extends AbstractIU { throw new IUCommittedException(this); } + if (isRetracted()) + { + throw new IURetractedException(this); + } increaseRevisionNumber(); if (isPublished()) { @@ -249,6 +277,10 @@ public class LocalIU extends AbstractIU { throw new IUCommittedException(this); } + if (isRetracted()) + { + throw new IURetractedException(this); + } increaseRevisionNumber(); if (isPublished()) { @@ -270,15 +302,29 @@ public class LocalIU extends AbstractIU @Override public void commit() { + if (isRetracted()) + { + throw new IURetractedException(this); + } internalCommit(null); } @Override public void commit(String writerName) { + if (isRetracted()) + { + throw new IURetractedException(this); + } internalCommit(writerName); } + @Override + public void retract() + { + internalRetract(); + } + @Override void removeFromPayload(Object key, String writer) { @@ -288,6 +334,10 @@ public class LocalIU extends AbstractIU { throw new IUCommittedException(this); } + if (isRetracted()) + { + throw new IURetractedException(this); + } increaseRevisionNumber(); if (isPublished()) { diff --git a/ipaacalib/java/src/ipaaca/LocalMessageIU.java b/ipaacalib/java/src/ipaaca/LocalMessageIU.java index bf9c12862d8b8027b8facbb0586f46894d5c2699..1bd5ea34687ebd45d92078aec87cab2429bf65c8 100644 --- a/ipaacalib/java/src/ipaaca/LocalMessageIU.java +++ b/ipaacalib/java/src/ipaaca/LocalMessageIU.java @@ -3,7 +3,7 @@ * "Incremental Processing Architecture * for Artificial Conversational Agents". * - * Copyright (c) 2009-2013 Sociable Agents Group + * Copyright (c) 2009-2015 Social Cognitive Systems Group * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ diff --git a/ipaacalib/java/src/ipaaca/OutputBuffer.java b/ipaacalib/java/src/ipaaca/OutputBuffer.java index 818621b07d24b974f7928e3d30fa59126caa5fd7..77f11a070a46b61c3fece8cb90a13caafbd20a14 100644 --- a/ipaacalib/java/src/ipaaca/OutputBuffer.java +++ b/ipaacalib/java/src/ipaaca/OutputBuffer.java @@ -1,9 +1,10 @@ + /* * This file is part of IPAACA, the * "Incremental Processing Architecture * for Artificial Conversational Agents". * - * Copyright (c) 2009-2013 Sociable Agents Group + * Copyright (c) 2009-2015 Social Cognitive Systems Group * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ @@ -34,6 +35,7 @@ package ipaaca; import ipaaca.protobuf.Ipaaca; import ipaaca.protobuf.Ipaaca.IUCommission; +import ipaaca.protobuf.Ipaaca.IURetraction; import ipaaca.protobuf.Ipaaca.IUResendRequest; import ipaaca.protobuf.Ipaaca.IULinkUpdate; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; @@ -474,7 +476,21 @@ public class OutputBuffer extends Buffer protected void sendIUCommission(AbstractIU iu, String writerName) { IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(iu.getUid()).setRevision(iu.getRevision()) - .setWriterName(iu.getOwnerName() != null ? iu.getOwnerName() : writerName).build(); + .setWriterName(writerName == null ? iu.getOwnerName() : writerName).build(); + Informer<Object> informer = getInformer(iu.getCategory()); + try + { + informer.send(iuc); + } + catch (RSBException e) + { + throw new RuntimeException(e); + } + } + + protected void sendIURetraction(AbstractIU iu) + { + IURetraction iuc = Ipaaca.IURetraction.newBuilder().setUid(iu.getUid()).setRevision(iu.getRevision()).build(); Informer<Object> informer = getInformer(iu.getCategory()); try { diff --git a/ipaacalib/java/src/ipaaca/RemoteMessageIU.java b/ipaacalib/java/src/ipaaca/RemoteMessageIU.java index db69496e6709d9c453c34e2a94cf275f3bd4bedb..a3898a170138a0618d97ed97f2d51d7a3458af6c 100644 --- a/ipaacalib/java/src/ipaaca/RemoteMessageIU.java +++ b/ipaacalib/java/src/ipaaca/RemoteMessageIU.java @@ -3,7 +3,7 @@ * "Incremental Processing Architecture * for Artificial Conversational Agents". * - * Copyright (c) 2009-2013 Sociable Agents Group + * Copyright (c) 2009-2015 Social Cognitive Systems Group * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ @@ -58,6 +58,12 @@ public class RemoteMessageIU extends AbstractIU committed = true; } + @Override + public void retract() + { + log.info("Retracting a RemoteMessage has no effect."); + } + @Override public void commit(String writerName) { diff --git a/ipaacalib/java/src/ipaaca/RemotePushIU.java b/ipaacalib/java/src/ipaaca/RemotePushIU.java index bfb8060403c42031dfbf70c7b8c55eadaf72b334..f58e83c39b820be89909f9a6b0213171602d6d9f 100644 --- a/ipaacalib/java/src/ipaaca/RemotePushIU.java +++ b/ipaacalib/java/src/ipaaca/RemotePushIU.java @@ -84,9 +84,9 @@ public class RemotePushIU extends AbstractIU } @Override - public void commit() + public void retract() { - commit(null); + logger.info("Retracting a RemoteIU has no effect."); } void putIntoPayload(String key, String value, String writer) @@ -95,6 +95,10 @@ public class RemotePushIU extends AbstractIU { throw new IUCommittedException(this); } + if (isRetracted()) + { + throw new IURetractedException(this); + } if (isReadOnly()) { throw new IUReadOnlyException(this); @@ -136,6 +140,10 @@ public class RemotePushIU extends AbstractIU { throw new IUCommittedException(this); } + if (isRetracted()) + { + throw new IURetractedException(this); + } if (isReadOnly()) { throw new IUReadOnlyException(this); @@ -179,9 +187,19 @@ public class RemotePushIU extends AbstractIU setRevision(newRevision); } + @Override + public void commit() + { + commit(null); + } + @Override public void commit(String writerName) { + if (isRetracted()) + { + throw new IURetractedException(this); + } if (isReadOnly()) { throw new IUReadOnlyException(this); @@ -254,6 +272,10 @@ public class RemotePushIU extends AbstractIU { throw new IUCommittedException(this); } + if (isRetracted()) + { + throw new IURetractedException(this); + } if (isReadOnly()) { throw new IUReadOnlyException(this); @@ -357,6 +379,11 @@ public class RemotePushIU extends AbstractIU committed = true; } + public void applyRetraction() + { + retracted = true; + } + @Override void removeFromPayload(Object key, String writer) { @@ -364,6 +391,10 @@ public class RemotePushIU extends AbstractIU { throw new IUCommittedException(this); } + if (isRetracted()) + { + throw new IURetractedException(this); + } if (isReadOnly()) { throw new IUReadOnlyException(this); @@ -403,6 +434,10 @@ public class RemotePushIU extends AbstractIU { throw new IUCommittedException(this); } + if (isRetracted()) + { + throw new IURetractedException(this); + } if (isReadOnly()) { throw new IUReadOnlyException(this);