Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Show changes
Showing
with 3559 additions and 30 deletions
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
/**
* Indicates that a remote update failed
* @author hvanwelbergen
*
*/
public class IUUpdateFailedException extends RuntimeException
{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU()
{
return iu;
}
public IUUpdateFailedException(AbstractIU iu)
{
super("Remote update failed for IU " + iu.getUid() + ".");
this.iu = iu;
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
import java.nio.ByteBuffer;
import ipaaca.protobuf.Ipaaca.IUCommission;
import ipaaca.protobuf.Ipaaca.IUResendRequest;
import ipaaca.protobuf.Ipaaca.IURetraction;
import rsb.converter.ConverterSignature;
import rsb.converter.ConverterRepository;
import rsb.converter.DefaultConverterRepository;
import rsb.converter.ProtocolBufferConverter;
/**
* Hooks up the ipaaca converters, call initializeIpaacaRsb() before using ipaaca.
* @author hvanwelbergen
*
*/
public final class Initializer {
private Initializer() {}
private static volatile boolean initialized = false;
public synchronized static void initializeIpaacaRsb() {
if (initialized)
return;
ConverterRepository<ByteBuffer> dcr =
DefaultConverterRepository.getDefaultConverterRepository();
// for IU revision numbers
dcr.addConverter(
new IntConverter());
// IU commit messages
dcr.addConverter(
new ProtocolBufferConverter<IUCommission>(
IUCommission.getDefaultInstance()));
// IU commit messages
dcr.addConverter(
new ProtocolBufferConverter<IURetraction>(
IURetraction.getDefaultInstance()));
// IU resend request messages
dcr.addConverter(
new ProtocolBufferConverter<IUResendRequest>(
IUResendRequest.getDefaultInstance()));
// IUs
dcr.addConverter(
new IUConverter(
new ConverterSignature(
"ipaaca-iu",
RemotePushIU.class)));
// Local IUs
dcr.addConverter(
new IUConverter(
new ConverterSignature(
"ipaaca-localiu",
LocalIU.class)));
// Messages
dcr.addConverter(
new IUConverter(
new ConverterSignature(
"ipaaca-messageiu",
RemoteMessageIU.class)));
// LocalMessages
dcr.addConverter(
new IUConverter(
new ConverterSignature(
"ipaaca-localmessageiu",
LocalMessageIU.class)));
// Payloads
dcr.addConverter(
new PayloadConverter());
// LinkUpdates
dcr.addConverter(
new LinkUpdateConverter());
initialized = true;
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
import ipaaca.protobuf.Ipaaca.IUCommission;
import ipaaca.protobuf.Ipaaca.IURetraction;
import ipaaca.protobuf.Ipaaca.IUResendRequest;
import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rsb.Event;
import rsb.Factory;
import rsb.Handler;
import rsb.InitializeException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import rsb.Listener;
import rsb.RSBException;
import rsb.Scope;
import rsb.patterns.RemoteServer;
/**
* An InputBuffer that holds remote IUs.
* @author hvanwelbergen
*/
@Slf4j
public class InputBuffer extends Buffer
{
private Map<String, RemoteServer> remoteServerStore = new HashMap<String, RemoteServer>();
private Map<String, Listener> listenerStore = new HashMap<String, Listener>();
private Set<String> categoryInterests = new HashSet<String>();
private final static Logger logger = LoggerFactory.getLogger(InputBuffer.class.getName());
private IUStore<RemotePushIU> iuStore = new IUStore<RemotePushIU>();
private IUStore<RemoteMessageIU> messageStore = new IUStore<RemoteMessageIU>();
private String channel = "default";
private boolean resendActive;
public void close()
{
for (Listener listener : listenerStore.values())
{
try
{
listener.deactivate();
}
catch (RSBException e)
{
log.warn("RSB Exception on deactive {}", e, listener.toString());
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
for (RemoteServer remServer : remoteServerStore.values())
{
try
{
remServer.deactivate();
}
catch (RSBException e)
{
log.warn("RSB Exception on RemoteServer deactivate {}", e, remServer.toString());
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
}
// def __init__(self, owning_component_name, category_interests=None, participant_config=None):
// '''Create an InputBuffer.
//
// Keyword arguments:
// owning_compontent_name -- name of the entity that owns this InputBuffer
// category_interests -- list of IU categories this Buffer is interested in
// participant_config = RSB configuration
// '''
// super(InputBuffer, self).__init__(owning_component_name, participant_config)
// self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB'
// self._listener_store = {} # one per IU category
// self._remote_server_store = {} # one per remote-IU-owning Component
// self._category_interests = []
// if category_interests is not None:
// for cat in category_interests:
// self._create_category_listener_if_needed(cat)
public InputBuffer(String owningComponentName, Set<String> categoryInterests)
{
this(owningComponentName, categoryInterests, "default");
}
public InputBuffer(String owningComponentName, Set<String> categoryInterests, String ipaaca_channel)
{
super(owningComponentName);
resendActive = false;
String shortIDName = getUniqueShortName();
uniqueName = "/ipaaca/component/" + shortIDName + "/IB";
this.channel = ipaaca_channel;
for (String cat : categoryInterests)
{
createCategoryListenerIfNeeded(cat);
}
// add own uuid as identifier for hidden channel. (dlw)
createCategoryListenerIfNeeded(shortIDName);
}
/** Pass resendActive to toggle resendRequest-functionality. */
public InputBuffer(BufferConfiguration bufferconfiguration)
{
super(bufferconfiguration.getOwningComponentName());
this.resendActive = bufferconfiguration.getResendActive();
String shortIDName = getUniqueShortName();
uniqueName = "/ipaaca/component/" + shortIDName + "/IB";
for (String cat : bufferconfiguration.getCategoryInterests())
{
createCategoryListenerIfNeeded(cat);
}
this.channel = bufferconfiguration.getChannel();
// add own uuid as identifier for hidden channel. (dlw)
createCategoryListenerIfNeeded(shortIDName);
}
public boolean isResendActive() {
return this.resendActive;
}
public void setResendActive(boolean active) {
this.resendActive = active;
}
// def _get_remote_server(self, iu):
// '''Return (or create, store and return) a remote server.'''
// if iu.owner_name in self._remote_server_store:
// return self._remote_server_store[iu.owner_name]
// remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name)))
// self._remote_server_store[iu.owner_name] = remote_server
// return remote_server
protected RemoteServer getRemoteServer(AbstractIU iu)
{
if (remoteServerStore.containsKey(iu.getOwnerName()))
{
return remoteServerStore.get(iu.getOwnerName());
}
logger.debug("Getting remote server for {}", iu.getOwnerName());
RemoteServer remoteServer = Factory.getInstance().createRemoteServer(new Scope(iu.getOwnerName()));
try
{
remoteServer.activate();
}
catch (InitializeException e)
{
throw new RuntimeException(e);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
remoteServerStore.put(iu.getOwnerName(), remoteServer);
return remoteServer;
}
protected RemoteServer getRemoteServer(String ownerName)
{
if (remoteServerStore.containsKey(ownerName))
{
return remoteServerStore.get(ownerName);
}
logger.debug("Getting remote server for {}", ownerName);
RemoteServer remoteServer = Factory.getInstance().createRemoteServer(new Scope(ownerName));
try
{
remoteServer.activate();
}
catch (InitializeException e)
{
throw new RuntimeException(e);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
remoteServerStore.put(ownerName, remoteServer);
return remoteServer;
}
// def _create_category_listener_if_needed(self, iu_category):
// '''Return (or create, store and return) a category listener.'''
// if iu_category in self._listener_store: return self._informer_store[iu_category]
// cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
// cat_listener.addHandler(self._handle_iu_events)
// self._listener_store[iu_category] = cat_listener
// self._category_interests.append(iu_category)
// logger.info("Added category listener for "+iu_category)
// return cat_listener
private Listener createCategoryListenerIfNeeded(String category)
{
if (listenerStore.containsKey(category))
{
return listenerStore.get(category);
}
Listener listener;
try
{
listener = Factory.getInstance().createListener(new Scope("/ipaaca/channel/" + this.channel + "/category/" + category));
}
catch (InitializeException e1)
{
throw new RuntimeException(e1);
}
listenerStore.put(category, listener);
categoryInterests.add(category);
try
{
listener.addHandler(new InputHandler(), true);
}
catch (InterruptedException e1)
{
Thread.currentThread().interrupt();
}
logger.info("Added category listener for {}", category);
try
{
listener.activate();
}
catch (InitializeException e)
{
throw new RuntimeException(e);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
return listener;
}
class InputHandler implements Handler
{
@Override
public void internalNotify(Event ev)
{
handleIUEvents(ev);
}
}
// def _handle_iu_events(self, event):
// '''Dispatch incoming IU events.
//
// Adds incoming IU's to the store, applies payload and commit updates to
// IU, calls IU event handlers.'
//
// Keyword arguments:
// event -- a converted RSB event
// '''
// if type(event.data) is RemotePushIU:
// # a new IU
// if event.data.uid in self._iu_store:
// # already in our store
// pass
// else:
// self._iu_store[ event.data.uid ] = event.data
// event.data.buffer = self
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category)
// else:
// # an update to an existing IU
// if event.data.writer_name == self.unique_name:
// # Discard updates that originate from this buffer
// return
// if event.data.uid not in self._iu_store:
// # TODO: we should request the IU's owner to send us the IU
// logger.warning("Update message for IU which we did not fully receive before.")
// return
// if type(event.data) is iuProtoBuf_pb2.IUCommission:
// # IU commit
// iu = self._iu_store[event.data.uid]
// iu._apply_commission()
// iu._revision = event.data.revision
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
// elif type(event.data) is IUPayloadUpdate:
// # IU payload update
// iu = self._iu_store[event.data.uid]
// iu._apply_update(event.data)
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
/**
* Dispatch incoming IU events.
*/
private void handleIUEvents(Event event)
{
if (event.getData() instanceof RemoteMessageIU)
{
RemoteMessageIU rm = (RemoteMessageIU) event.getData();
if (messageStore.containsKey(rm.getUid())) {
logger.warn("Spurious RemoteMessage event: already got this UID: "+rm.getUid());
return;
}
//logger.info("Adding Message "+rm.getUid());
messageStore.put(rm.getUid(), rm);
//logger.info("Calling handlers for Message "+rm.getUid());
callIuEventHandlers(rm.getUid(),false, IUEventType.MESSAGE, rm.getCategory());
//logger.info("Removing Message "+rm.getUid());
messageStore.remove(rm.getUid());
}
else if (event.getData() instanceof RemotePushIU)
{
RemotePushIU rp = (RemotePushIU) event.getData();
// a new IU
if (iuStore.containsKey(rp.getUid()))
{
// already in our store
return;
}
else
{
iuStore.put(rp.getUid(), rp);
rp.setBuffer(this);
this.callIuEventHandlers(rp.getUid(), false, IUEventType.ADDED, rp.getCategory());
}
}
else
{
if (event.getData() instanceof IULinkUpdate)
{
IULinkUpdate iuLinkUpdate = (IULinkUpdate) event.getData();
if (iuLinkUpdate.getWriterName().equals(this.getUniqueName()))
{
// Discard updates that originate from this buffer
return;
}
if (!iuStore.containsKey(iuLinkUpdate.getUid()))
{
if (resendActive)
{
triggerResendRequest(event.getData(), getUniqueShortName());
} else {
logger.warn("Link update message for IU which we did not fully receive before.");
}
return;
}
RemotePushIU iu = this.iuStore.get(iuLinkUpdate.getUid());
iu.applyLinkUpdate(iuLinkUpdate);
callIuEventHandlers(iu.getUid(), false, IUEventType.LINKSUPDATED, iu.category);
}
if (event.getData() instanceof IUPayloadUpdate)
{
IUPayloadUpdate iuUpdate = (IUPayloadUpdate) event.getData();
logger.debug("handleIUEvents invoked with an IUPayloadUpdate: {}", iuUpdate);
if (iuUpdate.getWriterName().equals(this.getUniqueName()))
{
// Discard updates that originate from this buffer
return;
}
if (!iuStore.containsKey(iuUpdate.getUid()))
{
if (resendActive)
{
triggerResendRequest(event.getData(), getUniqueShortName());
} else {
logger.warn("Update message for IU which we did not fully receive before.");
}
return;
}
RemotePushIU iu = this.iuStore.get(iuUpdate.getUid());
iu.applyUpdate(iuUpdate);
callIuEventHandlers(iu.getUid(), false, IUEventType.UPDATED, iu.category);
}
if (event.getData() instanceof IUCommission)
{
IUCommission iuc = (IUCommission) event.getData();
logger.debug("handleIUEvents invoked with an IUCommission: {}", iuc);
logger.debug("{}, {}", iuc.getWriterName(), this.getUniqueName());
if (iuc.getWriterName().equals(this.getUniqueName()))
{
// Discard updates that originate from this buffer
return;
}
if (!iuStore.containsKey(iuc.getUid()))
{
if (resendActive)
{
triggerResendRequest(event.getData(), getUniqueShortName());
} else {
logger.warn("Update message for IU which we did not fully receive before.");
}
return;
}
RemotePushIU iu = this.iuStore.get(iuc.getUid());
iu.applyCommmision();
iu.setRevision(iuc.getRevision());
callIuEventHandlers(iuc.getUid(), false, IUEventType.COMMITTED, iu.getCategory());
}
if (event.getData() instanceof IURetraction)
{
IURetraction iuc = (IURetraction) event.getData();
logger.debug("handleIUEvents invoked with an IURetraction: {}", iuc);
logger.debug("{}", this.getUniqueName());
if (!iuStore.containsKey(iuc.getUid()))
{
logger.warn("Update message for IU which we did not fully receive before.");
}
RemotePushIU iu = this.iuStore.get(iuc.getUid());
if (iu != null) {
iu.applyRetraction();
callIuEventHandlers(iuc.getUid(), false, IUEventType.RETRACTED, iu.getCategory());
}
}
}
}
private void triggerResendRequest(Object aiuObj, String hiddenScopeName)
{
String uid = null;
String writerName = null;
if (aiuObj instanceof IULinkUpdate) {
IULinkUpdate tmp = (IULinkUpdate)aiuObj;
uid = tmp.getUid();
writerName = tmp.getWriterName();
} else if (aiuObj instanceof IUPayloadUpdate) {
IUPayloadUpdate tmp = (IUPayloadUpdate)aiuObj;
uid = tmp.getUid();
writerName = tmp.getWriterName();
} else if (aiuObj instanceof IUCommission) {
IUCommission tmp = (IUCommission)aiuObj;
uid = tmp.getUid();
writerName = tmp.getWriterName();
}
RemoteServer rServer = null;
if (writerName != null)
rServer = getRemoteServer(writerName);
if ((rServer != null)&&(uid != null)) {
IUResendRequest iurr = IUResendRequest.newBuilder().setUid(uid).setHiddenScopeName(hiddenScopeName).build();
long rRevision = 0;
try
{
rRevision = (Long) rServer.call("resendRequest", iurr);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (rRevision == 0)
{
//throw new IUResendFailedException(aiu); // TODO
}
}
}
public InputBuffer(String owningComponentName)
{
super(owningComponentName);
}
@Override
public AbstractIU getIU(String iuid)
{
if (iuStore.get(iuid) != null)
{
return iuStore.get(iuid);
}
else
{
return messageStore.get(iuid);
}
}
public void addCategoryInterest(String... categories)
{
for(String cat:categories)
{
createCategoryListenerIfNeeded(cat);
}
}
public Collection<RemotePushIU> getIUs()
{
return iuStore.values();
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
import ipaaca.Ipaaca.IntMessage;
import ipaaca.protobuf.Ipaaca.IntMessage;
import java.nio.ByteBuffer;
import rsb.converter.ConversionException;
import rsb.converter.Converter;
import rsb.converter.ConverterSignature;
......@@ -10,25 +44,27 @@ import rsb.converter.WireContents;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Serializer/deserializer for ints
* @author hvanwelbergen
*
*/
public class IntConverter implements Converter<ByteBuffer>
{
@Override
public ConverterSignature getSignature()
{
return new ConverterSignature("int32",Integer.class);
return new ConverterSignature("int32", Integer.class);
}
@Override
public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException
{
Integer intVal = (Integer)obj;
IntMessage message = IntMessage.newBuilder()
.setValue(intVal)
.build();
return new WireContents<ByteBuffer>(ByteBuffer.wrap(message.toByteArray()),"int32");
Integer intVal = (Integer) obj;
IntMessage message = IntMessage.newBuilder().setValue(intVal).build();
return new WireContents<ByteBuffer>(ByteBuffer.wrap(message.toByteArray()), "int32");
}
@Override
......
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import java.nio.ByteBuffer;
import rsb.converter.ConversionException;
import rsb.converter.Converter;
import rsb.converter.ConverterSignature;
import rsb.converter.UserData;
import rsb.converter.WireContents;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Serializer/deserializer for IULinkUpdate
* @author hvanwelbergen
*
*/
public class LinkUpdateConverter implements Converter<ByteBuffer>
{
private static final String LINKUPDATE_WIRESCHEMA = "ipaaca-iu-link-update";
@Override
public UserData<?> deserialize(String wireSchema, ByteBuffer buffer) throws ConversionException
{
IULinkUpdate pl;
try
{
pl = IULinkUpdate.newBuilder().mergeFrom(buffer.array()).build();
}
catch (InvalidProtocolBufferException e)
{
throw new RuntimeException(e);
}
return new UserData<IULinkUpdate>(pl, IULinkUpdate.class);
}
@Override
public ConverterSignature getSignature()
{
return new ConverterSignature(LINKUPDATE_WIRESCHEMA, IULinkUpdate.class);
}
@Override
public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException
{
IULinkUpdate pl = (IULinkUpdate) obj;
return new WireContents<ByteBuffer>(ByteBuffer.wrap(pl.toByteArray()), LINKUPDATE_WIRESCHEMA);
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate.Builder;
import ipaaca.protobuf.Ipaaca.LinkSet;
import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import com.google.common.collect.SetMultimap;
import ipaaca.Ipaaca.IULinkUpdate;
import ipaaca.Ipaaca.IUPayloadUpdate;
import ipaaca.Ipaaca.LinkSet;
import ipaaca.Ipaaca.PayloadItem;
public class LocalIU extends AbstractIU
{
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.PUSH;
}
private OutputBuffer outputBuffer;
......@@ -32,16 +73,20 @@ public class LocalIU extends AbstractIU
public LocalIU()
{
this(null);
super(UUID.randomUUID().toString());
revision = 1;
payload = new Payload(this);
}
public LocalIU(String uid)
public LocalIU(String category)
{
super(uid);
super(UUID.randomUUID().toString());
this.category = category;
revision = 1;
payload = new Payload(this);
payload = new Payload(this);
}
// def _set_buffer(self, buffer):
// if self._buffer is not None:
// raise Exception('The IU is already in a buffer, cannot move it.')
......@@ -49,7 +94,7 @@ public class LocalIU extends AbstractIU
// self.owner_name = buffer.unique_name
// self._payload.owner_name = buffer.unique_name
//
public void setBuffer(OutputBuffer buffer)
protected void setBuffer(OutputBuffer buffer)
{
if (outputBuffer != null)
{
......@@ -66,6 +111,10 @@ public class LocalIU extends AbstractIU
synchronized (revisionLock)
{
if (isRetracted())
{
throw new IURetractedException(this);
}
if (committed)
{
throw new IUCommittedException(this);
......@@ -74,7 +123,26 @@ public class LocalIU extends AbstractIU
{
increaseRevisionNumber();
committed = true;
outputBuffer.sendIUCommission(this, writerName);
if (outputBuffer != null)
{
outputBuffer.sendIUCommission(this, writerName);
}
}
}
}
private void internalRetract()
{
synchronized (revisionLock)
{
if (isRetracted())
return;
increaseRevisionNumber();
retracted = true;
if (outputBuffer != null)
{
outputBuffer.sendIURetraction(this);
}
}
}
......@@ -106,6 +174,10 @@ public class LocalIU extends AbstractIU
@Override
void modifyLinks(boolean isDelta, SetMultimap<String, String> linksToAdd, SetMultimap<String, String> linksToRemove, String writerName)
{
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isCommitted())
{
throw new IUCommittedException(this);
......@@ -113,7 +185,7 @@ public class LocalIU extends AbstractIU
synchronized (revisionLock)
{
increaseRevisionNumber();
if(isPublished())
if (isPublished())
{
String wName = null;
if (getBuffer() != null)
......@@ -128,24 +200,19 @@ public class LocalIU extends AbstractIU
{
wName = null;
}
Set<LinkSet> addSet = new HashSet<LinkSet>();
for(Entry<String, Collection<String>> entry :linksToAdd.asMap().entrySet())
Set<LinkSet> addSet = new HashSet<LinkSet>();
for (Entry<String, Collection<String>> entry : linksToAdd.asMap().entrySet())
{
addSet.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
}
Set<LinkSet> removeSet = new HashSet<LinkSet>();
for(Entry<String, Collection<String>> entry :linksToRemove.asMap().entrySet())
Set<LinkSet> removeSet = new HashSet<LinkSet>();
for (Entry<String, Collection<String>> entry : linksToRemove.asMap().entrySet())
{
removeSet.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
}
outputBuffer.sendIULinkUpdate(this,IULinkUpdate.newBuilder()
.setUid(getUid())
.setRevision(getRevision())
.setWriterName(wName)
.setIsDelta(isDelta)
.addAllNewLinks(addSet)
.addAllLinksToRemove(removeSet)
.build());
outputBuffer.sendIULinkUpdate(this,
IULinkUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setWriterName(wName).setIsDelta(isDelta)
.addAllNewLinks(addSet).addAllLinksToRemove(removeSet).build());
}
}
}
......@@ -189,31 +256,81 @@ public class LocalIU extends AbstractIU
{
throw new IUCommittedException(this);
}
if (isRetracted())
{
throw new IURetractedException(this);
}
increaseRevisionNumber();
if (isPublished())
{
// send update to remote holders
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("") // TODO: fix this, default in .proto?
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR")
.build();
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true)
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer).addNewItems(newItem).build();
getOutputBuffer().sendIUPayloadUpdate(this, update);
}
}
}
@Override
void putIntoPayload(Map<? extends String, ? extends String> newItems, String writer)
{
synchronized (getRevisionLock())
{
// set item locally
if (isCommitted())
{
throw new IUCommittedException(this);
}
if (isRetracted())
{
throw new IURetractedException(this);
}
increaseRevisionNumber();
if (isPublished())
{
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer);
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("STR")
.build();
builder.addNewItems(newItem);
}
IUPayloadUpdate update = builder.build();
getOutputBuffer().sendIUPayloadUpdate(this, update);
}
}
}
@Override
public void commit()
{
if (isRetracted())
{
throw new IURetractedException(this);
}
internalCommit(null);
}
@Override
public void commit(String writerName)
{
if (isRetracted())
{
throw new IURetractedException(this);
}
internalCommit(writerName);
}
@Override
public void retract()
{
internalRetract();
}
@Override
void removeFromPayload(Object key, String writer)
{
......@@ -223,11 +340,15 @@ public class LocalIU extends AbstractIU
{
throw new IUCommittedException(this);
}
if (isRetracted())
{
throw new IURetractedException(this);
}
increaseRevisionNumber();
if (isPublished())
{
// send update to remote holders
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true)
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer).addKeysToRemove((String) key).build();
getOutputBuffer().sendIUPayloadUpdate(this, update);
}
......@@ -240,10 +361,15 @@ public class LocalIU extends AbstractIU
{
if (isPublished())
{
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(false)
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(false)
.setWriterName(writerName == null ? getOwnerName() : writerName).addAllNewItems(newPayload).build();
getOutputBuffer().sendIUPayloadUpdate(this, update);
}
}
@Override
public String toString()
{
return "LocalIU with category: "+this.getCategory() + "\nowner: "+getOwnerName()+"\npayload: "+this.getPayload();
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
/**
* Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls.
* @author hvanwelbergen
*/
public class LocalMessageIU extends LocalIU
{
public LocalMessageIU()
{
super();
}
public LocalMessageIU(String category)
{
super(category);
}
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.MESSAGE;
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
import ipaaca.protobuf.Ipaaca;
import ipaaca.protobuf.Ipaaca.IUCommission;
import ipaaca.protobuf.Ipaaca.IURetraction;
import ipaaca.protobuf.Ipaaca.IUResendRequest;
import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
import ipaaca.protobuf.Ipaaca.LinkSet;
import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rsb.Factory;
import rsb.Informer;
import rsb.InitializeException;
import rsb.RSBException;
import rsb.patterns.DataCallback;
import rsb.patterns.EventCallback;
import rsb.patterns.LocalServer;
import rsb.Event;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
/**
* An OutputBuffer that holds local IUs.
* @author hvanwelbergen
*/
@Slf4j
public class OutputBuffer extends Buffer
{
private final LocalServer server;
private Map<String, Informer<Object>> informerStore = new HashMap<String, Informer<Object>>(); // category -> informer map
private final static Logger logger = LoggerFactory.getLogger(OutputBuffer.class.getName());
private IUStore<LocalIU> iuStore = new IUStore<LocalIU>();
private String channel = "default";
// def __init__(self, owning_component_name, participant_config=None):
// '''Create an Output Buffer.
//
// Keyword arguments:
// owning_component_name -- name of the entity that own this buffer
// participant_config -- RSB configuration
// '''
// super(OutputBuffer, self).__init__(owning_component_name, participant_config)
// self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB'
// self._server = rsb.createServer(rsb.Scope(self._unique_name))
// self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int)
// self._server.addMethod('commit', self._remote_commit, iuProtoBuf_pb2.IUCommission, int)
// self._informer_store = {}
// self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-'
// self.__iu_id_counter_lock = threading.Lock()
// self.__iu_id_counter = 0
/**
* @param owningComponentName name of the entity that own this buffer
*/
public OutputBuffer(String owningComponentName)
{
this(owningComponentName, "default");
}
/**
* @param owningComponentName name of the entity that own this buffer
* @param channel name of the ipaaca channel this buffer is using
*/
public OutputBuffer(String owningComponentName, String ipaaca_channel)
{
super(owningComponentName);
uniqueName = "/ipaaca/component/" + getUniqueShortName() + "/OB";
logger.debug("Creating server for {}", uniqueName);
server = Factory.getInstance().createLocalServer(uniqueName);
try
{
server.addMethod("updatePayload", new RemoteUpdatePayload());
server.addMethod("updateLinks", new RemoteUpdateLinks());
server.addMethod("commit", new RemoteCommit());
// add method to trigger a resend request. (dlw)
server.addMethod("resendRequest", new RemoteResendRequest());
server.activate();
}
catch (InitializeException e)
{
throw new RuntimeException(e);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
this.channel = ipaaca_channel;
}
private final class RemoteUpdatePayload extends EventCallback //DataCallback<Long, IUPayloadUpdate>
{
@Override
public Event invoke(final Event request) //throws Throwable
{
logger.debug("remoteUpdate");
long result = remoteUpdatePayload((IUPayloadUpdate) request.getData());
//System.out.println("remoteUpdatePayload yielded revision "+result);
return new Event(Long.class, new Long(result));
}
}
/*private final class RemoteUpdatePayload extends DataCallback<Long, IUPayloadUpdate>
{
@Override
public Long invoke(IUPayloadUpdate data) throws Throwable
{
logger.debug("remoteUpdate");
return remoteUpdatePayload(data);
}
}*/
private final class RemoteUpdateLinks extends EventCallback // DataCallback<Long, IULinkUpdate>
{
@Override
public Event invoke(final Event request) //throws Throwable
{
logger.debug("remoteUpdateLinks");
return new Event(Long.class, new Long(remoteUpdateLinks((IULinkUpdate) request.getData())));
}
}
private final class RemoteCommit extends EventCallback //DataCallback<Long, IUCommission>
{
@Override
public Event invoke(final Event request) //throws Throwable
{
logger.debug("remoteCommit");
return new Event(Long.class, new Long(remoteCommit((IUCommission) request.getData())));
}
}
private final class RemoteResendRequest extends EventCallback //DataCallback<Long, IUResendRequest>
{
@Override
public Event invoke(final Event request) //throws Throwable
{
logger.debug("remoteResendRequest");
return new Event(Long.class, new Long(remoteResendRequest((IUResendRequest) request.getData())));
}
}
// def _remote_update_payload(self, update):
// '''Apply a remotely requested update to one of the stored IUs.'''
// if update.uid not in self._iu_store:
// logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
// return 0
// iu = self._iu_store[update.uid]
// if (update.revision != 0) and (update.revision != iu.revision):
// # (0 means "do not pay attention to the revision number" -> "force update")
// logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
// return 0
// if update.is_delta:
// for k in update.keys_to_remove:
// iu.payload.__delitem__(k, writer_name=update.writer_name)
// for k,v in update.new_items.items():
// iu.payload.__setitem__(k, v, writer_name=update.writer_name)
// else:
// iu._set_payload(update.new_items, writer_name=update.writer_name)
// self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category)
// return iu.revision
/**
* Apply a remotely requested update to one of the stored IUs.
* @return 0 if not updated, IU version number otherwise
*/
long remoteUpdatePayload(IUPayloadUpdate update)
{
if (!iuStore.containsKey(update.getUid()))
{
logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", update.getUid());
return 0;
}
AbstractIU iu = iuStore.get(update.getUid());
if (update.getRevision() != 0 && update.getRevision() != iu.getRevision())
{
// (0 means "do not pay attention to the revision number" -> "force update")
logger.warn("Remote write operation failed because request was out of date; IU {}", update.getUid());
return 0;
}
if (update.getIsDelta())
{
for (String k : update.getKeysToRemoveList())
{
iu.getPayload().remove(k, update.getWriterName());
}
if (update.getNewItemsList().size() > 0)
{
HashMap<String, String> payloadUpdate = new HashMap<String, String>();
for (PayloadItem pli : update.getNewItemsList())
{
payloadUpdate.put(pli.getKey(), pli.getValue());
// //iu.getPayload().put(pli.getKey(), pli.getValue(), update.getWriterName());
}
iu.getPayload().putAll(payloadUpdate, update.getWriterName());
}
}
else
{
iu.setPayload(update.getNewItemsList(), update.getWriterName());
}
callIuEventHandlers(update.getUid(), true, IUEventType.UPDATED, iu.getCategory());
return iu.revision;
}
/**
* Apply a remotely requested update to one of the stored IUs.
* @return 0 if not updated, IU version number otherwise
*/
long remoteUpdateLinks(IULinkUpdate update)
{
if (!iuStore.containsKey(update.getUid()))
{
logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", update.getUid());
return 0;
}
AbstractIU iu = iuStore.get(update.getUid());
if (update.getRevision() != 0 && update.getRevision() != iu.getRevision())
{
// (0 means "do not pay attention to the revision number" -> "force update")
logger.warn("Remote write operation failed because request was out of date; IU {}", update.getUid());
return 0;
}
if (update.getIsDelta())
{
SetMultimap<String, String> newLinks = HashMultimap.create();
for (LinkSet ls : update.getNewLinksList())
{
newLinks.putAll(ls.getType(), ls.getTargetsList());
}
SetMultimap<String, String> removeLinks = HashMultimap.create();
for (LinkSet ls : update.getLinksToRemoveList())
{
removeLinks.putAll(ls.getType(), ls.getTargetsList());
}
iu.modifyLinks(newLinks, removeLinks);
}
else
{
SetMultimap<String, String> newLinks = HashMultimap.create();
for (LinkSet ls : update.getNewLinksList())
{
newLinks.putAll(ls.getType(), ls.getTargetsList());
}
iu.setLinks(newLinks);
}
callIuEventHandlers(update.getUid(), true, IUEventType.LINKSUPDATED, iu.getCategory());
return iu.revision;
}
//
// def _remote_commit(self, iu_commission):
// '''Apply a remotely requested commit to one of the stored IUs.'''
// if iu_commission.uid not in self._iu_store:
// logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
// return 0
// iu = self._iu_store[iu_commission.uid]
// if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
// # (0 means "do not pay attention to the revision number" -> "force update")
// logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
// return 0
// if iu.committed:
// return 0
// else:
// iu._internal_commit(writer_name=iu_commission.writer_name)
// self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category)
// return iu.revision
/**
* Apply a remotely requested commit to one of the stored IUs.
*/
private long remoteCommit(IUCommission iuc)
{
if (!iuStore.containsKey(iuc.getUid()))
{
logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", iuc.getUid());
return 0;
}
AbstractIU iu = iuStore.get(iuc.getUid());
if (iuc.getRevision() != 0 && iuc.getRevision() != iu.getRevision())
{
// (0 means "do not pay attention to the revision number" -> "force update")
logger.warn("Remote write operation failed because request was out of date; IU {}", iuc.getUid());
return 0;
}
if (iu.isCommitted())
{
return 0;
}
else
{
iu.commit(iuc.getWriterName());
callIuEventHandlers(iuc.getUid(), true, IUEventType.COMMITTED, iu.getCategory());
return iu.getRevision();
}
}
/*
* Resend an requested iu over the specific hidden channel. (dlw) TODO
*/
private long remoteResendRequest(IUResendRequest iu_resend_request_pack)
{
if (!iuStore.containsKey(iu_resend_request_pack.getUid()))
{
logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", iu_resend_request_pack.getUid());
return 0;
}
AbstractIU iu = iuStore.get(iu_resend_request_pack.getUid());
if ((iu_resend_request_pack.hasHiddenScopeName() == true)&&(!iu_resend_request_pack.getHiddenScopeName().equals("")))
{
Informer<Object> informer = getInformer(iu_resend_request_pack.getHiddenScopeName());
try
{
informer.publish(iu);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
return iu.getRevision();
} else
{
return 0;
}
}
// def _get_informer(self, iu_category):
// '''Return (or create, store and return) an informer object for IUs of the specified category.'''
// if iu_category in self._informer_store:
// return self._informer_store[iu_category]
// informer_iu = rsb.createInformer(
// rsb.Scope("/ipaaca/category/"+str(iu_category)),
// config=self._participant_config,
// dataType=object)
// self._informer_store[iu_category] = informer_iu #new_tuple
// logger.info("Added informer on "+iu_category)
// return informer_iu #return new_tuple
/**
* Return (or create, store and return) an informer object for IUs of the specified category.
*/
private Informer<Object> getInformer(String category)
{
if (informerStore.containsKey(category))
{
return informerStore.get(category);
}
Informer<Object> informer;
try
{
informer = Factory.getInstance().createInformer("/ipaaca/channel/" + this.channel + "/category/" + category);
}
catch (InitializeException e1)
{
throw new RuntimeException(e1);
}
informerStore.put(category, informer);
logger.info("Added informer on channel " + this.channel + " and category " + category);
try
{
informer.activate();
}
catch (InitializeException e)
{
throw new RuntimeException(e);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
return informer;
}
// def add(self, iu):
// '''Add an IU to the IU store, assign an ID and publish it.'''
// if iu._uid is not None:
// raise IUPublishedError(iu)
// iu.uid = self._generate_iu_uid()
// self._iu_store[iu._uid] = iu
// iu.buffer = self
// self._publish_iu(iu)
/**
* Add an IU to the IU store, assign an ID and publish it.
*/
public void add(LocalIU iu)
{
if (iuStore.get(iu.getUid()) != null)
{
throw new IUPublishedException(iu);
}
if(!(iu instanceof LocalMessageIU))
{
iuStore.put(iu.getUid(), iu);
}
iu.setBuffer(this);
publishIU(iu);
}
// def _publish_iu(self, iu):
// '''Publish an IU.'''
// informer = self._get_informer(iu._category)
// informer.publishData(iu)
private void publishIU(AbstractIU iu)
{
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.publish(iu);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
}
// def _send_iu_commission(self, iu, writer_name):
// '''Send IU commission.
//
// Keyword arguments:
// iu -- the IU that has been committed to
// writer_name -- name of the Buffer that initiated this commit, necessary
// to enable remote components to filter out updates that originated
// from their own operations
// '''
// # a raw Protobuf object for IUCommission is produced
// # (unlike updates, where we have an intermediate class)
// iu_commission = iuProtoBuf_pb2.IUCommission()
// iu_commission.uid = iu.uid
// iu_commission.revision = iu.revision
// iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name
// # print('sending IU commission event')
// informer = self._get_informer(iu._category)
// informer.publishData(iu_commission)
/**
* @param iu the IU that has been committed to
* @param writerName name of the Buffer that initiated this commit, necessary
* to enable remote components to filter out updates that originated
* from their own operations
*/
protected void sendIUCommission(AbstractIU iu, String writerName)
{
IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(iu.getUid()).setRevision((int) iu.getRevision())
.setWriterName(writerName == null ? iu.getOwnerName() : writerName).build();
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.publish(iuc);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
}
protected void sendIURetraction(AbstractIU iu)
{
IURetraction iuc = Ipaaca.IURetraction.newBuilder().setUid(iu.getUid()).setRevision((int) iu.getRevision()).build();
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.publish(iuc);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
}
// def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
// '''Send an IU payload update.
//
// Keyword arguments:
// iu -- the IU being updated
// is_delta -- whether the update concerns only a single payload item or
// the whole payload dictionary
// revision -- the new revision number
// new_items -- a dictionary of new payload items
// keys_to_remove -- a list of the keys that shall be removed from the
// payload
// writer_name -- name of the Buffer that initiated this update, necessary
// to enable remote components to filter out updates that originate d
// from their own operations
// '''
// if new_items is None:
// new_items = {}
// if keys_to_remove is None:
// keys_to_remove = []
// payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision)
// payload_update.new_items = new_items
// if is_delta:
// payload_update.keys_to_remove = keys_to_remove
// payload_update.writer_name = writer_name
// informer = self._get_informer(iu._category)
// informer.publishData(payload_update)
protected void sendIUPayloadUpdate(AbstractIU iu, IUPayloadUpdate update)
{
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.publish(update);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
}
protected void sendIULinkUpdate(AbstractIU iu, IULinkUpdate update)
{
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.publish(update);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
}
@Override
public AbstractIU getIU(String iuid)
{
return iuStore.get(iuid);
}
public void close()
{
try
{
server.deactivate();
}
catch (RSBException e)
{
log.warn("RSBException on deactivating server in close", e);
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
for (Informer<?> informer : informerStore.values())
{
try
{
informer.deactivate();
}
catch (RSBException e)
{
log.warn("RSBException on deactivating informer {} in close", e, informer.toString());
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
import ipaaca.Ipaaca.PayloadItem;
import ipaaca.protobuf.Ipaaca.PayloadItem;
import org.apache.commons.lang.StringEscapeUtils;
import com.google.common.collect.ImmutableSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -15,76 +52,78 @@ import java.util.Set;
*/
public class Payload implements Map<String, String>
{
private Map<String, String> map = new HashMap<String, String>();
private Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());
private final AbstractIU iu;
public Payload(AbstractIU iu)
{
this.iu = iu;
}
// def __init__(self, remote_push_iu, new_payload):
// """Create remote payload object.
//
// Keyword arguments:
// remote_push_iu -- remote IU holding this payload
// new_payload -- payload dict to initialise this remote payload with
// """
// super(RemotePushPayload, self).__init__()
// self._remote_push_iu = remote_push_iu
// if new_payload is not None:
// for k,v in new_payload.items():
// dict.__setitem__(self, k, v)
public Payload(AbstractIU iu, List<PayloadItem> payloadItems)
{
this(iu,payloadItems,null);
this(iu, payloadItems, null);
}
public Payload(AbstractIU iu, Map<String,String> newPayload)
public Payload(AbstractIU iu, Map<String, String> newPayload)
{
this(iu,newPayload,null);
this(iu, newPayload, null);
}
public Payload(AbstractIU iu, Map<String,String> newPayload, String writerName)
public Payload(AbstractIU iu, Map<String, String> newPayload, String writerName)
{
this.iu = iu;
set(newPayload, writerName);
}
public Payload(AbstractIU iu, List<PayloadItem>newPayload, String writerName)
public Payload(AbstractIU iu, List<PayloadItem> newPayload, String writerName)
{
this.iu = iu;
set(newPayload,writerName);
set(newPayload, writerName);
}
public void set(Map<String,String> newPayload, String writerName)
public void set(Map<String, String> newPayload, String writerName)
{
iu.setPayload(newPayload, writerName);
map.clear();
map.putAll(newPayload);
synchronized(map)
{
map.clear();
map.putAll(newPayload);
}
}
public void set(List<PayloadItem>newPayload, String writerName)
public void set(List<PayloadItem> newPayload, String writerName)
{
iu.handlePayloadSetting(newPayload,writerName);
map.clear();
for (PayloadItem item : newPayload)
iu.handlePayloadSetting(newPayload, writerName);
synchronized(map)
{
map.put(item.getKey(), item.getValue());
map.clear();
for (PayloadItem item : newPayload)
{
map.put(item.getKey(), pseudoConvertFromJSON(item.getValue(), item.getType()));
}
}
}
// def _remotely_enforced_setitem(self, k, v):
// """Sets an item when requested remotely."""
// return dict.__setitem__(self, k, v)
public String pseudoConvertFromJSON(String value, String type) {
if (type.equals("JSON")) {
if (value.startsWith("\"")) {
//return value.replaceAll("\\\"", "");
return StringEscapeUtils.unescapeJava(value.substring(1, value.length() - 1));
} else if (value.startsWith("{") || value.startsWith("[") || value.matches("true") || value.matches("false") || value.matches("-?[0-9]*[.,]?[0-9][0-9]*.*")) {
return value;
} else if (value.equals("null")) {
return "";
}
}
return value;
}
void enforcedSetItem(String key, String value)
{
map.put(key, value);
}
// def _remotely_enforced_delitem(self, k):
// """Deletes an item when requested remotely."""
// return dict.__delitem__(self, k)
void enforcedRemoveItem(String key)
{
map.remove(key);
......@@ -106,9 +145,12 @@ public class Payload implements Map<String, String>
return map.containsValue(value);
}
public Set<java.util.Map.Entry<String, String>> entrySet()
/**
* Provides an immutable copy of the entryset of the Payload
*/
public ImmutableSet<java.util.Map.Entry<String, String>> entrySet()
{
return map.entrySet();
return ImmutableSet.copyOf(map.entrySet());
}
public boolean equals(Object o)
......@@ -136,31 +178,6 @@ public class Payload implements Map<String, String>
return map.keySet();
}
// def __setitem__(self, k, v):
// """Set item in this payload.
//
// Requests item setting from the OutputBuffer holding the local version
// of this IU. Returns when permission is granted and item is set;
// otherwise raises an IUUpdateFailedError.
// """
// if self._remote_push_iu.committed:
// raise IUCommittedError(self._remote_push_iu)
// if self._remote_push_iu.read_only:
// raise IUReadOnlyError(self._remote_push_iu)
// requested_update = IUPayloadUpdate(
// uid=self._remote_push_iu.uid,
// revision=self._remote_push_iu.revision,
// is_delta=True,
// writer_name=self._remote_push_iu.buffer.unique_name,
// new_items={k:v},
// keys_to_remove=[])
// remote_server = self._remote_push_iu.buffer._get_remote_server(self._remote_push_iu)
// new_revision = remote_server.updatePayload(requested_update)
// if new_revision == 0:
// raise IUUpdateFailedError(self._remote_push_iu)
// else:
// self._remote_push_iu._revision = new_revision
// dict.__setitem__(self, k, v)
/**
* Set item in this payload.
* Requests item setting from the OutputBuffer holding the local version
......@@ -173,38 +190,12 @@ public class Payload implements Map<String, String>
return map.put(key, value);
}
//
// def __delitem__(self, k):
// """Delete item in this payload.
//
// Requests item deletion from the OutputBuffer holding the local version
// of this IU. Returns when permission is granted and item is deleted;
// otherwise raises an IUUpdateFailedError.
// """
// if self._remote_push_iu.committed:
// raise IUCommittedError(self._remote_push_iu)
// if self._remote_push_iu.read_only:
// raise IUReadOnlyError(self._remote_push_iu)
// requested_update = IUPayloadUpdate(
// uid=self._remote_push_iu.uid,
// revision=self._remote_push_iu.revision,
// is_delta=True,
// writer_name=self._remote_push_iu.buffer.unique_name,
// new_items={},
// keys_to_remove=[k])
// remote_server = self._remote_push_iu.buffer._get_remote_server(self._remote_push_iu)
// new_revision = remote_server.updatePayload(requested_update)
// if new_revision == 0:
// raise IUUpdateFailedError(self._remote_push_iu)
// else:
// self._remote_push_iu._revision = new_revision
// dict.__delitem__(self, k)
/**
* Delete item in this payload.//
* Requests item deletion from the OutputBuffer holding the local version
* of this IU. Returns when permission is granted and item is deleted;
* otherwise raises an IUUpdateFailedError.
*/
*/
public String remove(Object key, String writer)
{
iu.removeFromPayload(key, writer);
......@@ -216,9 +207,20 @@ public class Payload implements Map<String, String>
return put(key, value, null);
}
public void putAll(Map<? extends String, ? extends String> m)
public void putAll(Map<? extends String, ? extends String> newItems)
{
putAll(newItems, null);
}
public void putAll(Map<? extends String, ? extends String> newItems, String writer)
{
throw new RuntimeException("Not implemented");
iu.putIntoPayload(newItems, writer);
map.putAll(newItems);
}
public void merge(Map<? extends String, ? extends String> items) {
putAll(items, null);
}
public String remove(Object key)
......@@ -235,4 +237,10 @@ public class Payload implements Map<String, String>
{
return map.values();
}
@Override
public String toString()
{
return map.toString();
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
import java.nio.ByteBuffer;
import rsb.converter.ConversionException;
import rsb.converter.Converter;
import rsb.converter.ConverterSignature;
import rsb.converter.UserData;
import rsb.converter.WireContents;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Serializer/deserializer for IUPayloadUpdate
* @author hvanwelbergen
*
*/
public class PayloadConverter implements Converter<ByteBuffer>
{
private static final String PAYLOAD_WIRESCHEMA = "ipaaca-iu-payload-update";
@Override
public UserData<?> deserialize(String wireSchema, ByteBuffer buffer) throws ConversionException
{
IUPayloadUpdate pl;
try
{
pl = IUPayloadUpdate.newBuilder().mergeFrom(buffer.array()).build();
}
catch (InvalidProtocolBufferException e)
{
throw new RuntimeException(e);
}
return new UserData<IUPayloadUpdate>(pl, IUPayloadUpdate.class);
}
@Override
public ConverterSignature getSignature()
{
return new ConverterSignature(PAYLOAD_WIRESCHEMA, IUPayloadUpdate.class);
}
@Override
public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException
{
IUPayloadUpdate pl = (IUPayloadUpdate) obj;
return new WireContents<ByteBuffer>(ByteBuffer.wrap(pl.toByteArray()), PAYLOAD_WIRESCHEMA);
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import com.google.common.collect.SetMultimap;
@Slf4j
public class RemoteMessageIU extends AbstractIU
{
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.MESSAGE;
}
public RemoteMessageIU(String uid)
{
super(uid);
payload = new Payload(this);
}
@Override
public void commit()
{
log.info("Info: committing to a RemoteMessage only has local effects");
committed = true;
}
@Override
public void retract()
{
log.info("Retracting a RemoteMessage has no effect.");
}
@Override
public void commit(String writerName)
{
log.info("Info: committing to a RemoteMessage only has local effects");
committed = true;
}
@Override
void setPayload(List<PayloadItem> newItems, String writerName)
{
for(PayloadItem item:newItems)
{
payload.put(item.getKey(),item.getValue());
}
log.info("Info: modifying a RemoteMessage only has local effects");
}
@Override
void putIntoPayload(String key, String value, String writer)
{
payload.put(key,value);
log.info("Info: modifying a RemoteMessage only has local effects");
}
void putIntoPayload(Map<? extends String, ? extends String> newItems, String writer) {
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
payload.put(item.getKey(), item.getValue());
//System.out.println(entry.getKey() + "/" + entry.getValue());
}
log.info("Info: modifying a RemoteMessage only has local effects");
}
@Override
void removeFromPayload(Object key, String writer)
{
payload.remove(key);
log.info("Info: modifying a RemoteMessage only has local effects");
}
@Override
void handlePayloadSetting(List<PayloadItem> newPayload, String writerName)
{
}
@Override
void modifyLinks(boolean isDelta, SetMultimap<String, String> linksToAdd, SetMultimap<String, String> linksToRemove, String Writer)
{
log.info("Info: modifying a RemoteMessage only has local effects");
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca;
import ipaaca.protobuf.Ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.IUCommission;
import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
import ipaaca.protobuf.Ipaaca.LinkSet;
import ipaaca.protobuf.Ipaaca.PayloadItem;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate.Builder;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.SetMultimap;
import rsb.RSBException;
import rsb.patterns.RemoteServer;
import ipaaca.Ipaaca;
import ipaaca.Ipaaca.IUCommission;
import ipaaca.Ipaaca.IULinkUpdate;
import ipaaca.Ipaaca.IUPayloadUpdate;
import ipaaca.Ipaaca.LinkSet;
import ipaaca.Ipaaca.PayloadItem;
import com.google.common.collect.SetMultimap;
/**
* A remote IU with access mode 'PUSH'.
......@@ -29,19 +67,16 @@ public class RemotePushIU extends AbstractIU
private final static Logger logger = LoggerFactory.getLogger(RemotePushIU.class.getName());
private InputBuffer inputBuffer;
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.PUSH;
}
public InputBuffer getInputBuffer()
{
return inputBuffer;
}
// def __init__(self, uid, revision, read_only, owner_name, category, type, committed, payload):
// super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
// self._revision = revision
// self._category = category
// self.owner_name = owner_name
// self._type = type
// self._committed = committed
// self._payload = RemotePushPayload(remote_push_iu=self, new_payload=payload)
public RemotePushIU(String uid)
{
super(uid);
......@@ -55,9 +90,9 @@ public class RemotePushIU extends AbstractIU
}
@Override
public void commit()
public void retract()
{
commit(null);
logger.info("Retracting a RemoteIU has no effect.");
}
void putIntoPayload(String key, String value, String writer)
......@@ -66,25 +101,43 @@ public class RemotePushIU extends AbstractIU
{
throw new IUCommittedException(this);
}
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isReadOnly())
{
throw new IUReadOnlyException(this);
}
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("").build();// TODO use default type in .proto
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision(getRevision())
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR").build();
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision((int) getRevision())
.setWriterName(getBuffer().getUniqueName()).addNewItems(newItem).build();
RemoteServer server = getInputBuffer().getRemoteServer(this);
logger.debug("Remote server has methods {}", server.getMethods());
int newRevision;
long newRevision;
try
{
newRevision = (Integer) server.call("updatePayload", update);
//System.out.println("calling remote updatePayload ...");
newRevision = (Long) server.call("updatePayload", update);
//System.out.println(" ... done");
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......@@ -92,28 +145,77 @@ public class RemotePushIU extends AbstractIU
setRevision(newRevision);
}
// def commit(self):
// """Commit to this IU."""
// if self.read_only:
// raise IUReadOnlyError(self)
// if self._committed:
// # ignore commit requests when already committed
// return
// else:
// commission_request = iuProtoBuf_pb2.IUCommission()
// commission_request.uid = self.uid
// commission_request.revision = self.revision
// commission_request.writer_name = self.buffer.unique_name
// remote_server = self.buffer._get_remote_server(self)
// new_revision = remote_server.commit(commission_request)
// if new_revision == 0:
// raise IUUpdateFailedError(self)
// else:
// self._revision = new_revision
// self._committed = True
@Override
void putIntoPayload(Map<? extends String, ? extends String> newItems, String writer)
{
if (isCommitted())
{
throw new IUCommittedException(this);
}
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isReadOnly())
{
throw new IUReadOnlyException(this);
}
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(getBuffer().getUniqueName());
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("STR")
.build();
builder.addNewItems(newItem);
}
IUPayloadUpdate update = builder.build();
RemoteServer server = getInputBuffer().getRemoteServer(this);
logger.debug("Remote server has methods {}", server.getMethods());
long newRevision;
try
{
newRevision = (Long) server.call("updatePayload", update);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
}
System.err.print("************************ ");
System.err.println(newRevision);
setRevision(newRevision);
}
@Override
public void commit()
{
commit(null);
}
@Override
public void commit(String writerName)
{
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isReadOnly())
{
throw new IUReadOnlyException(this);
......@@ -124,18 +226,30 @@ public class RemotePushIU extends AbstractIU
}
else
{
IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(getUid()).setRevision(getRevision())
IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(getUid()).setRevision((int) getRevision())
.setWriterName(getBuffer().getUniqueName()).build();
RemoteServer server = inputBuffer.getRemoteServer(this);
int newRevision;
long newRevision;
try
{
newRevision = (Integer) server.call("commit", iuc);
newRevision = (Long) server.call("commit", iuc);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUCommittedException(this);
......@@ -148,17 +262,6 @@ public class RemotePushIU extends AbstractIU
}
}
// def __str__(self):
// s = "RemotePushIU{ "
// s += "uid="+self._uid+" "
// s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") "
// s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " "
// s += "payload={ "
// for k,v in self.payload.items():
// s += k+":'"+v+"', "
// s += "} "
// s += "}"
// return s
@Override
public String toString()
{
......@@ -177,37 +280,11 @@ public class RemotePushIU extends AbstractIU
return b.toString();
}
//
// def _get_payload(self):
// return self._payload
public Payload getPayload()
{
return payload;
}
// def _set_payload(self, new_pl):
// if self.committed:
// raise IUCommittedError(self)
// if self.read_only:
// raise IUReadOnlyError(self)
// requested_update = IUPayloadUpdate(
// uid=self.uid,
// revision=self.revision,
// is_delta=False,
// writer_name=self.buffer.unique_name,
// new_items=new_pl,
// keys_to_remove=[])
// remote_server = self.buffer._get_remote_server(self)
// new_revision = remote_server.updatePayload(requested_update)
// if new_revision == 0:
// raise IUUpdateFailedError(self)
// else:
// self._revision = new_revision
// self._payload = RemotePushPayload(remote_push_iu=self, new_payload=new_pl)
// payload = property(
// fget=_get_payload,
// fset=_set_payload,
// doc='Payload dictionary of the IU.')
@Override
public void setPayload(List<PayloadItem> newItems, String writerName)
{
......@@ -215,23 +292,39 @@ public class RemotePushIU extends AbstractIU
{
throw new IUCommittedException(this);
}
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isReadOnly())
{
throw new IUReadOnlyException(this);
}
IUPayloadUpdate iuu = IUPayloadUpdate.newBuilder().setRevision(getRevision()).setIsDelta(false).setUid(getUid())
IUPayloadUpdate iuu = IUPayloadUpdate.newBuilder().setRevision((int) getRevision()).setIsDelta(false).setUid(getUid())
.addAllNewItems(newItems).setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").build();
RemoteServer server = inputBuffer.getRemoteServer(this);
int newRevision;
long newRevision;
try
{
newRevision = (Integer) server.call("updatePayload", iuu);
newRevision = (Long) server.call("updatePayload", iuu);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......@@ -243,37 +336,33 @@ public class RemotePushIU extends AbstractIU
}
}
// def _apply_update(self, update):
// """Apply a IUPayloadUpdate to the IU."""
// self._revision = update.revision
// if update.is_delta:
// for k in update.keys_to_remove: self.payload._remotely_enforced_delitem(k)
// for k, v in update.new_items.items(): self.payload._remotely_enforced_setitem(k, v)
// else:
// # using '_payload' to circumvent the local writing methods
// self._payload = RemotePushPayload(remote_push_iu=self, new_payload=update.new_items)
/**
* Apply a IUPayloadUpdate to the IU.
* @param update
*/
public void applyUpdate(IUPayloadUpdate update)
{
revision = update.getRevision();
if (update.getIsDelta())
{
for (String key : update.getKeysToRemoveList())
{
payload.enforcedRemoveItem(key);
}
for (PayloadItem item : update.getNewItemsList())
{
payload.enforcedSetItem(item.getKey(), item.getValue());
}
}
else
{
payload = new Payload(this, update.getNewItemsList());
}
public void applyUpdate(IUPayloadUpdate update) {
revision = update.getRevision();
if (update.getIsDelta()) {
for (String key : update.getKeysToRemoveList()) {
payload.enforcedRemoveItem(key);
}
for (PayloadItem item : update.getNewItemsList()) {
if (item.getType().equals("STR")) {
payload.enforcedSetItem(item.getKey(), item.getValue());
} else if (item.getType().equals("JSON")) {
String value = item.getValue();
if (value.startsWith("\"")) {
payload.enforcedSetItem(item.getKey(), value.replaceAll("\\\"", ""));
} else if (value.startsWith("{") || value.startsWith("[") || value.matches("true") || value.matches("false") || value.matches("-?[0-9]*[.,]?[0-9][0-9]*.*")) {
payload.enforcedSetItem(item.getKey(), value);
} else if (value.equals("null")) {
payload.enforcedSetItem(item.getKey(), "");
}
}
}
} else {
payload = new Payload(this, update.getNewItemsList());
}
}
public void applyLinkUpdate(IULinkUpdate update)
......@@ -309,14 +398,16 @@ public class RemotePushIU extends AbstractIU
}
// def _apply_commission(self):
// """Apply commission to the IU"""
// self._committed = True
public void applyCommmision()
{
committed = true;
}
public void applyRetraction()
{
retracted = true;
}
@Override
void removeFromPayload(Object key, String writer)
{
......@@ -324,22 +415,38 @@ public class RemotePushIU extends AbstractIU
{
throw new IUCommittedException(this);
}
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isReadOnly())
{
throw new IUReadOnlyException(this);
}
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision(getRevision())
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision((int) getRevision())
.setWriterName(getBuffer().getUniqueName()).addKeysToRemove((String) key).build();
RemoteServer server = getInputBuffer().getRemoteServer(this);
int newRevision;
long newRevision;
try
{
newRevision = (Integer) server.call("updatePayload", update);
newRevision = (Long) server.call("updatePayload", update);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......@@ -347,32 +454,18 @@ public class RemotePushIU extends AbstractIU
setRevision(newRevision);
}
// def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
// """Modify the payload: add or remove item from this payload remotely and send update."""
// if self.committed:
// raise IUCommittedError(self)
// if self.read_only:
// raise IUReadOnlyError(self)
// requested_update = IUPayloadUpdate(
// uid=self.uid,
// revision=self.revision,
// is_delta=is_delta,
// writer_name=self.buffer.unique_name,
// new_items=new_items,
// keys_to_remove=keys_to_remove)
// remote_server = self.buffer._get_remote_server(self)
// new_revision = remote_server.updatePayload(requested_update)
// if new_revision == 0:
// raise IUUpdateFailedError(self)
// else:
// self._revision = new_revision
@Override
public void modifyLinks(boolean isDelta, SetMultimap<String, String> linksToAdd, SetMultimap<String, String> linksToRemove, String writerName)
void modifyLinks(boolean isDelta, SetMultimap<String, String> linksToAdd, SetMultimap<String, String> linksToRemove, String writerName)
{
if (isCommitted())
{
throw new IUCommittedException(this);
}
if (isRetracted())
{
throw new IURetractedException(this);
}
if (isReadOnly())
{
throw new IUReadOnlyException(this);
......@@ -390,16 +483,28 @@ public class RemotePushIU extends AbstractIU
}
IULinkUpdate update = IULinkUpdate.newBuilder().addAllLinksToRemove(removeLinkSet).addAllNewLinks(newLinkSet).setIsDelta(isDelta)
.setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").setUid(getUid()).setRevision(getRevision()).build();
int newRevision;
.setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").setUid(getUid()).setRevision((int) getRevision()).build();
long newRevision;
try
{
newRevision = (Integer) server.call("updateLinks", update);
newRevision = (Long) server.call("updateLinks", update);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
catch (ExecutionException e)
{
throw new RuntimeException(e);
}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
......
package ipaaca.util;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
import ipaaca.LocalIU;
import ipaaca.OutputBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
/**
* A simple key-value blackboard
* @author hvanwelbergen
*/
public class Blackboard
{
private final OutputBuffer outBuffer;
private final InputBuffer inBuffer;
private final LocalIU iu;
private final ComponentNotifier notifier;
private static final String DUMMY_KEY = "DUMMY_KEY";
public static final String MESSAGE_SUFFIX = "MESSAGE";
private int dummyValue = 0;
private List<BlackboardUpdateListener> listeners = Collections.synchronizedList(new ArrayList<BlackboardUpdateListener>());
public Blackboard(String id, String category)
{
this(id, category, "default");
}
private void updateListeners()
{
synchronized (listeners)
{
for (BlackboardUpdateListener listener : listeners)
{
listener.update();
}
}
}
public Blackboard(String id, String category, String channel)
{
outBuffer = new OutputBuffer(id, channel);
iu = new LocalIU(category);
outBuffer.add(iu);
outBuffer.registerHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
updateListeners();
}
});
inBuffer = new InputBuffer(id, ImmutableSet.of(ComponentNotifier.NOTIFY_CATEGORY, category + MESSAGE_SUFFIX), channel);
notifier = new ComponentNotifier(id, category, ImmutableSet.of(category), new HashSet<String>(), outBuffer, inBuffer);
notifier.addNotificationHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iuNotify, IUEventType type, boolean local)
{
dummyValue++;
iu.getPayload().put(DUMMY_KEY, "" + dummyValue);
}
});
notifier.initialize();
inBuffer.registerHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iuMessage, IUEventType type, boolean local)
{
iu.getPayload().putAll(iuMessage.getPayload());
updateListeners();
}
}, ImmutableSet.of(category + MESSAGE_SUFFIX));
}
public String put(String key, String value)
{
return iu.getPayload().put(key, value);
}
public void putAll(Map<String, String> newItems)
{
iu.getPayload().putAll(newItems);
}
/**
* Get the value corresponding to the key, or null if it is not available
*/
public String get(String key)
{
return iu.getPayload().get(key);
}
public void addUpdateListener(BlackboardUpdateListener listener)
{
listeners.add(listener);
}
public Set<String> keySet()
{
return iu.getPayload().keySet();
}
public Set<Map.Entry<String, String>> entrySet()
{
return iu.getPayload().entrySet();
}
public Collection<String> values()
{
return iu.getPayload().values();
}
public void close()
{
outBuffer.close();
inBuffer.close();
}
}
package ipaaca.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
/**
* Client to get/set key value pairs on a Blackboard
* @author hvanwelbergen
*
*/
public class BlackboardClient
{
private final InputBuffer inBuffer;
private final OutputBuffer outBuffer;
private List<BlackboardUpdateListener> listeners = Collections.synchronizedList(new ArrayList<BlackboardUpdateListener>());
private final String category;
public BlackboardClient(String id, String category)
{
this(id, category, "default");
}
public BlackboardClient(String id, String category, String channel)
{
this.category = category;
inBuffer = new InputBuffer(id, ImmutableSet.of(category, ComponentNotifier.NOTIFY_CATEGORY), channel);
inBuffer.setResendActive(true);
inBuffer.registerHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
synchronized (listeners)
{
for (BlackboardUpdateListener listener : listeners)
{
listener.update();
}
}
}
}, ImmutableSet.of(category));
outBuffer = new OutputBuffer(id);
ComponentNotifier notifier = new ComponentNotifier(id, category, new HashSet<String>(), ImmutableSet.of(category),
outBuffer, inBuffer);
notifier.initialize();
}
public void close()
{
inBuffer.close();
outBuffer.close();
}
public void waitForBlackboardConnection()
{
while(inBuffer.getIUs().isEmpty());
}
public String get(String key)
{
if (inBuffer.getIUs().isEmpty())
{
return null;
}
return inBuffer.getIUs().iterator().next().getPayload().get(key);
}
public void put(String key, String value)
{
LocalMessageIU iu = new LocalMessageIU(category+Blackboard.MESSAGE_SUFFIX);
iu.getPayload().put(key,value);
outBuffer.add(iu);
}
public void putAll(Map<String,String> values)
{
LocalMessageIU iu = new LocalMessageIU(category+Blackboard.MESSAGE_SUFFIX);
iu.getPayload().putAll(values);
outBuffer.add(iu);
}
private boolean hasIU()
{
return !inBuffer.getIUs().isEmpty();
}
private AbstractIU getIU()
{
return inBuffer.getIUs().iterator().next();
}
public Set<String> keySet()
{
if(!hasIU())return new HashSet<>();
return getIU().getPayload().keySet();
}
public Set<Map.Entry<String, String>> entrySet()
{
if(!hasIU())return new HashSet<>();
return getIU().getPayload().entrySet();
}
public Collection<String> values()
{
if(!hasIU())return new HashSet<>();
return getIU().getPayload().values();
}
public void addUpdateListener(BlackboardUpdateListener listener)
{
listeners.add(listener);
}
}
package ipaaca.util;
public interface BlackboardUpdateListener
{
void update();
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca.util;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventHandler;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
/**
* Utility class to handle component notification: a componentNotify is sent at initialization and whenever new components sent
* their componentNotify.
* @author hvanwelbergen
*
*/
public class ComponentNotifier
{
public static final String NOTIFY_CATEGORY = "componentNotify";
public static final String SEND_CATEGORIES = "send_categories";
public static final String RECEIVE_CATEGORIES = "recv_categories";
public static final String STATE = "state";
public static final String NAME = "name";
public static final String FUNCTION = "function";
private final OutputBuffer outBuffer;
private final String componentName;
private final String componentFunction;
private final ImmutableSet<String> sendCategories;
private final ImmutableSet<String> receiveCategories;
private final InputBuffer inBuffer;
private List<HandlerFunctor> handlers = Collections.synchronizedList(new ArrayList<HandlerFunctor>());
private volatile boolean isInitialized = false;
private final BlockingQueue<String> receiverQueue = new LinkedBlockingQueue<String>();
private class ComponentNotifyHandler implements HandlerFunctor
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
if(iu.getPayload().get(NAME).equals(componentName))return; //don't re-notify self
String receivers[] = iu.getPayload().get(RECEIVE_CATEGORIES).split("\\s*,\\s*");
receiverQueue.addAll(ImmutableSet.copyOf(receivers));
synchronized(handlers)
{
for (HandlerFunctor h : handlers)
{
h.handle(iu, type, local);
}
}
if (iu.getPayload().get(STATE).equals("new"))
{
submitNotify(false);
}
}
}
/**
* Register a handler that will be called whenever a new component notifies this ComponentNotifier
*/
public void addNotificationHandler(HandlerFunctor h)
{
handlers.add(h);
}
/**
* Wait until the receivers are registered for categories
*/
public void waitForReceivers(ImmutableSet<String> categories)
{
Set<String> unhandledCategories = new HashSet<String>(categories);
while(!unhandledCategories.isEmpty())
{
try
{
unhandledCategories.remove(receiverQueue.take());
}
catch (InterruptedException e)
{
Thread.interrupted();
}
}
}
/**
* Wait until receivers are registered for all categories sent by the component
*/
public void waitForReceivers()
{
waitForReceivers(sendCategories);
}
private void submitNotify(boolean isNew)
{
LocalMessageIU notifyIU = new LocalMessageIU();
notifyIU.setCategory(NOTIFY_CATEGORY);
notifyIU.getPayload().put(NAME, componentName);
notifyIU.getPayload().put("function", componentFunction);
notifyIU.getPayload().put(SEND_CATEGORIES, Joiner.on(",").join(sendCategories));
notifyIU.getPayload().put(RECEIVE_CATEGORIES, Joiner.on(",").join(receiveCategories));
notifyIU.getPayload().put(STATE, isNew ? "new" : "old");
outBuffer.add(notifyIU);
}
public ComponentNotifier(String componentName, String componentFunction, Set<String> sendCategories, Set<String> receiveCategories,
OutputBuffer outBuffer, InputBuffer inBuffer)
{
this.componentName = componentName;
this.componentFunction = componentFunction;
this.sendCategories = ImmutableSet.copyOf(sendCategories);
this.receiveCategories = ImmutableSet.copyOf(receiveCategories);
this.outBuffer = outBuffer;
this.inBuffer = inBuffer;
}
public synchronized void initialize()
{
if(!isInitialized)
{
inBuffer.registerHandler(new IUEventHandler(new ComponentNotifyHandler(), EnumSet.of(IUEventType.ADDED, IUEventType.MESSAGE), ImmutableSet
.of(NOTIFY_CATEGORY)));
submitNotify(true);
isInitialized = true;
}
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2016 Social Cognitive Systems Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaaca.util;
import ipaaca.LocalMessageIU;
import ipaaca.OutputBuffer;
import java.util.HashMap;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
public class IpaacaLogger {
private static OutputBuffer ob;
private static final Object lock = new Object();
private static boolean SEND_IPAACA_LOGS = true;
private static String MODULE_NAME = "???";
private static void initializeOutBuffer() {
synchronized (lock) {
if (ob == null) {
ob = new OutputBuffer("LogSender");
}
}
}
public static void setModuleName(String name) {
synchronized (lock) {
MODULE_NAME = name;
}
}
public static void setLogFileName(String fileName, String logMode) {
initializeOutBuffer();
LocalMessageIU msg = new LocalMessageIU("log");
HashMap<String, String> pl = new HashMap<String, String>();
pl.put("cmd", "open_log_file");
pl.put("filename", fileName);
if (logMode != null) {
if (logMode.equals("append") ||
logMode.equals("overwrite") ||
logMode.equals("timestamp")) {
pl.put("existing", logMode);
} else {
return;
}
}
ob.add(msg);
}
public static void sendIpaacaLogs(boolean flag) {
synchronized (lock) {
SEND_IPAACA_LOGS = flag;
}
}
private static void logConsole(String level, String text, float now, String function, String thread) {
for(String line: text.split("\n")) {
System.out.println("[" + level + "] " + thread + " " + function + " " + line);
function = StringUtils.leftPad("", function.length(), ' ');
thread = StringUtils.leftPad("", thread.length(), ' ');
}
}
private static void logIpaaca(String level, String text, float now, String function, String thread) {
initializeOutBuffer();
LocalMessageIU msg = new LocalMessageIU("log");
HashMap<String, String> pl = new HashMap<String, String>();
pl.put("module", MODULE_NAME);
pl.put("function", function);
pl.put("level", level);
pl.put("time", String.format("%.3f", now));
pl.put("thread", thread);
pl.put("uuid", UUID.randomUUID().toString());
pl.put("text", text);
msg.setPayload(pl);
ob.add(msg);
}
private static String getCallerName() {
String function = Thread.currentThread().getStackTrace()[3].getClassName();
function += "." + Thread.currentThread().getStackTrace()[3].getMethodName();
return function;
}
public static void logError(String msg) {
logError(msg, System.currentTimeMillis(), getCallerName());
}
public static void logError(String msg, float now) {
logError(msg, now, getCallerName());
}
private static void logError(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("ERROR", msg, now, callerName, thread);
}
logConsole("ERROR", msg, now, callerName, thread);
}
public static void logWarn(String msg) {
logWarn(msg, System.currentTimeMillis(), getCallerName());
}
public static void logWarn(String msg, float now) {
logWarn(msg, now, getCallerName());
}
private static void logWarn(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("WARN", msg, now, callerName, thread);
}
logConsole("WARN", msg, now, callerName, thread);
}
public static void logInfo(String msg) {
logInfo(msg, System.currentTimeMillis(), getCallerName());
}
public static void logInfo(String msg, float now) {
logInfo(msg, now, getCallerName());
}
private static void logInfo(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("INFO", msg, now, callerName, thread);
}
logConsole("INFO", msg, now, callerName, thread);
}
public static void logDebug(String msg) {
logDebug(msg, System.currentTimeMillis(), getCallerName());
}
public static void logDebug(String msg, float now) {
logDebug(msg, now, getCallerName());
}
private static void logDebug(String msg, float now, String callerName) {
String thread = Thread.currentThread().getName();
if (SEND_IPAACA_LOGS) {
logIpaaca("DEBUG", msg, now, callerName, thread);
}
logConsole("DEBUG", msg, now, callerName, thread);
}
}
package ipaaca.util.communication;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventHandler;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
/**
* Obtain a IU in the future. Usage:<br>
* FutureIU fu = FutureIU("componentx", "status", "started"); //wait for componentx to send a message that is it fully started<br>
* [Start componentx, assumes that component x will send a message or other iu with status=started in the payload]<br>
* AbstractIU iu = fu.take(); //get the actual IU
* @author hvanwelbergen
*/
public class FutureIU
{
private final InputBuffer inBuffer;
private final BlockingQueue<AbstractIU> queue = new ArrayBlockingQueue<AbstractIU>(1);
private final IUEventHandler handler;
public FutureIU(String category, final String idKey, final String idVal, InputBuffer inBuffer)
{
this.inBuffer = inBuffer;
handler = new IUEventHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
String id = iu.getPayload().get(idKey);
if (idVal.equals(id))
{
queue.offer(iu);
}
}
}, ImmutableSet.of(category));
inBuffer.registerHandler(handler);
}
public FutureIU(String category, String idKey, String idVal)
{
this(category, idKey, idVal, new InputBuffer("FutureIU", ImmutableSet.of(category)));
}
/**
* Closes the FutureIU, use only if get is not used.
*/
public void cleanup()
{
inBuffer.removeHandler(handler);
if (inBuffer.getOwningComponentName().equals("FutureIU"))
{
inBuffer.close();
}
}
/**
* Waits (if necessary) for the IU and take it (can be done only once)
*/
public AbstractIU take() throws InterruptedException
{
AbstractIU iu;
try
{
iu = queue.take();
}
finally
{
cleanup();
}
return iu;
}
/**
* Wait for at most the given time for the IU and take it (can be done only once), return null on timeout
*/
public AbstractIU take(long timeout, TimeUnit unit) throws InterruptedException
{
AbstractIU iu;
try
{
iu = queue.poll(timeout, unit);
}
finally
{
cleanup();
}
return iu;
}
}
package ipaaca.util.communication;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableSet;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventType;
import ipaaca.InputBuffer;
/**
* Obtain multiple future ius on an specific category. Usage:<br>
* FutureIUs futures = new FutureIUs(componentx, key);<br>
* [make componentx send a IU with key=keyvaluedesired1]<br>
* AbstractIU iu = futures.take(keyvaluedesired1);<br>
* [make componentx send a IU with key=keyvaluedesired2]
* AbstractIU iu = futures.take(keyvaluedesired2);<br>
* ...<br>
* futures.close();
* @author hvanwelbergen
*/
public class FutureIUs
{
private final InputBuffer inBuffer;
private final Map<String,BlockingQueue<AbstractIU>> resultsMap = Collections.synchronizedMap(new HashMap<String,BlockingQueue<AbstractIU>>());
public FutureIUs(String category, final String idKey)
{
inBuffer = new InputBuffer("FutureIUs", ImmutableSet.of(category));
inBuffer.registerHandler(new HandlerFunctor()
{
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
String id = iu.getPayload().get(idKey);
resultsMap.putIfAbsent(id, new ArrayBlockingQueue<AbstractIU>(1));
resultsMap.get(id).offer(iu);
}
}, ImmutableSet.of(category));
}
/**
* Waits (if necessary) for the IU and take it (can be done only once)
*/
public AbstractIU take(String idValue) throws InterruptedException
{
resultsMap.putIfAbsent(idValue, new ArrayBlockingQueue<AbstractIU>(1));
return resultsMap.get(idValue).take();
}
/**
* Wait for at most the given time for the IU and take it (can be done only once), return null on timeout
*/
public AbstractIU take(String idValue, long timeout, TimeUnit unit) throws InterruptedException
{
resultsMap.putIfAbsent(idValue, new ArrayBlockingQueue<AbstractIU>(1));
return resultsMap.get(idValue).poll(timeout, unit);
}
public void close()
{
inBuffer.close();
}
}
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
package ipaacademo;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
* Demonstrates how to call a python script from java
* @author hvanwelbergen
*
*/
public class PythonCall
{
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException
{
String program = "print 'Hello world'";
Process p = Runtime.getRuntime().exec(new String[]{"python","-c", program});
InputStream in = p.getInputStream();
BufferedInputStream buf = new BufferedInputStream(in);
InputStreamReader inread = new InputStreamReader(buf);
BufferedReader bufferedreader = new BufferedReader(inread);
// Read the ls output
String line;
while ((line = bufferedreader.readLine()) != null)
{
System.out.println(line);
}
try
{
if (p.waitFor() != 0)
{
System.err.println("exit value = " + p.exitValue());
}
}
catch (InterruptedException e)
{
System.err.println(e);
}
in = p.getErrorStream();
buf = new BufferedInputStream(in);
inread = new InputStreamReader(buf);
bufferedreader = new BufferedReader(inread);
// Read the ls output
while ((line = bufferedreader.readLine()) != null)
{
System.out.println(line);
}
}
}