Skip to content
Snippets Groups Projects
Commit 325d40e6 authored by Hendrik Buschmeier's avatar Hendrik Buschmeier
Browse files

ipaacaJava: Added merge method to Payload to enable batch Payload changes.

parent 425db5a3
No related branches found
No related tags found
No related merge requests found
...@@ -209,6 +209,8 @@ public abstract class AbstractIU ...@@ -209,6 +209,8 @@ public abstract class AbstractIU
abstract void setPayload(List<PayloadItem> newItems, String writerName); abstract void setPayload(List<PayloadItem> newItems, String writerName);
abstract void putIntoPayload(String key, String value, String writer); abstract void putIntoPayload(String key, String value, String writer);
abstract void putIntoPayload(Map<? extends String, ? extends String> newItems, String writer);
abstract void removeFromPayload(Object key, String writer); abstract void removeFromPayload(Object key, String writer);
......
...@@ -2,12 +2,14 @@ package ipaaca; ...@@ -2,12 +2,14 @@ package ipaaca;
import ipaaca.protobuf.Ipaaca.IULinkUpdate; import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate.Builder;
import ipaaca.protobuf.Ipaaca.LinkSet; import ipaaca.protobuf.Ipaaca.LinkSet;
import ipaaca.protobuf.Ipaaca.PayloadItem; import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
...@@ -204,6 +206,34 @@ public class LocalIU extends AbstractIU ...@@ -204,6 +206,34 @@ public class LocalIU extends AbstractIU
} }
} }
} }
@Override
void putIntoPayload(Map<? extends String, ? extends String> newItems, String writer)
{
synchronized (getRevisionLock())
{
// set item locally
if (isCommitted())
{
throw new IUCommittedException(this);
}
increaseRevisionNumber();
if (isPublished())
{
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer);
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("") // TODO: fix this, default in .proto?
.build();
builder.addNewItems(newItem);
}
IUPayloadUpdate update = builder.build();
getOutputBuffer().sendIUPayloadUpdate(this, update);
}
}
}
@Override @Override
public void commit() public void commit()
......
...@@ -153,14 +153,20 @@ public class OutputBuffer extends Buffer ...@@ -153,14 +153,20 @@ public class OutputBuffer extends Buffer
{ {
iu.getPayload().remove(k, update.getWriterName()); iu.getPayload().remove(k, update.getWriterName());
} }
for (PayloadItem pli : update.getNewItemsList()) if (update.getNewItemsList().size() > 0)
{ {
iu.getPayload().put(pli.getKey(), pli.getValue(), update.getWriterName()); HashMap<String, String> payloadUpdate = new HashMap<String, String>();
for (PayloadItem pli : update.getNewItemsList())
{
payloadUpdate.put(pli.getKey(), pli.getValue());
// //iu.getPayload().put(pli.getKey(), pli.getValue(), update.getWriterName());
}
iu.getPayload().putAll(payloadUpdate, update.getWriterName());
} }
} }
else else
{ {
iu.setPayload(update.getNewItemsList(), update.getWriterName()); iu.setPayload(update.getNewItemsList(), update.getWriterName());
} }
callIuEventHandlers(update.getUid(), true, IUEventType.UPDATED, iu.getCategory()); callIuEventHandlers(update.getUid(), true, IUEventType.UPDATED, iu.getCategory());
......
...@@ -216,9 +216,20 @@ public class Payload implements Map<String, String> ...@@ -216,9 +216,20 @@ public class Payload implements Map<String, String>
return put(key, value, null); return put(key, value, null);
} }
public void putAll(Map<? extends String, ? extends String> m)
public void putAll(Map<? extends String, ? extends String> newItems)
{ {
throw new RuntimeException("Not implemented"); putAll(newItems);
}
public void putAll(Map<? extends String, ? extends String> newItems, String writer)
{
iu.putIntoPayload(newItems, writer);
map.putAll(newItems);
}
public void merge(Map<? extends String, ? extends String> items) {
putAll(items, null);
} }
public String remove(Object key) public String remove(Object key)
......
...@@ -3,6 +3,7 @@ package ipaaca; ...@@ -3,6 +3,7 @@ package ipaaca;
import ipaaca.protobuf.Ipaaca.PayloadItem; import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.List; import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -48,6 +49,15 @@ public class RemoteMessageIU extends AbstractIU ...@@ -48,6 +49,15 @@ public class RemoteMessageIU extends AbstractIU
payload.put(key,value); payload.put(key,value);
log.info("Info: modifying a RemoteMessage only has local effects"); log.info("Info: modifying a RemoteMessage only has local effects");
} }
void putIntoPayload(Map<? extends String, ? extends String> newItems, String writer) {
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
payload.put(item.getKey(), item.getValue());
//System.out.println(entry.getKey() + "/" + entry.getValue());
}
log.info("Info: modifying a RemoteMessage only has local effects");
}
@Override @Override
void removeFromPayload(Object key, String writer) void removeFromPayload(Object key, String writer)
......
...@@ -6,10 +6,12 @@ import ipaaca.protobuf.Ipaaca.IULinkUpdate; ...@@ -6,10 +6,12 @@ import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
import ipaaca.protobuf.Ipaaca.LinkSet; import ipaaca.protobuf.Ipaaca.LinkSet;
import ipaaca.protobuf.Ipaaca.PayloadItem; import ipaaca.protobuf.Ipaaca.PayloadItem;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate.Builder;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
...@@ -93,6 +95,47 @@ public class RemotePushIU extends AbstractIU ...@@ -93,6 +95,47 @@ public class RemotePushIU extends AbstractIU
setRevision(newRevision); setRevision(newRevision);
} }
@Override
void putIntoPayload(Map<? extends String, ? extends String> newItems, String writer)
{
if (isCommitted())
{
throw new IUCommittedException(this);
}
if (isReadOnly())
{
throw new IUReadOnlyException(this);
}
Builder builder = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true)
.setWriterName(getBuffer().getUniqueName());
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("") // TODO: fix this, default in .proto?
.build();
builder.addNewItems(newItem);
}
IUPayloadUpdate update = builder.build();
RemoteServer server = getInputBuffer().getRemoteServer(this);
logger.debug("Remote server has methods {}", server.getMethods());
int newRevision;
try
{
newRevision = (Integer) server.call("updatePayload", update);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
if (newRevision == 0)
{
throw new IUUpdateFailedException(this);
}
System.err.print("************************ "); System.err.println(newRevision);
setRevision(newRevision);
}
// def commit(self): // def commit(self):
// """Commit to this IU.""" // """Commit to this IU."""
// if self.read_only: // if self.read_only:
......
...@@ -31,6 +31,8 @@ public class TextPrinter ...@@ -31,6 +31,8 @@ public class TextPrinter
case COMMITTED: System.out.println("IU committed"); break; case COMMITTED: System.out.println("IU committed"); break;
case UPDATED: System.out.println("IU updated "+iu.getPayload()); break; case UPDATED: System.out.println("IU updated "+iu.getPayload()); break;
case LINKSUPDATED: System.out.println("IU links updated"); break; case LINKSUPDATED: System.out.println("IU links updated"); break;
case RETRACTED: break;
case DELETED: break;
} }
} }
......
...@@ -10,6 +10,7 @@ import static org.junit.Assert.assertThat; ...@@ -10,6 +10,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap;
import java.util.Set; import java.util.Set;
import org.junit.After; import org.junit.After;
...@@ -170,6 +171,42 @@ public class ComponentPushCommunicationIntegrationTest ...@@ -170,6 +171,42 @@ public class ComponentPushCommunicationIntegrationTest
assertEquals(1,component1EventHandler.getNumberOfUpdateEvents(localIU.getUid())); assertEquals(1,component1EventHandler.getNumberOfUpdateEvents(localIU.getUid()));
} }
@Test
public void testSetAllPayload() throws InterruptedException
{
outBuffer.add(localIU);
Thread.sleep(200);
AbstractIU iuIn = inBuffer.getIU(localIU.getUid());
HashMap<String, String> payloadUpdate = new HashMap<String, String>();
payloadUpdate.put("chunk11", "item1");
payloadUpdate.put("chunk12", "item2");
payloadUpdate.put("chunk13", "item3");
payloadUpdate.put("chunk14", "item4");
int oldRev = iuIn.getRevision();
localIU.getPayload().merge(payloadUpdate);
Thread.sleep(200);
assertEquals(oldRev + 1, iuIn.getRevision());
assertTrue(iuIn.getPayload().containsKey("chunk11"));
assertTrue(iuIn.getPayload().containsKey("chunk12"));
assertTrue(iuIn.getPayload().containsKey("chunk13"));
assertTrue(iuIn.getPayload().containsKey("chunk14"));
HashMap<String, String> payloadUpdate2 = new HashMap<String, String>();
payloadUpdate2.put("chunk21", "item5");
payloadUpdate2.put("chunk22", "item6");
payloadUpdate2.put("chunk13", "item3-changed");
payloadUpdate2.put("chunk14", "item4-changed");
int oldRev2 = iuIn.getRevision();
iuIn.getPayload().merge(payloadUpdate2);
Thread.sleep(200);
assertEquals(oldRev2 + 1, localIU.getRevision());
assertTrue(localIU.getPayload().containsKey("chunk21"));
assertTrue(localIU.getPayload().containsKey("chunk22"));
assertEquals("item3-changed", localIU.getPayload().get("chunk13"));
assertEquals("item4-changed", localIU.getPayload().get("chunk14"));
}
@Test @Test
public void testIUUpdateFromInputBuffer() throws InterruptedException public void testIUUpdateFromInputBuffer() throws InterruptedException
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment