Skip to content
Snippets Groups Projects
Commit d475a45f authored by Hendrik Buschmeier's avatar Hendrik Buschmeier
Browse files

Made ipaaca-java compatible with ipaaca-python's JSON datatypes. No JSON...

Made ipaaca-java compatible with ipaaca-python's JSON datatypes. No JSON support on the Java-side though.
parent 2ae5cc1b
No related branches found
No related tags found
No related merge requests found
......@@ -231,7 +231,7 @@ public abstract class AbstractIU
List<PayloadItem> items = new ArrayList<PayloadItem>();
for (Entry<String, String> entry : newPayload.entrySet())
{
PayloadItem item = PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("") // TODO:default type?
PayloadItem item = PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("str") // TODO:default type?
.build();
items.add(item);
}
......
......@@ -79,7 +79,7 @@ public class IUConverter implements Converter<ByteBuffer>
List<PayloadItem> payloadItems = new ArrayList<PayloadItem>();
for (Entry<String, String> entry : iua.getPayload().entrySet())
{
payloadItems.add(PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("").build());
payloadItems.add(PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("str").build());
}
List<LinkSet> links = new ArrayList<LinkSet>();
......
......@@ -36,6 +36,7 @@ import ipaaca.protobuf.Ipaaca.IUCommission;
import ipaaca.protobuf.Ipaaca.IUResendRequest;
import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.Collection;
import java.util.HashMap;
......@@ -52,8 +53,10 @@ import rsb.Event;
import rsb.Factory;
import rsb.Handler;
import rsb.InitializeException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import rsb.Listener;
import rsb.RSBException;
import rsb.Scope;
......@@ -291,7 +294,7 @@ public class InputBuffer extends Buffer
}
}
// def _handle_iu_events(self, event):
// '''Dispatch incoming IU events.
//
......@@ -342,7 +345,6 @@ public class InputBuffer extends Buffer
logger.warn("Spurious RemoteMessage event: already got this UID: "+rm.getUid());
return;
}
//logger.info("Adding Message "+rm.getUid());
messageStore.put(rm.getUid(), rm);
//logger.info("Calling handlers for Message "+rm.getUid());
......
......@@ -230,7 +230,7 @@ public class LocalIU extends AbstractIU
if (isPublished())
{
// send update to remote holders
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("") // TODO: fix this, default in .proto?
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("str") // TODO: fix this, default in .proto?
.build();
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer).addNewItems(newItem).build();
......@@ -256,7 +256,7 @@ public class LocalIU extends AbstractIU
.setWriterName(writer == null ? getOwnerName() : writer);
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("") // TODO: fix this, default in .proto?
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("str") // TODO: fix this, default in .proto?
.build();
builder.addNewItems(newItem);
......
......@@ -55,18 +55,6 @@ public class Payload implements Map<String, String>
this.iu = iu;
}
// def __init__(self, remote_push_iu, new_payload):
// """Create remote payload object.
//
// Keyword arguments:
// remote_push_iu -- remote IU holding this payload
// new_payload -- payload dict to initialise this remote payload with
// """
// super(RemotePushPayload, self).__init__()
// self._remote_push_iu = remote_push_iu
// if new_payload is not None:
// for k,v in new_payload.items():
// dict.__setitem__(self, k, v)
public Payload(AbstractIU iu, List<PayloadItem> payloadItems)
{
this(iu, payloadItems, null);
......@@ -102,21 +90,29 @@ public class Payload implements Map<String, String>
map.clear();
for (PayloadItem item : newPayload)
{
map.put(item.getKey(), item.getValue());
map.put(item.getKey(), pseudoConvertFromJSON(item.getValue(), item.getType()));
}
}
public String pseudoConvertFromJSON(String value, String type) {
if (type.equals("json")) {
System.out.println("Received JSON IU");
if (value.startsWith("\"")) {
return value.replaceAll("\\\"", "");
} else if (value.startsWith("{") || value.startsWith("[") || value.matches("true") || value.matches("false") || value.matches("-?[0-9]*[.,]?[0-9][0-9]*.*")) {
return value;
} else if (value.equals("null")) {
return "";
}
}
return value;
}
// def _remotely_enforced_setitem(self, k, v):
// """Sets an item when requested remotely."""
// return dict.__setitem__(self, k, v)
void enforcedSetItem(String key, String value)
{
map.put(key, value);
}
// def _remotely_enforced_delitem(self, k):
// """Deletes an item when requested remotely."""
// return dict.__delitem__(self, k)
void enforcedRemoveItem(String key)
{
map.remove(key);
......@@ -168,31 +164,6 @@ public class Payload implements Map<String, String>
return map.keySet();
}
// def __setitem__(self, k, v):
// """Set item in this payload.
//
// Requests item setting from the OutputBuffer holding the local version
// of this IU. Returns when permission is granted and item is set;
// otherwise raises an IUUpdateFailedError.
// """
// if self._remote_push_iu.committed:
// raise IUCommittedError(self._remote_push_iu)
// if self._remote_push_iu.read_only:
// raise IUReadOnlyError(self._remote_push_iu)
// requested_update = IUPayloadUpdate(
// uid=self._remote_push_iu.uid,
// revision=self._remote_push_iu.revision,
// is_delta=True,
// writer_name=self._remote_push_iu.buffer.unique_name,
// new_items={k:v},
// keys_to_remove=[])
// remote_server = self._remote_push_iu.buffer._get_remote_server(self._remote_push_iu)
// new_revision = remote_server.updatePayload(requested_update)
// if new_revision == 0:
// raise IUUpdateFailedError(self._remote_push_iu)
// else:
// self._remote_push_iu._revision = new_revision
// dict.__setitem__(self, k, v)
/**
* Set item in this payload.
* Requests item setting from the OutputBuffer holding the local version
......@@ -205,32 +176,6 @@ public class Payload implements Map<String, String>
return map.put(key, value);
}
//
// def __delitem__(self, k):
// """Delete item in this payload.
//
// Requests item deletion from the OutputBuffer holding the local version
// of this IU. Returns when permission is granted and item is deleted;
// otherwise raises an IUUpdateFailedError.
// """
// if self._remote_push_iu.committed:
// raise IUCommittedError(self._remote_push_iu)
// if self._remote_push_iu.read_only:
// raise IUReadOnlyError(self._remote_push_iu)
// requested_update = IUPayloadUpdate(
// uid=self._remote_push_iu.uid,
// revision=self._remote_push_iu.revision,
// is_delta=True,
// writer_name=self._remote_push_iu.buffer.unique_name,
// new_items={},
// keys_to_remove=[k])
// remote_server = self._remote_push_iu.buffer._get_remote_server(self._remote_push_iu)
// new_revision = remote_server.updatePayload(requested_update)
// if new_revision == 0:
// raise IUUpdateFailedError(self._remote_push_iu)
// else:
// self._remote_push_iu._revision = new_revision
// dict.__delitem__(self, k)
/**
* Delete item in this payload.//
* Requests item deletion from the OutputBuffer holding the local version
......
......@@ -71,14 +71,6 @@ public class RemotePushIU extends AbstractIU
return inputBuffer;
}
// def __init__(self, uid, revision, read_only, owner_name, category, type, committed, payload):
// super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
// self._revision = revision
// self._category = category
// self.owner_name = owner_name
// self._type = type
// self._committed = committed
// self._payload = RemotePushPayload(remote_push_iu=self, new_payload=payload)
public RemotePushIU(String uid)
{
super(uid);
......@@ -107,7 +99,7 @@ public class RemotePushIU extends AbstractIU
{
throw new IUReadOnlyException(this);
}
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("").build();// TODO use default type in .proto
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("str").build();// TODO use default type in .proto
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision(getRevision())
.setWriterName(getBuffer().getUniqueName()).addNewItems(newItem).build();
......@@ -152,7 +144,7 @@ public class RemotePushIU extends AbstractIU
.setWriterName(getBuffer().getUniqueName());
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("") // TODO: fix this, default in .proto?
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("str") // TODO: fix this, default in .proto?
.build();
builder.addNewItems(newItem);
......@@ -187,25 +179,6 @@ public class RemotePushIU extends AbstractIU
setRevision(newRevision);
}
// def commit(self):
// """Commit to this IU."""
// if self.read_only:
// raise IUReadOnlyError(self)
// if self._committed:
// # ignore commit requests when already committed
// return
// else:
// commission_request = iuProtoBuf_pb2.IUCommission()
// commission_request.uid = self.uid
// commission_request.revision = self.revision
// commission_request.writer_name = self.buffer.unique_name
// remote_server = self.buffer._get_remote_server(self)
// new_revision = remote_server.commit(commission_request)
// if new_revision == 0:
// raise IUUpdateFailedError(self)
// else:
// self._revision = new_revision
// self._committed = True
@Override
public void commit(String writerName)
{
......@@ -251,17 +224,6 @@ public class RemotePushIU extends AbstractIU
}
}
// def __str__(self):
// s = "RemotePushIU{ "
// s += "uid="+self._uid+" "
// s += "(buffer="+(self.buffer.unique_name if self.buffer is not None else "<None>")+") "
// s += "owner_name=" + ("<None>" if self.owner_name is None else self.owner_name) + " "
// s += "payload={ "
// for k,v in self.payload.items():
// s += k+":'"+v+"', "
// s += "} "
// s += "}"
// return s
@Override
public String toString()
{
......@@ -280,37 +242,11 @@ public class RemotePushIU extends AbstractIU
return b.toString();
}
//
// def _get_payload(self):
// return self._payload
public Payload getPayload()
{
return payload;
}
// def _set_payload(self, new_pl):
// if self.committed:
// raise IUCommittedError(self)
// if self.read_only:
// raise IUReadOnlyError(self)
// requested_update = IUPayloadUpdate(
// uid=self.uid,
// revision=self.revision,
// is_delta=False,
// writer_name=self.buffer.unique_name,
// new_items=new_pl,
// keys_to_remove=[])
// remote_server = self.buffer._get_remote_server(self)
// new_revision = remote_server.updatePayload(requested_update)
// if new_revision == 0:
// raise IUUpdateFailedError(self)
// else:
// self._revision = new_revision
// self._payload = RemotePushPayload(remote_push_iu=self, new_payload=new_pl)
// payload = property(
// fget=_get_payload,
// fset=_set_payload,
// doc='Payload dictionary of the IU.')
@Override
public void setPayload(List<PayloadItem> newItems, String writerName)
{
......@@ -354,37 +290,33 @@ public class RemotePushIU extends AbstractIU
}
}
// def _apply_update(self, update):
// """Apply a IUPayloadUpdate to the IU."""
// self._revision = update.revision
// if update.is_delta:
// for k in update.keys_to_remove: self.payload._remotely_enforced_delitem(k)
// for k, v in update.new_items.items(): self.payload._remotely_enforced_setitem(k, v)
// else:
// # using '_payload' to circumvent the local writing methods
// self._payload = RemotePushPayload(remote_push_iu=self, new_payload=update.new_items)
/**
* Apply a IUPayloadUpdate to the IU.
* @param update
*/
public void applyUpdate(IUPayloadUpdate update)
{
revision = update.getRevision();
if (update.getIsDelta())
{
for (String key : update.getKeysToRemoveList())
{
payload.enforcedRemoveItem(key);
}
for (PayloadItem item : update.getNewItemsList())
{
payload.enforcedSetItem(item.getKey(), item.getValue());
}
}
else
{
payload = new Payload(this, update.getNewItemsList());
}
public void applyUpdate(IUPayloadUpdate update) {
revision = update.getRevision();
if (update.getIsDelta()) {
for (String key : update.getKeysToRemoveList()) {
payload.enforcedRemoveItem(key);
}
for (PayloadItem item : update.getNewItemsList()) {
if (item.getType().equals("str")) {
payload.enforcedSetItem(item.getKey(), item.getValue());
} else if (item.getType().equals("json")) {
String value = item.getValue();
if (value.startsWith("\"")) {
payload.enforcedSetItem(item.getKey(), value.replaceAll("\\\"", ""));
} else if (value.startsWith("{") || value.startsWith("[") || value.matches("true") || value.matches("false") || value.matches("-?[0-9]*[.,]?[0-9][0-9]*.*")) {
payload.enforcedSetItem(item.getKey(), value);
} else if (value.equals("null")) {
payload.enforcedSetItem(item.getKey(), "");
}
}
}
} else {
payload = new Payload(this, update.getNewItemsList());
}
}
public void applyLinkUpdate(IULinkUpdate update)
......@@ -420,9 +352,6 @@ public class RemotePushIU extends AbstractIU
}
// def _apply_commission(self):
// """Apply commission to the IU"""
// self._committed = True
public void applyCommmision()
{
committed = true;
......@@ -466,25 +395,7 @@ public class RemotePushIU extends AbstractIU
setRevision(newRevision);
}
// def _modify_payload(self, payload, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
// """Modify the payload: add or remove item from this payload remotely and send update."""
// if self.committed:
// raise IUCommittedError(self)
// if self.read_only:
// raise IUReadOnlyError(self)
// requested_update = IUPayloadUpdate(
// uid=self.uid,
// revision=self.revision,
// is_delta=is_delta,
// writer_name=self.buffer.unique_name,
// new_items=new_items,
// keys_to_remove=keys_to_remove)
// remote_server = self.buffer._get_remote_server(self)
// new_revision = remote_server.updatePayload(requested_update)
// if new_revision == 0:
// raise IUUpdateFailedError(self)
// else:
// self._revision = new_revision
@Override
void modifyLinks(boolean isDelta, SetMultimap<String, String> linksToAdd, SetMultimap<String, String> linksToRemove, String writerName)
{
......
......@@ -205,7 +205,7 @@ public class JavaPythonTest
String pypr = PYTHON_PREAMBLE
+"ob = ipaaca.OutputBuffer('pythonside')\n"
+"iu = ipaaca.Message('JavaPythonTest')\n"
+"iu.payload = {'data':'Hello from Python!'}\n"
+"iu.payload['data'] = 'Hello from Python!'\n"
+"time.sleep(0.1)\n"
+"ob.add(iu)\n";
runPythonProgram(pypr);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment