Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Show changes
Showing
with 1543 additions and 225 deletions
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
package ipaaca; package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.PayloadItem; import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -59,9 +60,11 @@ public abstract class AbstractIU ...@@ -59,9 +60,11 @@ public abstract class AbstractIU
protected boolean committed = false; protected boolean committed = false;
protected boolean retracted = false; protected boolean retracted = false;
private String uid; private String uid;
protected int revision; protected long revision;
private boolean readOnly = false; private boolean readOnly = false;
public abstract IU.AccessMode getAccessMode();
protected SetMultimap<String, String> links = HashMultimap.create(); protected SetMultimap<String, String> links = HashMultimap.create();
private final SetMultimap<String, String> EMPTYLINKS = HashMultimap.create(); private final SetMultimap<String, String> EMPTYLINKS = HashMultimap.create();
...@@ -147,7 +150,7 @@ public abstract class AbstractIU ...@@ -147,7 +150,7 @@ public abstract class AbstractIU
this.readOnly = readOnly; this.readOnly = readOnly;
} }
public void setRevision(int revision) public void setRevision(long revision)
{ {
this.revision = revision; this.revision = revision;
} }
...@@ -187,7 +190,7 @@ public abstract class AbstractIU ...@@ -187,7 +190,7 @@ public abstract class AbstractIU
return buffer; return buffer;
} }
public int getRevision() public long getRevision()
{ {
return revision; return revision;
} }
......
...@@ -88,12 +88,8 @@ public class IUConverter implements Converter<ByteBuffer> ...@@ -88,12 +88,8 @@ public class IUConverter implements Converter<ByteBuffer>
links.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build()); links.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
} }
IU.AccessMode accessMode = IU.AccessMode.PUSH; IU.AccessMode accessMode = iua.getAccessMode();
if(iua instanceof RemoteMessageIU || iua instanceof LocalMessageIU) IU iu = IU.newBuilder().setUid(iua.getUid()).setRevision((int) iua.getRevision()).setCategory(iua.getCategory())
{
accessMode = IU.AccessMode.MESSAGE;
}
IU iu = IU.newBuilder().setUid(iua.getUid()).setRevision(iua.getRevision()).setCategory(iua.getCategory())
.setOwnerName(iua.getOwnerName()).setCommitted(iua.isCommitted()).setAccessMode(accessMode) .setOwnerName(iua.getOwnerName()).setCommitted(iua.isCommitted()).setAccessMode(accessMode)
.setReadOnly(iua.isReadOnly()).setPayloadType("STR").addAllPayload(payloadItems).addAllLinks(links).build(); .setReadOnly(iua.isReadOnly()).setPayloadType("STR").addAllPayload(payloadItems).addAllLinks(links).build();
String wireFormat = (accessMode == IU.AccessMode.MESSAGE) ? "ipaaca-messageiu" : "ipaaca-iu"; String wireFormat = (accessMode == IU.AccessMode.MESSAGE) ? "ipaaca-messageiu" : "ipaaca-iu";
......
...@@ -483,10 +483,10 @@ public class InputBuffer extends Buffer ...@@ -483,10 +483,10 @@ public class InputBuffer extends Buffer
rServer = getRemoteServer(writerName); rServer = getRemoteServer(writerName);
if ((rServer != null)&&(uid != null)) { if ((rServer != null)&&(uid != null)) {
IUResendRequest iurr = IUResendRequest.newBuilder().setUid(uid).setHiddenScopeName(hiddenScopeName).build(); IUResendRequest iurr = IUResendRequest.newBuilder().setUid(uid).setHiddenScopeName(hiddenScopeName).build();
int rRevision = 0; long rRevision = 0;
try try
{ {
rRevision = (Integer) rServer.call("resendRequest", iurr); rRevision = (Long) rServer.call("resendRequest", iurr);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -500,6 +500,10 @@ public class InputBuffer extends Buffer ...@@ -500,6 +500,10 @@ public class InputBuffer extends Buffer
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (rRevision == 0) if (rRevision == 0)
{ {
//throw new IUResendFailedException(aiu); // TODO //throw new IUResendFailedException(aiu); // TODO
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
package ipaaca; package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.IULinkUpdate; import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate.Builder; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate.Builder;
...@@ -50,6 +51,11 @@ import com.google.common.collect.SetMultimap; ...@@ -50,6 +51,11 @@ import com.google.common.collect.SetMultimap;
public class LocalIU extends AbstractIU public class LocalIU extends AbstractIU
{ {
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.PUSH;
}
private OutputBuffer outputBuffer; private OutputBuffer outputBuffer;
...@@ -205,7 +211,7 @@ public class LocalIU extends AbstractIU ...@@ -205,7 +211,7 @@ public class LocalIU extends AbstractIU
removeSet.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build()); removeSet.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
} }
outputBuffer.sendIULinkUpdate(this, outputBuffer.sendIULinkUpdate(this,
IULinkUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setWriterName(wName).setIsDelta(isDelta) IULinkUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setWriterName(wName).setIsDelta(isDelta)
.addAllNewLinks(addSet).addAllLinksToRemove(removeSet).build()); .addAllNewLinks(addSet).addAllLinksToRemove(removeSet).build());
} }
} }
...@@ -260,7 +266,7 @@ public class LocalIU extends AbstractIU ...@@ -260,7 +266,7 @@ public class LocalIU extends AbstractIU
// send update to remote holders // send update to remote holders
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR") PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR")
.build(); .build();
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true) IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer).addNewItems(newItem).build(); .setWriterName(writer == null ? getOwnerName() : writer).addNewItems(newItem).build();
getOutputBuffer().sendIUPayloadUpdate(this, update); getOutputBuffer().sendIUPayloadUpdate(this, update);
} }
...@@ -284,7 +290,7 @@ public class LocalIU extends AbstractIU ...@@ -284,7 +290,7 @@ public class LocalIU extends AbstractIU
increaseRevisionNumber(); increaseRevisionNumber();
if (isPublished()) if (isPublished())
{ {
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true) Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer); .setWriterName(writer == null ? getOwnerName() : writer);
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet()) for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{ {
...@@ -342,7 +348,7 @@ public class LocalIU extends AbstractIU ...@@ -342,7 +348,7 @@ public class LocalIU extends AbstractIU
if (isPublished()) if (isPublished())
{ {
// send update to remote holders // send update to remote holders
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true) IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer).addKeysToRemove((String) key).build(); .setWriterName(writer == null ? getOwnerName() : writer).addKeysToRemove((String) key).build();
getOutputBuffer().sendIUPayloadUpdate(this, update); getOutputBuffer().sendIUPayloadUpdate(this, update);
} }
...@@ -355,7 +361,7 @@ public class LocalIU extends AbstractIU ...@@ -355,7 +361,7 @@ public class LocalIU extends AbstractIU
{ {
if (isPublished()) if (isPublished())
{ {
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(false) IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(false)
.setWriterName(writerName == null ? getOwnerName() : writerName).addAllNewItems(newPayload).build(); .setWriterName(writerName == null ? getOwnerName() : writerName).addAllNewItems(newPayload).build();
getOutputBuffer().sendIUPayloadUpdate(this, update); getOutputBuffer().sendIUPayloadUpdate(this, update);
} }
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
package ipaaca; package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
/** /**
* Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls. * Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls.
...@@ -50,4 +51,9 @@ public class LocalMessageIU extends LocalIU ...@@ -50,4 +51,9 @@ public class LocalMessageIU extends LocalIU
super(category); super(category);
} }
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.MESSAGE;
}
} }
...@@ -55,7 +55,9 @@ import rsb.Informer; ...@@ -55,7 +55,9 @@ import rsb.Informer;
import rsb.InitializeException; import rsb.InitializeException;
import rsb.RSBException; import rsb.RSBException;
import rsb.patterns.DataCallback; import rsb.patterns.DataCallback;
import rsb.patterns.EventCallback;
import rsb.patterns.LocalServer; import rsb.patterns.LocalServer;
import rsb.Event;
import com.google.common.collect.HashMultimap; import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap; import com.google.common.collect.SetMultimap;
...@@ -131,48 +133,58 @@ public class OutputBuffer extends Buffer ...@@ -131,48 +133,58 @@ public class OutputBuffer extends Buffer
this.channel = ipaaca_channel; this.channel = ipaaca_channel;
} }
private final class RemoteUpdatePayload extends DataCallback<Integer, IUPayloadUpdate> private final class RemoteUpdatePayload extends EventCallback //DataCallback<Long, IUPayloadUpdate>
{ {
@Override @Override
public Integer invoke(IUPayloadUpdate data) throws Throwable public Event invoke(final Event request) //throws Throwable
{ {
logger.debug("remoteUpdate"); logger.debug("remoteUpdate");
return remoteUpdatePayload(data); long result = remoteUpdatePayload((IUPayloadUpdate) request.getData());
//System.out.println("remoteUpdatePayload yielded revision "+result);
return new Event(Long.class, new Long(result));
} }
} }
/*private final class RemoteUpdatePayload extends DataCallback<Long, IUPayloadUpdate>
{
@Override
public Long invoke(IUPayloadUpdate data) throws Throwable
{
logger.debug("remoteUpdate");
return remoteUpdatePayload(data);
}
private final class RemoteUpdateLinks extends DataCallback<Integer, IULinkUpdate> }*/
private final class RemoteUpdateLinks extends EventCallback // DataCallback<Long, IULinkUpdate>
{ {
@Override @Override
public Integer invoke(IULinkUpdate data) throws Throwable public Event invoke(final Event request) //throws Throwable
{ {
logger.debug("remoteUpdateLinks"); logger.debug("remoteUpdateLinks");
return remoteUpdateLinks(data); return new Event(Long.class, new Long(remoteUpdateLinks((IULinkUpdate) request.getData())));
} }
} }
private final class RemoteCommit extends DataCallback<Integer, IUCommission> private final class RemoteCommit extends EventCallback //DataCallback<Long, IUCommission>
{ {
@Override @Override
public Integer invoke(IUCommission data) throws Throwable public Event invoke(final Event request) //throws Throwable
{ {
logger.debug("remoteCommit"); logger.debug("remoteCommit");
return remoteCommit(data); return new Event(Long.class, new Long(remoteCommit((IUCommission) request.getData())));
} }
} }
private final class RemoteResendRequest extends DataCallback<Integer, IUResendRequest> private final class RemoteResendRequest extends EventCallback //DataCallback<Long, IUResendRequest>
{ {
@Override @Override
public Integer invoke(IUResendRequest data) throws Throwable public Event invoke(final Event request) //throws Throwable
{ {
logger.debug("remoteResendRequest"); logger.debug("remoteResendRequest");
return remoteResendRequest(data); return new Event(Long.class, new Long(remoteResendRequest((IUResendRequest) request.getData())));
} }
} }
// def _remote_update_payload(self, update): // def _remote_update_payload(self, update):
...@@ -199,7 +211,7 @@ public class OutputBuffer extends Buffer ...@@ -199,7 +211,7 @@ public class OutputBuffer extends Buffer
* Apply a remotely requested update to one of the stored IUs. * Apply a remotely requested update to one of the stored IUs.
* @return 0 if not updated, IU version number otherwise * @return 0 if not updated, IU version number otherwise
*/ */
int remoteUpdatePayload(IUPayloadUpdate update) long remoteUpdatePayload(IUPayloadUpdate update)
{ {
if (!iuStore.containsKey(update.getUid())) if (!iuStore.containsKey(update.getUid()))
{ {
...@@ -244,7 +256,7 @@ public class OutputBuffer extends Buffer ...@@ -244,7 +256,7 @@ public class OutputBuffer extends Buffer
* Apply a remotely requested update to one of the stored IUs. * Apply a remotely requested update to one of the stored IUs.
* @return 0 if not updated, IU version number otherwise * @return 0 if not updated, IU version number otherwise
*/ */
int remoteUpdateLinks(IULinkUpdate update) long remoteUpdateLinks(IULinkUpdate update)
{ {
if (!iuStore.containsKey(update.getUid())) if (!iuStore.containsKey(update.getUid()))
{ {
...@@ -307,7 +319,7 @@ public class OutputBuffer extends Buffer ...@@ -307,7 +319,7 @@ public class OutputBuffer extends Buffer
/** /**
* Apply a remotely requested commit to one of the stored IUs. * Apply a remotely requested commit to one of the stored IUs.
*/ */
private int remoteCommit(IUCommission iuc) private long remoteCommit(IUCommission iuc)
{ {
if (!iuStore.containsKey(iuc.getUid())) if (!iuStore.containsKey(iuc.getUid()))
{ {
...@@ -336,7 +348,7 @@ public class OutputBuffer extends Buffer ...@@ -336,7 +348,7 @@ public class OutputBuffer extends Buffer
/* /*
* Resend an requested iu over the specific hidden channel. (dlw) TODO * Resend an requested iu over the specific hidden channel. (dlw) TODO
*/ */
private int remoteResendRequest(IUResendRequest iu_resend_request_pack) private long remoteResendRequest(IUResendRequest iu_resend_request_pack)
{ {
if (!iuStore.containsKey(iu_resend_request_pack.getUid())) if (!iuStore.containsKey(iu_resend_request_pack.getUid()))
{ {
...@@ -349,7 +361,7 @@ public class OutputBuffer extends Buffer ...@@ -349,7 +361,7 @@ public class OutputBuffer extends Buffer
Informer<Object> informer = getInformer(iu_resend_request_pack.getHiddenScopeName()); Informer<Object> informer = getInformer(iu_resend_request_pack.getHiddenScopeName());
try try
{ {
informer.send(iu); informer.publish(iu);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -404,6 +416,10 @@ public class OutputBuffer extends Buffer ...@@ -404,6 +416,10 @@ public class OutputBuffer extends Buffer
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (RSBException e)
{
throw new RuntimeException(e);
}
return informer; return informer;
} }
...@@ -441,7 +457,7 @@ public class OutputBuffer extends Buffer ...@@ -441,7 +457,7 @@ public class OutputBuffer extends Buffer
Informer<Object> informer = getInformer(iu.getCategory()); Informer<Object> informer = getInformer(iu.getCategory());
try try
{ {
informer.send(iu); informer.publish(iu);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -475,12 +491,12 @@ public class OutputBuffer extends Buffer ...@@ -475,12 +491,12 @@ public class OutputBuffer extends Buffer
*/ */
protected void sendIUCommission(AbstractIU iu, String writerName) protected void sendIUCommission(AbstractIU iu, String writerName)
{ {
IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(iu.getUid()).setRevision(iu.getRevision()) IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(iu.getUid()).setRevision((int) iu.getRevision())
.setWriterName(writerName == null ? iu.getOwnerName() : writerName).build(); .setWriterName(writerName == null ? iu.getOwnerName() : writerName).build();
Informer<Object> informer = getInformer(iu.getCategory()); Informer<Object> informer = getInformer(iu.getCategory());
try try
{ {
informer.send(iuc); informer.publish(iuc);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -490,11 +506,11 @@ public class OutputBuffer extends Buffer ...@@ -490,11 +506,11 @@ public class OutputBuffer extends Buffer
protected void sendIURetraction(AbstractIU iu) protected void sendIURetraction(AbstractIU iu)
{ {
IURetraction iuc = Ipaaca.IURetraction.newBuilder().setUid(iu.getUid()).setRevision(iu.getRevision()).build(); IURetraction iuc = Ipaaca.IURetraction.newBuilder().setUid(iu.getUid()).setRevision((int) iu.getRevision()).build();
Informer<Object> informer = getInformer(iu.getCategory()); Informer<Object> informer = getInformer(iu.getCategory());
try try
{ {
informer.send(iuc); informer.publish(iuc);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -534,7 +550,7 @@ public class OutputBuffer extends Buffer ...@@ -534,7 +550,7 @@ public class OutputBuffer extends Buffer
Informer<Object> informer = getInformer(iu.getCategory()); Informer<Object> informer = getInformer(iu.getCategory());
try try
{ {
informer.send(update); informer.publish(update);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -547,7 +563,7 @@ public class OutputBuffer extends Buffer ...@@ -547,7 +563,7 @@ public class OutputBuffer extends Buffer
Informer<Object> informer = getInformer(iu.getCategory()); Informer<Object> informer = getInformer(iu.getCategory());
try try
{ {
informer.send(update); informer.publish(update);
} }
catch (RSBException e) catch (RSBException e)
{ {
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
package ipaaca; package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.PayloadItem; import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.List; import java.util.List;
...@@ -45,6 +46,11 @@ import com.google.common.collect.SetMultimap; ...@@ -45,6 +46,11 @@ import com.google.common.collect.SetMultimap;
public class RemoteMessageIU extends AbstractIU public class RemoteMessageIU extends AbstractIU
{ {
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.MESSAGE;
}
public RemoteMessageIU(String uid) public RemoteMessageIU(String uid)
{ {
super(uid); super(uid);
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
package ipaaca; package ipaaca;
import ipaaca.protobuf.Ipaaca; import ipaaca.protobuf.Ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.IUCommission; import ipaaca.protobuf.Ipaaca.IUCommission;
import ipaaca.protobuf.Ipaaca.IULinkUpdate; import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
...@@ -66,6 +67,11 @@ public class RemotePushIU extends AbstractIU ...@@ -66,6 +67,11 @@ public class RemotePushIU extends AbstractIU
private final static Logger logger = LoggerFactory.getLogger(RemotePushIU.class.getName()); private final static Logger logger = LoggerFactory.getLogger(RemotePushIU.class.getName());
private InputBuffer inputBuffer; private InputBuffer inputBuffer;
public IU.AccessMode getAccessMode()
{
return IU.AccessMode.PUSH;
}
public InputBuffer getInputBuffer() public InputBuffer getInputBuffer()
{ {
return inputBuffer; return inputBuffer;
...@@ -104,15 +110,17 @@ public class RemotePushIU extends AbstractIU ...@@ -104,15 +110,17 @@ public class RemotePushIU extends AbstractIU
throw new IUReadOnlyException(this); throw new IUReadOnlyException(this);
} }
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR").build(); PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR").build();
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision(getRevision()) IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision((int) getRevision())
.setWriterName(getBuffer().getUniqueName()).addNewItems(newItem).build(); .setWriterName(getBuffer().getUniqueName()).addNewItems(newItem).build();
RemoteServer server = getInputBuffer().getRemoteServer(this); RemoteServer server = getInputBuffer().getRemoteServer(this);
logger.debug("Remote server has methods {}", server.getMethods()); logger.debug("Remote server has methods {}", server.getMethods());
int newRevision; long newRevision;
try try
{ {
newRevision = (Integer) server.call("updatePayload", update); //System.out.println("calling remote updatePayload ...");
newRevision = (Long) server.call("updatePayload", update);
//System.out.println(" ... done");
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -126,6 +134,10 @@ public class RemotePushIU extends AbstractIU ...@@ -126,6 +134,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0) if (newRevision == 0)
{ {
throw new IUUpdateFailedException(this); throw new IUUpdateFailedException(this);
...@@ -148,7 +160,7 @@ public class RemotePushIU extends AbstractIU ...@@ -148,7 +160,7 @@ public class RemotePushIU extends AbstractIU
{ {
throw new IUReadOnlyException(this); throw new IUReadOnlyException(this);
} }
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true) Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision((int) getRevision()).setIsDelta(true)
.setWriterName(getBuffer().getUniqueName()); .setWriterName(getBuffer().getUniqueName());
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet()) for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{ {
...@@ -161,10 +173,10 @@ public class RemotePushIU extends AbstractIU ...@@ -161,10 +173,10 @@ public class RemotePushIU extends AbstractIU
RemoteServer server = getInputBuffer().getRemoteServer(this); RemoteServer server = getInputBuffer().getRemoteServer(this);
logger.debug("Remote server has methods {}", server.getMethods()); logger.debug("Remote server has methods {}", server.getMethods());
int newRevision; long newRevision;
try try
{ {
newRevision = (Integer) server.call("updatePayload", update); newRevision = (Long) server.call("updatePayload", update);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -178,6 +190,10 @@ public class RemotePushIU extends AbstractIU ...@@ -178,6 +190,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0) if (newRevision == 0)
{ {
throw new IUUpdateFailedException(this); throw new IUUpdateFailedException(this);
...@@ -210,13 +226,13 @@ public class RemotePushIU extends AbstractIU ...@@ -210,13 +226,13 @@ public class RemotePushIU extends AbstractIU
} }
else else
{ {
IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(getUid()).setRevision(getRevision()) IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(getUid()).setRevision((int) getRevision())
.setWriterName(getBuffer().getUniqueName()).build(); .setWriterName(getBuffer().getUniqueName()).build();
RemoteServer server = inputBuffer.getRemoteServer(this); RemoteServer server = inputBuffer.getRemoteServer(this);
int newRevision; long newRevision;
try try
{ {
newRevision = (Integer) server.call("commit", iuc); newRevision = (Long) server.call("commit", iuc);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -230,6 +246,10 @@ public class RemotePushIU extends AbstractIU ...@@ -230,6 +246,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0) if (newRevision == 0)
{ {
throw new IUCommittedException(this); throw new IUCommittedException(this);
...@@ -281,13 +301,13 @@ public class RemotePushIU extends AbstractIU ...@@ -281,13 +301,13 @@ public class RemotePushIU extends AbstractIU
throw new IUReadOnlyException(this); throw new IUReadOnlyException(this);
} }
IUPayloadUpdate iuu = IUPayloadUpdate.newBuilder().setRevision(getRevision()).setIsDelta(false).setUid(getUid()) IUPayloadUpdate iuu = IUPayloadUpdate.newBuilder().setRevision((int) getRevision()).setIsDelta(false).setUid(getUid())
.addAllNewItems(newItems).setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").build(); .addAllNewItems(newItems).setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").build();
RemoteServer server = inputBuffer.getRemoteServer(this); RemoteServer server = inputBuffer.getRemoteServer(this);
int newRevision; long newRevision;
try try
{ {
newRevision = (Integer) server.call("updatePayload", iuu); newRevision = (Long) server.call("updatePayload", iuu);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -301,6 +321,10 @@ public class RemotePushIU extends AbstractIU ...@@ -301,6 +321,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0) if (newRevision == 0)
{ {
throw new IUUpdateFailedException(this); throw new IUUpdateFailedException(this);
...@@ -399,13 +423,13 @@ public class RemotePushIU extends AbstractIU ...@@ -399,13 +423,13 @@ public class RemotePushIU extends AbstractIU
{ {
throw new IUReadOnlyException(this); throw new IUReadOnlyException(this);
} }
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision(getRevision()) IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision((int) getRevision())
.setWriterName(getBuffer().getUniqueName()).addKeysToRemove((String) key).build(); .setWriterName(getBuffer().getUniqueName()).addKeysToRemove((String) key).build();
RemoteServer server = getInputBuffer().getRemoteServer(this); RemoteServer server = getInputBuffer().getRemoteServer(this);
int newRevision; long newRevision;
try try
{ {
newRevision = (Integer) server.call("updatePayload", update); newRevision = (Long) server.call("updatePayload", update);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -419,6 +443,10 @@ public class RemotePushIU extends AbstractIU ...@@ -419,6 +443,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0) if (newRevision == 0)
{ {
throw new IUUpdateFailedException(this); throw new IUUpdateFailedException(this);
...@@ -455,11 +483,11 @@ public class RemotePushIU extends AbstractIU ...@@ -455,11 +483,11 @@ public class RemotePushIU extends AbstractIU
} }
IULinkUpdate update = IULinkUpdate.newBuilder().addAllLinksToRemove(removeLinkSet).addAllNewLinks(newLinkSet).setIsDelta(isDelta) IULinkUpdate update = IULinkUpdate.newBuilder().addAllLinksToRemove(removeLinkSet).addAllNewLinks(newLinkSet).setIsDelta(isDelta)
.setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").setUid(getUid()).setRevision(getRevision()).build(); .setWriterName(getBuffer() != null ? getBuffer().getUniqueName() : "").setUid(getUid()).setRevision((int) getRevision()).build();
int newRevision; long newRevision;
try try
{ {
newRevision = (Integer) server.call("updateLinks", update); newRevision = (Long) server.call("updateLinks", update);
} }
catch (RSBException e) catch (RSBException e)
{ {
...@@ -473,6 +501,10 @@ public class RemotePushIU extends AbstractIU ...@@ -473,6 +501,10 @@ public class RemotePushIU extends AbstractIU
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0) if (newRevision == 0)
{ {
throw new IUUpdateFailedException(this); throw new IUUpdateFailedException(this);
......
...@@ -183,7 +183,7 @@ public class ComponentPushCommunicationIntegrationTest ...@@ -183,7 +183,7 @@ public class ComponentPushCommunicationIntegrationTest
payloadUpdate.put("chunk12", "item2"); payloadUpdate.put("chunk12", "item2");
payloadUpdate.put("chunk13", "item3"); payloadUpdate.put("chunk13", "item3");
payloadUpdate.put("chunk14", "item4"); payloadUpdate.put("chunk14", "item4");
int oldRev = iuIn.getRevision(); long oldRev = iuIn.getRevision();
localIU.getPayload().merge(payloadUpdate); localIU.getPayload().merge(payloadUpdate);
Thread.sleep(200); Thread.sleep(200);
assertEquals(oldRev + 1, iuIn.getRevision()); assertEquals(oldRev + 1, iuIn.getRevision());
...@@ -197,7 +197,7 @@ public class ComponentPushCommunicationIntegrationTest ...@@ -197,7 +197,7 @@ public class ComponentPushCommunicationIntegrationTest
payloadUpdate2.put("chunk22", "item6"); payloadUpdate2.put("chunk22", "item6");
payloadUpdate2.put("chunk13", "item3-changed"); payloadUpdate2.put("chunk13", "item3-changed");
payloadUpdate2.put("chunk14", "item4-changed"); payloadUpdate2.put("chunk14", "item4-changed");
int oldRev2 = iuIn.getRevision(); long oldRev2 = iuIn.getRevision();
iuIn.getPayload().merge(payloadUpdate2); iuIn.getPayload().merge(payloadUpdate2);
Thread.sleep(200); Thread.sleep(200);
assertEquals(oldRev2 + 1, localIU.getRevision()); assertEquals(oldRev2 + 1, localIU.getRevision());
......
...@@ -54,7 +54,7 @@ public class InputBufferTest ...@@ -54,7 +54,7 @@ public class InputBufferTest
iu.setOwnerName("owner"); iu.setOwnerName("owner");
iu.setReadOnly(false); iu.setReadOnly(false);
iu.setRevision(1); iu.setRevision(1);
informer.send(iu); informer.publish(iu);
Thread.sleep(1000); Thread.sleep(1000);
AbstractIU iuIn = inBuffer.getIU("uid1"); AbstractIU iuIn = inBuffer.getIU("uid1");
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// "Incremental Processing Architecture // "Incremental Processing Architecture
// for Artificial Conversational Agents". // for Artificial Conversational Agents".
// //
// Copyright (c) 2009-2014 Social Cognitive Systems Group // Copyright (c) 2009-2022 Social Cognitive Systems Group
// CITEC, Bielefeld University // CITEC, Bielefeld University
// //
// http://opensource.cit-ec.de/projects/ipaaca/ // http://opensource.cit-ec.de/projects/ipaaca/
...@@ -28,72 +28,150 @@ ...@@ -28,72 +28,150 @@
// Forschungsgemeinschaft (DFG) in the context of the German // Forschungsgemeinschaft (DFG) in the context of the German
// Excellence Initiative. // Excellence Initiative.
syntax = "proto2";
package ipaaca.protobuf; package ipaaca.protobuf;
enum TransportMessageType {
WireTypeRESERVED = 0;
WireTypeIntMessage = 1;
WireTypeRemoteRequestResult = 2;
WireTypeIU = 3;
WireTypeMessageIU = 4; // special case on the wire (use other converter)
WireTypeIUPayloadUpdate = 5;
WireTypeIULinkUpdate = 6;
WireTypeIURetraction = 7;
WireTypeIUCommission = 8;
WireTypeIUResendRequest = 9;
WireTypeIUPayloadUpdateRequest = 100;
WireTypeIUCommissionRequest = 101;
WireTypeIULinkUpdateRequest = 102;
}
message TransportLevelWrapper {
required TransportMessageType transport_message_type = 1;
required bytes raw_message = 2;
}
message IntMessage { message IntMessage {
required sint32 value = 1; required sint32 value = 1;
} }
message LinkSet { message LinkSet {
required string type = 1; required string type = 1;
repeated string targets = 2; repeated string targets = 2;
} }
message PayloadItem { message PayloadItem {
required string key = 1; required string key = 1;
required string value = 2; required string value = 2;
required string type = 3 [default = "str"]; required string type = 3 [default = "str"];
} }
message IU { message IU {
enum AccessMode { enum AccessMode {
PUSH = 0; PUSH = 0;
REMOTE = 1; REMOTE = 1;
MESSAGE = 2; MESSAGE = 2;
} }
required string uid = 1; required string uid = 1;
required uint32 revision = 2; required uint32 revision = 2;
required string category = 3 [default = "undef"]; required string category = 3 [default = "undef"];
required string payload_type = 4 [default = "MAP"]; required string payload_type = 4 [default = "MAP"];
required string owner_name = 5; required string owner_name = 5;
required bool committed = 6 [default = false]; required bool committed = 6 [default = false];
required AccessMode access_mode = 7 [default = PUSH]; required AccessMode access_mode = 7 [default = PUSH];
required bool read_only = 8 [default = false]; required bool read_only = 8 [default = false];
repeated PayloadItem payload = 9; repeated PayloadItem payload = 9;
repeated LinkSet links = 10; repeated LinkSet links = 10;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
} }
message IUPayloadUpdate { message IUPayloadUpdate {
required string uid = 1; required string uid = 1;
required uint32 revision = 2; required uint32 revision = 2;
repeated PayloadItem new_items = 3; repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4; repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false]; required bool is_delta = 5 [default = false];
required string writer_name = 6; required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
} }
message IURetraction { message IURetraction {
required string uid = 1; required string uid = 1;
required uint32 revision = 2; required uint32 revision = 2;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
} }
message IUCommission { message IUCommission {
required string uid = 1; required string uid = 1;
required uint32 revision = 2; required uint32 revision = 2;
required string writer_name = 3; required string writer_name = 3;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
} }
message IUResendRequest { message IUResendRequest {
required string uid = 1; required string uid = 1;
required string hidden_scope_name = 2; required string hidden_scope_name = 2;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
} }
message IULinkUpdate { // Result for remote operations (below).
required string uid = 1; // Used to send a raw int, which was problematic.
required uint32 revision = 2; // Usually: 0 = Failed, >0 = new revision of successfully modified resource.
repeated LinkSet new_links = 3; message RemoteRequestResult {
repeated LinkSet links_to_remove = 4; required uint32 result = 1;
required bool is_delta = 5 [default = false]; optional string request_uid = 100 [default = ""];
required string writer_name = 6; //optional string request_endpoint = 101 [default = ""];
}
// Remote / request versions of buffer setters:
// they just go with a dedicated ID
message IUPayloadUpdateRequest {
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
} }
message IUCommissionRequest {
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdateRequest {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
...@@ -5,8 +5,6 @@ ...@@ -5,8 +5,6 @@
</publications> </publications>
<dependencies> <dependencies>
<dependency org="google" name="protobuf-python" rev="latest.release"/> <dependency org="google" name="protobuf-python" rev="latest.release"/>
<dependency org="rsb" name="rsb-python" rev="latest.release"/>
<dependency org="spread" name="spread" rev="latest.release"/>
</dependencies> </dependencies>
</ivy-module> </ivy-module>
......
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 9 14:12:05 2016
@author: jpoeppel
"""
from setuptools import setup
import os
import sys
import subprocess
from os import path as op
from distutils.spawn import find_executable
from setuptools.command.build_py import build_py
from setuptools.command.bdist_egg import bdist_egg
from distutils.command.build import build
from distutils.command.sdist import sdist
class ProtoBuild(build_py):
"""
This command automatically compiles all .proto files with `protoc` compiler
and places generated files near them -- i.e. in the same directory.
"""
def find_protoc(self):
"Locates protoc executable"
if 'PROTOC' in os.environ and os.path.exists(os.environ['PROTOC']):
protoc = os.environ['PROTOC']
else:
protoc = find_executable('protoc')
if protoc is None:
sys.stderr.write('protoc not found. Is protobuf-compiler installed? \n'
'Alternatively, you can point the PROTOC environment variable at a local version.')
sys.exit(1)
return protoc
def run(self):
#TODO determine path automaticall
packagedir = "../proto"
print("running build proto")
for protofile in filter(lambda x: x.endswith('.proto'), os.listdir(packagedir)):
source = op.join(packagedir, protofile)
output = source.replace('.proto', '_pb2.py')
if (not op.exists(output) or (op.getmtime(source) > op.getmtime(output))):
sys.stderr.write('Protobuf-compiling ' + source + '\n')
subprocess.check_call([self.find_protoc(), "-I={}".format(packagedir),'--python_out=./src/ipaaca', source])
class BDist_egg(bdist_egg):
'''
Simple wrapper around the normal bdist_egg command to require
protobuf build before normal build.
.. codeauthor:: jwienke
'''
def run(self):
self.run_command('build_proto')
bdist_egg.run(self)
class Build(build):
'''
Simple wrapper around the normal build command to require protobuf build
before normal build.
.. codeauthor:: jwienke
'''
def run(self):
self.run_command('build_proto')
build.run(self)
class Sdist(sdist):
'''
Simple wrapper around the normal sdist command to require protobuf build
before generating the source distribution..
.. codeauthor:: jwienke
'''
def run(self):
# fetch the protocol before building the source distribution so that
# we have a cached version and each user can rebuild the protocol
# with his own protobuf version
self.run_command('build_proto')
sdist.run(self)
version = "0.1.3" #TODO determine correct version! ideally from git, maybe do something similar to rsb/setup.py
setup(name="ipaaca",
version=version,
author="Hendrik Buschmeier, Ramin Yaghoubzadeh, Sören Klett",
author_email="hbuschme@uni-bielefeld.de,ryaghoubzadeh@uni-bielefeld.de,sklett@techfak.uni-bielefeld.de",
license='LGPLv3+',
url='https://opensource.cit-ec.de/projects/ipaaca',
install_requires=["paho-mqtt", "six", "protobuf"],
packages=["ipaaca", "ipaaca.util"],
package_dir={"ipaaca":"src/ipaaca"},
# TODO Do we want to add ipaaca_pb2.py to the egg or as separate package?
# data_files=[("./ipaaca", ["ipaaca_pb2.py"])],
# dependency_links=[
# 'http://www.spread.org/files/'
# 'SpreadModule-1.5spread4.tgz#egg=SpreadModule-1.5spread4'],
cmdclass ={
"build_proto": ProtoBuild,
"sdist": Sdist,
"build": Build,
"bdist_egg":BDist_egg
}
)
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
# "Incremental Processing Architecture # "Incremental Processing Architecture
# for Artificial Conversational Agents". # for Artificial Conversational Agents".
# #
# Copyright (c) 2009-2013 Sociable Agents Group # Copyright (c) 2009-2022 Sociable Agents Group
# CITEC, Bielefeld University # CITEC, Bielefeld University
# #
# http://opensource.cit-ec.de/projects/ipaaca/ # http://opensource.cit-ec.de/projects/ipaaca/
...@@ -36,7 +36,7 @@ import ipaaca ...@@ -36,7 +36,7 @@ import ipaaca
def remote_change_dumper(iu, event_type, local): def remote_change_dumper(iu, event_type, local):
if local: if local:
print 'remote side '+event_type+': '+str(iu) print('remote side '+event_type+': '+str(iu))
ob = ipaaca.OutputBuffer('CoolInformerOut') ob = ipaaca.OutputBuffer('CoolInformerOut')
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
# "Incremental Processing Architecture # "Incremental Processing Architecture
# for Artificial Conversational Agents". # for Artificial Conversational Agents".
# #
# Copyright (c) 2009-2016 Social Cognitive Systems Group # Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University # CITEC, Bielefeld University
# #
# http://opensource.cit-ec.de/projects/ipaaca/ # http://opensource.cit-ec.de/projects/ipaaca/
...@@ -35,8 +35,8 @@ from __future__ import division, print_function ...@@ -35,8 +35,8 @@ from __future__ import division, print_function
import os import os
import threading import threading
import rsb #import rsb
import rsb.converter #import rsb.converter
import ipaaca.ipaaca_pb2 import ipaaca.ipaaca_pb2
import ipaaca.converter import ipaaca.converter
...@@ -46,84 +46,77 @@ from ipaaca.iu import IU, Message, IUAccessMode, IUEventType ...@@ -46,84 +46,77 @@ from ipaaca.iu import IU, Message, IUAccessMode, IUEventType
from ipaaca.misc import enable_logging, IpaacaArgumentParser from ipaaca.misc import enable_logging, IpaacaArgumentParser
from ipaaca.payload import Payload from ipaaca.payload import Payload
import ipaaca.backend
__RSB_INITIALIZER_LOCK = threading.Lock() #
__RSB_INITIALIZED = False # ipaaca.exit(int_retval)
#
from ipaaca.buffer import atexit_cleanup_function
def exit(int_retval=0):
'''For the time being, this function can be used to
circumvent any sys.exit blocks, while at the same time
cleaning up the buffers (e.g. retracting IUs).
def initialize_ipaaca_rsb_if_needed(): Call once at the end of any python script (or anywhere
"""Initialise rsb if not yet initialise. in lieu of sys.exit() / os._exit(). '''
print('ipaaca: cleaning up and exiting with code '+str(int_retval))
atexit_cleanup_function()
os._exit(int_retval)
* Register own RSB onverters. __RSB_INITIALIZER_LOCK = threading.Lock()
* Initialise RSB from enviroment variables, rsb config file, or __RSB_INITIALIZED = False
from default values for RSB trnasport, host, and port (via
ipaaca.defaults or ipaaca.misc.IpaacaArgumentParser).
"""
global __RSB_INITIALIZED
with __RSB_INITIALIZER_LOCK:
if __RSB_INITIALIZED:
return
else:
rsb.converter.registerGlobalConverter(
ipaaca.converter.IntConverter(
wireSchema="int32",
dataType=int))
rsb.converter.registerGlobalConverter(
ipaaca.converter.IUConverter(
wireSchema="ipaaca-iu",
dataType=IU))
rsb.converter.registerGlobalConverter(
ipaaca.converter.MessageConverter(
wireSchema="ipaaca-messageiu",
dataType=Message))
rsb.converter.registerGlobalConverter(
ipaaca.converter.IULinkUpdateConverter(
wireSchema="ipaaca-iu-link-update",
dataType=converter.IULinkUpdate))
rsb.converter.registerGlobalConverter(
ipaaca.converter.IUPayloadUpdateConverter(
wireSchema="ipaaca-iu-payload-update",
dataType=converter.IUPayloadUpdate))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca.ipaaca_pb2.IUCommission))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca.ipaaca_pb2.IUResendRequest))
rsb.converter.registerGlobalConverter(
rsb.converter.ProtocolBufferConverter(
messageClass=ipaaca.ipaaca_pb2.IURetraction))
if ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT is not None:
if ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT == 'spread':
os.environ['RSB_TRANSPORT_SPREAD_ENABLED'] = str(1)
os.environ['RSB_TRANSPORT_SOCKET_ENABLED'] = str(0)
elif ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT == 'socket':
os.environ['RSB_TRANSPORT_SPREAD_ENABLED'] = str(0)
os.environ['RSB_TRANSPORT_SOCKET_ENABLED'] = str(1)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER is not None:
os.environ['RSB_TRANSPORT_SOCKET_SERVER'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST is not None:
os.environ['RSB_TRANSPORT_SPREAD_HOST'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST)
os.environ['RSB_TRANSPORT_SOCKET_HOST'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT is not None:
os.environ['RSB_TRANSPORT_SPREAD_PORT'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT)
os.environ['RSB_TRANSPORT_SOCKET_PORT'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT)
rsb.__defaultParticipantConfig = \ def initialize_ipaaca_rsb_if_needed():
rsb.ParticipantConfig.fromDefaultSources() """Initialise rsb if not yet initialise.
__RSB_INITIALIZED = True * Register own RSB converters.
* Initialise RSB from enviroment variables, rsb config file, or
from default values for RSB trnasport, host, and port (via
ipaaca.defaults or ipaaca.misc.IpaacaArgumentParser).
"""
global __RSB_INITIALIZED
with __RSB_INITIALIZER_LOCK:
if __RSB_INITIALIZED:
return
else:
ipaaca.converter.register_global_converter(
ipaaca.converter.IUConverter(
wireSchema="ipaaca-iu",
dataType=IU))
ipaaca.converter.register_global_converter(
ipaaca.converter.MessageConverter(
wireSchema="ipaaca-messageiu",
dataType=Message))
ipaaca.converter.register_global_converter(
ipaaca.converter.IULinkUpdateConverter(
wireSchema="ipaaca-iu-link-update",
dataType=converter.IULinkUpdate))
ipaaca.converter.register_global_converter(
ipaaca.converter.IUPayloadUpdateConverter(
wireSchema="ipaaca-iu-payload-update",
dataType=converter.IUPayloadUpdate))
if ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT is not None:
if ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT == 'spread':
os.environ['RSB_TRANSPORT_SPREAD_ENABLED'] = str(1)
os.environ['RSB_TRANSPORT_SOCKET_ENABLED'] = str(0)
elif ipaaca.defaults.IPAACA_DEFAULT_RSB_TRANSPORT == 'socket':
os.environ['RSB_TRANSPORT_SPREAD_ENABLED'] = str(0)
os.environ['RSB_TRANSPORT_SOCKET_ENABLED'] = str(1)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER is not None:
os.environ['RSB_TRANSPORT_SOCKET_SERVER'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_SOCKET_SERVER)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST is not None:
os.environ['RSB_TRANSPORT_SPREAD_HOST'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST)
os.environ['RSB_TRANSPORT_SOCKET_HOST'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_HOST)
if ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT is not None:
os.environ['RSB_TRANSPORT_SPREAD_PORT'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT)
os.environ['RSB_TRANSPORT_SOCKET_PORT'] = str(
ipaaca.defaults.IPAACA_DEFAULT_RSB_PORT)
#
ipaaca.backend.register_backends()
__RSB_INITIALIZED = True
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import threading
import uuid
import os
import time
LOGGER = ipaaca.misc.get_library_logger()
__registered_backends = {}
__backend_registration_done = False
def register_backends():
global __registered_backends
global __backend_registration_done
if not __backend_registration_done:
__backend_registration_done = True
LOGGER.debug('Registering available back-ends')
# register available backends
# mqtt
import ipaaca.backend_mqtt
be = ipaaca.backend_mqtt.create_backend()
if be is not None:
__registered_backends[be.name] = be
LOGGER.debug('Back-end '+str(be.name)+' added')
# ros
import ipaaca.backend_ros
be = ipaaca.backend_ros.create_backend()
if be is not None:
__registered_backends[be.name] = be
LOGGER.debug('Back-end '+str(be.name)+' added')
def get_default_backend():
# TODO selection mechanism / config
if not __backend_registration_done:
register_backends()
if len(__registered_backends) == 0:
raise RuntimeError('No back-ends could be initialized for ipaaca-python')
cfg = ipaaca.config.get_global_config()
preferred = cfg.get_with_default('backend', None)
if preferred is None:
k = list(__registered_backends.keys())[0]
if len(__registered_backends) > 1:
LOGGER.warning('No preferred ipaaca.backend set, returning one of several (probably the first in list)')
print('Using randomly selected back-end {}!'.format(k))
else:
if preferred in __registered_backends:
k = preferred
else:
raise ipaaca.exception.BackendInitializationError(preferred)
LOGGER.info('Back-end is '+str(k))
return __registered_backends[k]
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import collections
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import ipaaca.backend
import ipaaca.config
import threading
try:
import queue
except:
import Queue as queue
import uuid
import os
import time
try:
import paho.mqtt.client as mqtt
MQTT_ENABLED = True
except:
MQTT_ENABLED = False
if not MQTT_ENABLED:
def create_backend():
return None
else:
def create_backend():
return MQTTBackend(name='mqtt')
LOGGER = ipaaca.misc.get_library_logger()
_REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited
_REMOTE_LISTENER_MAX_QUEUED_EVENTS = 1024 # 'Full' exception if exceeded
class EventWrapper(object):
def __init__(self, data):
self.data = data
class PendingRequest(object):
'''Encapsulation of a pending remote request with
a facility to keep the requesting thread locked
until the reply or a timeout unlocks it.'''
def __init__(self, request):
self._request = request
self._event = threading.Event()
self._reply = None
self._request_uid = str(uuid.uuid4())[0:8]
def wait_for_reply(self, timeout=30.0):
wr = self._event.wait(timeout)
return None if wr is False else self._reply
def reply_with_result(self, reply):
self._reply = reply
self._event.set()
class Informer(object):
'''Informer interface, wrapping an outbound port to MQTT'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._live = True
self._live_event.set()
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
pass
def publishData(self, data):
#print('Informer publishing '+str(data.__class__.__name__)+' on '+self._scope)
self._mqtt_client.publish(self._scope, ipaaca.converter.serialize(data), qos=2)
class BackgroundEventDispatcher(threading.Thread):
def __init__(self, listener):
super(BackgroundEventDispatcher, self).__init__()
self.daemon = True
self._listener = listener
def terminate(self):
self._running = False
def run(self):
self._running = True
listener = self._listener
while self._running: # auto-terminated (daemon)
event = listener._event_queue.get(block=True, timeout=None)
if event is None: return # signaled termination
#print('\033[31mDispatch '+str(event.data.__class__.__name__)+' start ...\033[m')
for handler in self._listener._handlers:
handler(event)
#print('\033[32m... dispatch '+str(event.data.__class__.__name__)+' end.\033[m')
class Listener(object):
'''Listener interface, wrapping an inbound port from MQTT'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
self._event_queue = queue.Queue(_REMOTE_LISTENER_MAX_QUEUED_EVENTS)
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_socket_open = self.mqtt_callback_on_socket_open
#self._mqtt_client.on_socket_close = self.mqtt_callback_on_socket_close
#self._mqtt_client.on_log = self.mqtt_callback_on_log
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self._dispatcher = BackgroundEventDispatcher(self)
self._dispatcher.start()
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._event_queue.put(None, block=False) # signal termination, waking queue
self._dispatcher.terminate()
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
LOGGER.debug('Connect to '+str(self._host)+':'+str(self._port))
self._mqtt_client.connect(self._host, self._port)
#def mqtt_callback_on_log(self, client, userdata, level, buf):
# print('Listener: LOG: '+str(buf))
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
event = EventWrapper(ipaaca.converter.deserialize(message.payload))
self._event_queue.put(event, block=False) # queue event for BackgroundEventDispatcher
def addHandler(self, handler):
self._handlers.append(handler)
#def publishData(self, data):
# self._mqtt_client.publish(self._
class LocalServer(object):
'''LocalServer interface, allowing for RPC requests to
IU functions, or reporting back success or failure.'''
def __init__(self, buffer_impl, scope, config=None):
self._buffer = buffer_impl
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_LocalServer_' + self._uuid # unused atm
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
req = ipaaca.converter.deserialize(message.payload)
result = None
if isinstance(req, ipaaca.converter.IUPayloadUpdate):
result = self.attempt_to_apply_remote_updatePayload(req)
elif isinstance(req, ipaaca.converter.IULinkUpdate):
result = self.attempt_to_apply_remote_updateLinks(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUCommission):
result = self.attempt_to_apply_remote_commit(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUResendRequest):
result = self.attempt_to_apply_remote_resendRequest(req)
else:
raise RuntimeError('LocalServer: got an object of wrong class '+str(req.__class__.__name__)) # TODO replace
if result is not None:
self.send_result_for_request(req, result)
#
def send_result_for_request(self, obj, result):
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
pbo.result = result
pbo.request_uid = obj.request_uid
#print('Sending result to endpoint '+str(obj.request_endpoint))
self._mqtt_client.publish(obj.request_endpoint, ipaaca.converter.serialize(pbo), qos=2)
def attempt_to_apply_remote_updateLinks(self, obj):
return self._buffer._remote_update_links(obj)
def attempt_to_apply_remote_updatePayload(self, obj):
return self._buffer._remote_update_payload(obj)
def attempt_to_apply_remote_commit(self, obj):
return self._buffer._remote_commit(obj)
def attempt_to_apply_remote_resendRequest(self, obj):
return self._buffer._remote_request_resend(obj)
class RemoteServer(object):
'''RemoteServer, connects to a LocalServer on the side
of an actual IU owner, which will process any requests.
The RemoteServer is put on hold while the owner is
processing. RemoteServer is from RSB terminology,
it might more aptly be described as an RPC client.'''
def __init__(self, remote_end_scope, config=None):
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
#queue.Queue(_REMOTE_SERVER_MAX_QUEUED_REQUESTS)
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_RemoteServer_' + self._uuid
# will RECV here:
self._scope = '/ipaaca/remotes/' + self._name
# will SEND here
self._remote_end_scope = remote_end_scope
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + remote_end_scope
self._mqtt_client = mqtt.Client(self._client_id)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = int(config.get_with_default('transport.mqtt.port', 1883, warn=True))
self._mqtt_client.on_connect = self.mqtt_callback_on_connect
self._mqtt_client.on_disconnect = self.mqtt_callback_on_disconnect
self._mqtt_client.on_message = self.mqtt_callback_on_message
self._mqtt_client.on_subscribe = self.mqtt_callback_on_subscribe
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self.run_in_background()
def deactivate(self):
pass
def deactivate_internal(self):
self._mqtt_client.disconnect()
self._mqtt_client = None
def run_in_background(self):
if not self._running:
self._running = True
self._mqtt_client.loop_start()
self._mqtt_client.connect(self._host, self._port)
def mqtt_callback_on_connect(self, client, userdata, flags, rc):
if rc > 0:
LOGGER.warning('MQTT connect failed, result code ' + str(rc))
else:
self._mqtt_client.subscribe(self._scope, qos=2)
def mqtt_callback_on_subscribe(self, client, userdata, mid, granted_qos):
# TODO should / could track how many / which topics have been granted
if any(q != 2 for q in granted_qos):
LOGGER.warning('MQTT subscription did not obtain QoS level 2')
self._live = True
self._live_event.set()
def mqtt_callback_on_disconnect(self, client, userdata, rc):
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message):
reply = ipaaca.converter.deserialize(message.payload)
if isinstance(reply, ipaaca.ipaaca_pb2.RemoteRequestResult):
uid = reply.request_uid
pending_request = None
with self._pending_requests_lock:
if uid in self._pending_requests:
pending_request = self._pending_requests[uid]
del self._pending_requests[uid]
if pending_request is None:
raise RuntimeError('RemoteServer: got a reply for request uid that is not queued: '+str(uid))
else:
# provide result to other thread and unblock it
pending_request.reply_with_result(reply)
else:
raise RuntimeError('RemoteServer: got an object of wrong class '+str(reply.__class__.__name__)) # TODO replace
def queue_pending_request(self, request):
pending_request = PendingRequest(request)
with self._pending_requests_lock:
if _REMOTE_SERVER_MAX_QUEUED_REQUESTS>0 and len(self._pending_requests) >= _REMOTE_SERVER_MAX_QUEUED_REQUESTS:
raise RuntimeError('RemoteServer: maximum number of pending requests exceeded') # TODO replace?
else:
self._pending_requests[pending_request._request_uid] = pending_request
return pending_request
# impl
def blocking_call(self, request):
# Broker's queue will raise before sending anything if capacity is exceeded
pending_request = self.queue_pending_request(request)
# complete and send request
request.request_uid = pending_request._request_uid
request.request_endpoint = self._scope
self._mqtt_client.publish(self._remote_end_scope, ipaaca.converter.serialize(request), qos=2)
# wait for other end to return result
reply = pending_request.wait_for_reply()
if reply is None:
LOGGER.warning('A request timed out!')
return 0
else:
return reply.result # the actual int result
# glue that quacks like the RSB version
def resendRequest(self, req):
return self.blocking_call(req)
def commit(self, req):
return self.blocking_call(req)
def updatePayload(self, req):
return self.blocking_call(req)
def updateLinks(self, req):
return self.blocking_call(req)
class MQTTBackend(object):
def __init__(self, name='mqtt'):
# back-end initialization code
self._config = ipaaca.config.get_global_config()
self._name = name
self._participants = set()
def _get_name(self):
return self._name
name = property(_get_name)
def teardown(self):
LOGGER.info('MQTT teardown: waiting 1 sec for final deliveries')
time.sleep(1)
for p in self._participants:
p.deactivate_internal()
def Scope(self, scope_str):
'''Scope adapter (glue replacing rsb.Scope)'''
return str(scope_str)
def createLocalServer(self, buffer_impl, scope, config=None):
LOGGER.debug('Creating a LocalServer on '+str(scope))
s = LocalServer(buffer_impl, scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createRemoteServer(self, scope, config=None):
LOGGER.debug('Creating a RemoteServer on '+str(scope))
s = RemoteServer(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createInformer(self, scope, config=None, dataType="ignored in this backend"):
LOGGER.debug('Creating an Informer on '+str(scope))
s = Informer(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
def createListener(self, scope, config=None):
LOGGER.debug('Creating a Listener on '+str(scope))
s = Listener(scope, self._config if config is None else config)
self._participants.add(s)
s._live_event.wait(30.0)
return s
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import collections
import sys
import ipaaca.ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import ipaaca.converter
import ipaaca.backend
import ipaaca.config as config
LOGGER = ipaaca.misc.get_library_logger()
ROS_ENABLED, __try_guessing = False, False
try:
import rospy
from std_msgs.msg import String
import base64
ROS_ENABLED = True
except:
LOGGER.debug('rospy or deps not found, ROS backend disabled')
ROS_ENABLED = False
if not ROS_ENABLED:
def create_backend():
return None
else:
def create_backend():
return ROSBackend(name='ros')
import threading
try:
import queue
except:
import Queue as queue
import uuid
import os
import time
import sys
class EventWrapper(object):
def __init__(self, data):
self.data = data
class PendingRequest(object):
'''Encapsulation of a pending remote request with
a facility to keep the requesting thread locked
until the reply or a timeout unlocks it.'''
def __init__(self, request):
self._request = request
self._event = threading.Event()
self._reply = None
self._request_uid = str(uuid.uuid4())[0:8]
def wait_for_reply(self, timeout=30.0):
wr = self._event.wait(timeout)
return None if wr is False else self._reply
def reply_with_result(self, reply):
self._reply = reply
self._event.set()
class Informer(object):
'''Informer interface, wrapping an outbound port to ROS'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._ros_pub = rospy.Publisher(self._scope, String, queue_size=100, tcp_nodelay=True, latch=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def deactivate(self):
pass
#self._ros_pub.unregister()
#self._ros_pub = None
def publishData(self, data):
self._ros_pub.publish(ROSBackend.serialize(data))
class BackgroundEventDispatcher(threading.Thread):
def __init__(self, event, handlers):
super(BackgroundEventDispatcher, self).__init__()
self.daemon = True
self._event = event
self._handlers = handlers
def run(self):
for handler in self._handlers:
handler(self._event)
class Listener(object):
'''Listener interface, wrapping an inbound port from ROS'''
def __init__(self, scope, config=None):
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._handlers = []
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._ros_sub = rospy.Subscriber(self._scope, String, self.on_message, tcp_nodelay=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def deactivate(self):
pass
#self._ros_sub.unregister()
#self._ros_sub = None
def on_message(self, message):
event = EventWrapper(ROSBackend.deserialize(message.data))
## (1) with extra thread:
#dispatcher = BackgroundEventDispatcher(event, self._handlers)
#dispatcher.start()
## or (2) with no extra thread:
for handler in self._handlers:
handler(event)
def addHandler(self, handler):
self._handlers.append(handler)
class LocalServer(object):
'''LocalServer interface, allowing for RPC requests to
IU functions, or reporting back success or failure.'''
def __init__(self, buffer_impl, scope, config=None):
self._buffer = buffer_impl
self._scope = scope
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_LocalServer_' + self._uuid # unused atm
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope
self._ros_pubs = {}
self._ros_sub = rospy.Subscriber(self._scope, String, self.on_message, tcp_nodelay=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def get_publisher(self, endpoint):
if endpoint in self._ros_pubs:
return self._ros_pubs[endpoint]
else:
p = rospy.Publisher(endpoint, String, queue_size=10, tcp_nodelay=True, latch=True)
self._ros_pubs[endpoint] = p
return p
def deactivate(self):
pass
#self._ros_sub.unregister()
#for v in self._ros_pubs.values():
# v.unregister()
#self._ros_sub = None
#self._ros_pubs = {}
def on_message(self, message):
req = ROSBackend.deserialize(message.data)
result = None
if isinstance(req, ipaaca.converter.IUPayloadUpdate):
result = self.attempt_to_apply_remote_updatePayload(req)
elif isinstance(req, ipaaca.converter.IULinkUpdate):
result = self.attempt_to_apply_remote_updateLinks(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUCommission):
result = self.attempt_to_apply_remote_commit(req)
elif isinstance(req, ipaaca.ipaaca_pb2.IUResendRequest):
result = self.attempt_to_apply_remote_resendRequest(req)
else:
raise RuntimeError('LocalServer: got an object of wrong class '+str(req.__class__.__name__)) # TODO replace
if result is not None:
self.send_result_for_request(req, result)
#
def send_result_for_request(self, obj, result):
pbo = ipaaca.ipaaca_pb2.RemoteRequestResult()
pbo.result = result
pbo.request_uid = obj.request_uid
#print('Sending result to endpoint '+str(obj.request_endpoint))
pub = self.get_publisher(obj.request_endpoint)
pub.publish(ROSBackend.serialize(pbo))
def attempt_to_apply_remote_updateLinks(self, obj):
return self._buffer._remote_update_links(obj)
def attempt_to_apply_remote_updatePayload(self, obj):
return self._buffer._remote_update_payload(obj)
def attempt_to_apply_remote_commit(self, obj):
return self._buffer._remote_commit(obj)
def attempt_to_apply_remote_resendRequest(self, obj):
return self._buffer._remote_request_resend(obj)
_REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited
class RemoteServer(object):
'''RemoteServer, connects to a LocalServer on the side
of an actual IU owner, which will process any requests.
The RemoteServer is put on hold while the owner is
processing. RemoteServer is from RSB terminology,
it might more aptly be described as an RPC client.'''
def __init__(self, remote_end_scope, config=None):
self._running = False
self._live = False
self._live_event = threading.Event()
self._pending_requests_lock = threading.Lock()
self._pending_requests = {}
#queue.Queue(_REMOTE_SERVER_MAX_QUEUED_REQUESTS)
self._uuid = str(uuid.uuid4())[0:8]
self._name = 'PID_' + str(os.getpid()) + '_RemoteServer_' + self._uuid
# will RECV here:
self._scope = '/ipaaca/remotes/' + self._name
# will SEND here
self._remote_end_scope = remote_end_scope
#
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + remote_end_scope
self._ros_pub = rospy.Publisher(self._remote_end_scope, String, queue_size=10, tcp_nodelay=True, latch=True)
self._ros_sub = rospy.Subscriber(self._scope, String, self.on_message, tcp_nodelay=True)
self._host = config.get_with_default('transport.mqtt.host', 'localhost', warn=True)
self._port = config.get_with_default('transport.mqtt.port', 1883, warn=True)
def deactivate(self):
pass
#self._ros_sub.unregister()
#self._ros_pub.unregister()
#self._ros_sub = None
#self._ros_pub = None
def on_message(self, message):
reply = ROSBackend.deserialize(message.data)
if isinstance(reply, ipaaca.ipaaca_pb2.RemoteRequestResult):
uid = reply.request_uid
pending_request = None
with self._pending_requests_lock:
if uid in self._pending_requests:
pending_request = self._pending_requests[uid]
del self._pending_requests[uid]
if pending_request is None:
raise RuntimeError('RemoteServer: got a reply for request uid that is not queued: '+str(uid))
else:
# provide result to other thread and unblock it
pending_request.reply_with_result(reply)
else:
raise RuntimeError('RemoteServer: got an object of wrong class '+str(reply.__class__.__name__)) # TODO replace
def queue_pending_request(self, request):
pending_request = PendingRequest(request)
with self._pending_requests_lock:
if _REMOTE_SERVER_MAX_QUEUED_REQUESTS>0 and len(self._pending_requests) >= _REMOTE_SERVER_MAX_QUEUED_REQUESTS:
raise RuntimeError('RemoteServer: maximum number of pending requests exceeded') # TODO replace?
else:
self._pending_requests[pending_request._request_uid] = pending_request
return pending_request
# impl
def blocking_call(self, request):
# Broker's queue will raise before sending anything if capacity is exceeded
pending_request = self.queue_pending_request(request)
# complete and send request
request.request_uid = pending_request._request_uid
request.request_endpoint = self._scope
self._ros_pub.publish(ROSBackend.serialize(request))
# wait for other end to return result
reply = pending_request.wait_for_reply()
if reply is None:
LOGGER.warning('A request timed out!')
return 0
else:
return reply.result # the actual int result
# glue that quacks like the RSB version
def resendRequest(self, req):
return self.blocking_call(req)
def commit(self, req):
return self.blocking_call(req)
def updatePayload(self, req):
return self.blocking_call(req)
def updateLinks(self, req):
return self.blocking_call(req)
class ROSBackend(object):
def __init__(self, name='ros'):
#import logging
# back-end initialization code
self._name = name
self._need_init = True
#logging.basicConfig(level=logging.DEBUG)
def init_once(self):
'''Actual back-end initialization is only done when it is used'''
if self._need_init:
self._need_init = False
self._config = config.get_global_config()
try:
# generate a ROS node prefix from the basename of argv[0]
clean_name = ''.join([c for c in sys.argv[0].rsplit('/',1)[-1].replace('.', '_').replace('-','_') if c.lower() in 'abcdefghijklmnoprqstuvwxzy0123456789_'])
except:
clean_name = ''
rospy.init_node('ipaaca_python' if len(clean_name)==0 else clean_name,
anonymous=True, disable_signals=True)
def _get_name(self):
return self._name
name = property(_get_name)
def teardown(self):
LOGGER.info('ROS teardown: waiting 1 sec for final deliveries')
time.sleep(1)
rospy.signal_shutdown('Done')
@staticmethod
def serialize(obj):
#print('object class: '+obj.__class__.__name__)
bb = ipaaca.converter.serialize(obj)
st = str(base64.b64encode(bb))
#print('serialized: '+str(st))
return st
@staticmethod
def deserialize(msg):
#print('got serialized: '+str(msg))
bb = base64.b64decode(msg)
return ipaaca.converter.deserialize(bb)
def Scope(self, scope_str):
'''Scope adapter (glue replacing rsb.Scope)'''
# ROS graph resources must not start with a slash
return str(scope_str)[1:] if scope_str.startswith('/') else str(scope_str)
def createLocalServer(self, buffer_impl, scope, config=None):
self.init_once()
LOGGER.debug('Creating a LocalServer on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = LocalServer(buffer_impl, scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
def createRemoteServer(self, scope, config=None):
self.init_once()
LOGGER.debug('Creating a RemoteServer on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = RemoteServer(scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
def createInformer(self, scope, config=None, dataType="ignored in this backend"):
self.init_once()
LOGGER.debug('Creating an Informer on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = Informer(scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
def createListener(self, scope, config=None):
self.init_once()
LOGGER.debug('Creating a Listener on '+str(scope))
LOGGER.debug(' from thread '+threading.current_thread().name)
s = Listener(scope, self._config if config is None else config)
#s._live_event.wait(30.0)
return s
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
# "Incremental Processing Architecture # "Incremental Processing Architecture
# for Artificial Conversational Agents". # for Artificial Conversational Agents".
# #
# Copyright (c) 2009-2015 Social Cognitive Systems Group # Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University # CITEC, Bielefeld University
# #
# http://opensource.cit-ec.de/projects/ipaaca/ # http://opensource.cit-ec.de/projects/ipaaca/
...@@ -37,17 +37,18 @@ import threading ...@@ -37,17 +37,18 @@ import threading
import uuid import uuid
import traceback import traceback
import six
import weakref import weakref
import atexit import atexit
import rsb #import rsb
import ipaaca.ipaaca_pb2 import ipaaca.ipaaca_pb2
import ipaaca.defaults import ipaaca.defaults
import ipaaca.exception import ipaaca.exception
import ipaaca.converter import ipaaca.converter
import ipaaca.iu import ipaaca.iu
import ipaaca.backend
__all__ = [ __all__ = [
'InputBuffer', 'InputBuffer',
...@@ -65,6 +66,7 @@ def atexit_cleanup_function(): ...@@ -65,6 +66,7 @@ def atexit_cleanup_function():
obj = obj_r() obj = obj_r()
if obj is not None: # if weakref still valid if obj is not None: # if weakref still valid
obj._teardown() obj._teardown()
ipaaca.backend.get_default_backend().teardown()
atexit.register(atexit_cleanup_function) atexit.register(atexit_cleanup_function)
def auto_teardown_instances(fn): def auto_teardown_instances(fn):
...@@ -112,10 +114,10 @@ class IUEventHandler(object): ...@@ -112,10 +114,10 @@ class IUEventHandler(object):
self._handler_function = handler_function self._handler_function = handler_function
self._for_event_types = ( self._for_event_types = (
None if for_event_types is None else None if for_event_types is None else
(for_event_types[:] if hasattr(for_event_types, '__iter__') else [for_event_types])) (for_event_types[:] if not isinstance(for_event_types, six.string_types) and hasattr(for_event_types, '__iter__') else [for_event_types]))
self._for_categories = ( self._for_categories = (
None if for_categories is None else None if for_categories is None else
(for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories])) (for_categories[:] if not isinstance(for_categories, six.string_types) and hasattr(for_categories, '__iter__') else [for_categories]))
def condition_met(self, event_type, category): def condition_met(self, event_type, category):
"""Check whether this IUEventHandler should be called. """Check whether this IUEventHandler should be called.
...@@ -158,7 +160,7 @@ class Buffer(object): ...@@ -158,7 +160,7 @@ class Buffer(object):
ipaaca.initialize_ipaaca_rsb_if_needed() ipaaca.initialize_ipaaca_rsb_if_needed()
self._owning_component_name = owning_component_name self._owning_component_name = owning_component_name
self._channel = channel if channel is not None else ipaaca.defaults.IPAACA_DEFAULT_CHANNEL self._channel = channel if channel is not None else ipaaca.defaults.IPAACA_DEFAULT_CHANNEL
self._participant_config = rsb.ParticipantConfig.fromDefaultSources() if participant_config is None else participant_config self._participant_config = participant_config
self._uuid = str(uuid.uuid4())[0:8] self._uuid = str(uuid.uuid4())[0:8]
# Initialise with a temporary, but already unique, name # Initialise with a temporary, but already unique, name
self._unique_name = "undef-"+self._uuid self._unique_name = "undef-"+self._uuid
...@@ -169,6 +171,12 @@ class Buffer(object): ...@@ -169,6 +171,12 @@ class Buffer(object):
return FrozenIUStore(original_iu_store = self._iu_store) return FrozenIUStore(original_iu_store = self._iu_store)
iu_store = property(fget=_get_frozen_iu_store, doc='Copy-on-read version of the internal IU store') iu_store = property(fget=_get_frozen_iu_store, doc='Copy-on-read version of the internal IU store')
def _get_channel(self):
return self._channel
channel = property(
fget=_get_channel,
doc='The IPAACA channel the buffer is connected to.')
def register_handler(self, handler_function, for_event_types=None, for_categories=None): def register_handler(self, handler_function, for_event_types=None, for_categories=None):
"""Register a new IU event handler function. """Register a new IU event handler function.
...@@ -192,9 +200,9 @@ class Buffer(object): ...@@ -192,9 +200,9 @@ class Buffer(object):
h.call(self, uid, local=local, event_type=event_type, category=category) h.call(self, uid, local=local, event_type=event_type, category=category)
except Exception as e: except Exception as e:
if local: if local:
LOGGER.error('Local IU handler raised an exception upon remote write.' + unicode(e)) LOGGER.error('Local IU handler raised an exception upon remote write.' + str(e))
else: else:
print(unicode(traceback.format_exc())) print(str(traceback.format_exc()))
raise e raise e
def _get_owning_component_name(self): def _get_owning_component_name(self):
...@@ -234,13 +242,14 @@ class InputBuffer(Buffer): ...@@ -234,13 +242,14 @@ class InputBuffer(Buffer):
def _get_remote_server(self, event_or_iu): def _get_remote_server(self, event_or_iu):
'''Return (or create, store and return) a remote server.''' '''Return (or create, store and return) a remote server.'''
_owner = self._get_owner(event_or_iu) _remote_server_name = self._get_owner(event_or_iu) + '/Server'
if _owner: if _remote_server_name:
try: try:
return self._remote_server_store[_owner] return self._remote_server_store[_remote_server_name]
except KeyError: except KeyError:
remote_server = rsb.createRemoteServer(rsb.Scope(str(_owner))) be = ipaaca.backend.get_default_backend()
self._remote_server_store[_owner] = remote_server remote_server = be.createRemoteServer(be.Scope(str(_remote_server_name)), config=self._participant_config)
self._remote_server_store[_remote_server_name] = remote_server
return remote_server return remote_server
else: else:
None None
...@@ -262,7 +271,8 @@ class InputBuffer(Buffer): ...@@ -262,7 +271,8 @@ class InputBuffer(Buffer):
def _add_category_listener(self, iu_category): def _add_category_listener(self, iu_category):
'''Create and store a listener on a specific category.''' '''Create and store a listener on a specific category.'''
if iu_category not in self._listener_store: if iu_category not in self._listener_store:
cat_listener = rsb.createListener(rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), config=self._participant_config) be = ipaaca.backend.get_default_backend()
cat_listener = be.createListener(be.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), config=self._participant_config)
cat_listener.addHandler(self._handle_iu_events) cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category) self._category_interests.append(iu_category)
...@@ -303,7 +313,7 @@ class InputBuffer(Buffer): ...@@ -303,7 +313,7 @@ class InputBuffer(Buffer):
event -- a converted RSB event event -- a converted RSB event
''' '''
type_ = type(event.data) type_ = type(event.data)
if type_ is ipaaca.iu.RemotePushIU: if type_ == ipaaca.iu.RemotePushIU:
# a new IU # a new IU
if event.data.uid not in self._iu_store: if event.data.uid not in self._iu_store:
self._iu_store[event.data.uid] = event.data self._iu_store[event.data.uid] = event.data
...@@ -316,7 +326,7 @@ class InputBuffer(Buffer): ...@@ -316,7 +326,7 @@ class InputBuffer(Buffer):
# done via the resend request mechanism). # done via the resend request mechanism).
self._iu_store[event.data.uid] = event.data self._iu_store[event.data.uid] = event.data
event.data.buffer = self event.data.buffer = self
elif type_ is ipaaca.iu.RemoteMessage: elif type_ == ipaaca.iu.RemoteMessage:
# a new Message, an ephemeral IU that is removed after calling handlers # a new Message, an ephemeral IU that is removed after calling handlers
self._iu_store[ event.data.uid ] = event.data self._iu_store[ event.data.uid ] = event.data
event.data.buffer = self event.data.buffer = self
...@@ -325,7 +335,7 @@ class InputBuffer(Buffer): ...@@ -325,7 +335,7 @@ class InputBuffer(Buffer):
else: else:
if event.data.uid not in self._iu_store: if event.data.uid not in self._iu_store:
if (self._resend_active and if (self._resend_active and
not type_ is ipaaca.ipaaca_pb2.IURetraction): not type_ == ipaaca.ipaaca_pb2.IURetraction):
# send resend request to remote server, IURetraction is ignored # send resend request to remote server, IURetraction is ignored
try: try:
self._request_remote_resend(event) self._request_remote_resend(event)
...@@ -336,7 +346,7 @@ class InputBuffer(Buffer): ...@@ -336,7 +346,7 @@ class InputBuffer(Buffer):
LOGGER.warning("Received an update for an IU which we did not receive before.") LOGGER.warning("Received an update for an IU which we did not receive before.")
return return
# an update to an existing IU # an update to an existing IU
if type_ is ipaaca.ipaaca_pb2.IURetraction: if type_ == ipaaca.ipaaca_pb2.IURetraction:
# IU retraction (cannot be triggered remotely) # IU retraction (cannot be triggered remotely)
iu = self._iu_store[event.data.uid] iu = self._iu_store[event.data.uid]
iu._revision = event.data.revision iu._revision = event.data.revision
...@@ -347,18 +357,18 @@ class InputBuffer(Buffer): ...@@ -347,18 +357,18 @@ class InputBuffer(Buffer):
# Notify only for remotely triggered events; # Notify only for remotely triggered events;
# Discard updates that originate from this buffer # Discard updates that originate from this buffer
return return
if type_ is ipaaca.ipaaca_pb2.IUCommission: if type_ == ipaaca.ipaaca_pb2.IUCommission:
# IU commit # IU commit
iu = self._iu_store[event.data.uid] iu = self._iu_store[event.data.uid]
iu._apply_commission() iu._apply_commission()
iu._revision = event.data.revision iu._revision = event.data.revision
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.COMMITTED, category=iu.category) self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.COMMITTED, category=iu.category)
elif type_ is ipaaca.converter.IUPayloadUpdate: elif type_ == ipaaca.converter.IUPayloadUpdate:
# IU payload update # IU payload update
iu = self._iu_store[event.data.uid] iu = self._iu_store[event.data.uid]
iu._apply_update(event.data) iu._apply_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.UPDATED, category=iu.category) self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.UPDATED, category=iu.category)
elif type_ is ipaaca.converter.IULinkUpdate: elif type_ == ipaaca.converter.IULinkUpdate:
# IU link update # IU link update
iu = self._iu_store[event.data.uid] iu = self._iu_store[event.data.uid]
iu._apply_link_update(event.data) iu._apply_link_update(event.data)
...@@ -367,14 +377,14 @@ class InputBuffer(Buffer): ...@@ -367,14 +377,14 @@ class InputBuffer(Buffer):
LOGGER.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_)) LOGGER.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
def add_category_interests(self, category_interests): def add_category_interests(self, category_interests):
if hasattr(category_interests, '__iter__'): if not isinstance(category_interests, six.string_types) and hasattr(category_interests, '__iter__'):
for interest in category_interests: for interest in category_interests:
self._add_category_listener(interest) self._add_category_listener(interest)
else: else:
self._add_category_listener(category_interests) self._add_category_listener(category_interests)
def remove_category_interests(self, category_interests): def remove_category_interests(self, category_interests):
if hasattr(category_interests, '__iter__'): if not isinstance(category_interests, six.string_types) and hasattr(category_interests, '__iter__'):
for interest in category_interests: for interest in category_interests:
self._remove_category_listener(interest) self._remove_category_listener(interest)
else: else:
...@@ -386,12 +396,12 @@ class InputBuffer(Buffer): ...@@ -386,12 +396,12 @@ class InputBuffer(Buffer):
resend_request = ipaaca.ipaaca_pb2.IUResendRequest() resend_request = ipaaca.ipaaca_pb2.IUResendRequest()
resend_request.uid = event.data.uid # target iu resend_request.uid = event.data.uid # target iu
resend_request.hidden_scope_name = str(self._uuid) # hidden category name resend_request.hidden_scope_name = str(self._uuid) # hidden category name
remote_revision = remote_server.requestResend(resend_request) remote_revision = remote_server.resendRequest(resend_request)
if remote_revision == 0: if remote_revision == 0:
raise ipaaca.exception.IUResendRequestFailedError() raise ipaaca.exception.IUResendRequestFailedError(event.data.uid)
else: else:
# Remote server is not known # Remote server is not known
raise ipaaca.exception.IUResendRequestFailedError() raise ipaaca.exception.IUResendRequestRemoteServerUnknownError(event.data.uid)
def register_handler(self, handler_function, for_event_types=None, for_categories=None): def register_handler(self, handler_function, for_event_types=None, for_categories=None):
"""Register a new IU event handler function. """Register a new IU event handler function.
...@@ -433,11 +443,8 @@ class OutputBuffer(Buffer): ...@@ -433,11 +443,8 @@ class OutputBuffer(Buffer):
''' '''
super(OutputBuffer, self).__init__(owning_component_name, channel, participant_config) super(OutputBuffer, self).__init__(owning_component_name, channel, participant_config)
self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB' self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB'
self._server = rsb.createLocalServer(rsb.Scope(self._unique_name)) be = ipaaca.backend.get_default_backend()
self._server.addMethod('updateLinks', self._remote_update_links, ipaaca.converter.IULinkUpdate, int) self._server = be.createLocalServer(self, be.Scope(self._unique_name + '/Server'), config=self._participant_config)
self._server.addMethod('updatePayload', self._remote_update_payload, ipaaca.converter.IUPayloadUpdate, int)
self._server.addMethod('commit', self._remote_commit, ipaaca.ipaaca_pb2.IUCommission, int)
self._server.addMethod('requestResend', self._remote_request_resend, ipaaca.ipaaca_pb2.IUResendRequest, int)
self._informer_store = {} self._informer_store = {}
self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-' self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-'
self.__iu_id_counter_lock = threading.Lock() self.__iu_id_counter_lock = threading.Lock()
...@@ -482,10 +489,10 @@ class OutputBuffer(Buffer): ...@@ -482,10 +489,10 @@ class OutputBuffer(Buffer):
with iu.revision_lock: with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision): if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update") # (0 means "do not pay attention to the revision number" -> "force update")
LOGGER.warning(u"Remote update_payload operation failed because request was out of date; IU "+str(update.uid)) LOGGER.warning("Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
LOGGER.warning(u" Writer was: "+update.writer_name) LOGGER.warning(" Writer was: "+update.writer_name)
LOGGER.warning(u" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove)) LOGGER.warning(" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
LOGGER.warning(u" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision)) LOGGER.warning(" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
return 0 return 0
if update.is_delta: if update.is_delta:
#print('Writing delta update by '+str(update.writer_name)) #print('Writing delta update by '+str(update.writer_name))
...@@ -508,7 +515,7 @@ class OutputBuffer(Buffer): ...@@ -508,7 +515,7 @@ class OutputBuffer(Buffer):
return 0 return 0
iu = self._iu_store[iu_resend_request_pack.uid] iu = self._iu_store[iu_resend_request_pack.uid]
with iu.revision_lock: with iu.revision_lock:
if iu_resend_request_pack.hidden_scope_name is not None and iu_resend_request_pack.hidden_scope_name is not '': if iu_resend_request_pack.hidden_scope_name is not None and iu_resend_request_pack.hidden_scope_name != '':
informer = self._get_informer(iu_resend_request_pack.hidden_scope_name) informer = self._get_informer(iu_resend_request_pack.hidden_scope_name)
informer.publishData(iu) informer.publishData(iu)
return iu.revision return iu.revision
...@@ -538,8 +545,9 @@ class OutputBuffer(Buffer): ...@@ -538,8 +545,9 @@ class OutputBuffer(Buffer):
if iu_category in self._informer_store: if iu_category in self._informer_store:
LOGGER.info("Returning informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)) LOGGER.info("Returning informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
return self._informer_store[iu_category] return self._informer_store[iu_category]
informer_iu = rsb.createInformer( be = ipaaca.backend.get_default_backend()
rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), informer_iu = be.createInformer(
be.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)),
config=self._participant_config, config=self._participant_config,
dataType=object) dataType=object)
self._informer_store[iu_category] = informer_iu #new_tuple self._informer_store[iu_category] = informer_iu #new_tuple
......
# -*- coding: utf-8 -*-
# This file is part of IPAACA, the
# "Incremental Processing Architecture
# for Artificial Conversational Agents".
#
# Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University
#
# http://opensource.cit-ec.de/projects/ipaaca/
# http://purl.org/net/ipaaca
#
# This file may be licensed under the terms of of the
# GNU Lesser General Public License Version 3 (the ``LGPL''),
# or (at your option) any later version.
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the LGPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the LGPL along with this
# program. If not, go to http://www.gnu.org/licenses/lgpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# The development of this software was supported by the
# Excellence Cluster EXC 277 Cognitive Interaction Technology.
# The Excellence Cluster EXC 277 is a grant of the Deutsche
# Forschungsgemeinschaft (DFG) in the context of the German
# Excellence Initiative.
from __future__ import division, print_function
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
import ipaaca.misc
import os
import re
try:
import configparser
except:
import ConfigParser as configparser
LOGGER = ipaaca.misc.get_library_logger()
__global_config = None
class Config(object):
def __init__(self):
self._store = {}
def get_with_default(self, key, default_value, warn=False):
if key in self._store:
return self._store[key]
else:
notif = LOGGER.warning if warn else LOGGER.debug
notif('Config key '+str(key)+' not found, returning default of '+str(default_value))
return default_value
def populate_from_global_sources(self):
self._store = {}
self.populate_from_any_conf_files()
self.populate_from_environment()
#self.populate_from_argv_overrides() # TODO IMPLEMENT_ME
def populate_from_any_conf_files(self):
globalconf = os.getenv('HOME', '')+'/.config/ipaaca.conf'
for filename in ['ipaaca.conf', globalconf]:
try:
f = open(filename, 'r')
c = configparser.ConfigParser()
c.readfp(f)
f.close()
LOGGER.info('Including configuration from '+filename)
for k,v in c.items('ipaaca'):
self._store[k] = v
return
except:
pass
LOGGER.info('Could not load ipaaca.conf either here or in ~/.config')
def populate_from_environment(self):
for k, v in os.environ.items():
if k.startswith('IPAACA_'):
if re.match(r'^[A-Za-z0-9_]*$', k) is None:
LOGGER.warning('Ignoring malformed environment key')
else:
if len(v)>1023:
LOGGER.warning('Ignoring long environment value')
else:
# remove initial IPAACA_ and transform key to dotted lowercase
trans_key = k[7:].lower().replace('_', '.')
self._store[trans_key] = v
LOGGER.debug('Configured from environment: '+str(trans_key)+'="'+str(v)+'"')
def get_global_config():
global __global_config
if __global_config is None:
__global_config = Config()
__global_config.populate_from_global_sources()
return __global_config