Skip to content
Snippets Groups Projects
Commit a58e702c authored by Herwin van Welbergen's avatar Herwin van Welbergen
Browse files

added message iu

parent 3a2afb11
No related branches found
No related tags found
No related merge requests found
Showing
with 495 additions and 104 deletions
......@@ -4,6 +4,8 @@
*/build
*/*/build
*/*/*/build
*/*/*/docs
*/*/docs
*/lib
*/test/lib
*/test/report
......
<ivy-module version="2.0">
<info organisation="ipaaca" module="ipaaca-java"/>
<configurations>
<include file="${ivy.settings.dir}/configurations.xml"/>
</configurations>
<dependencies>
<dependency org="slf4j" name="slf4j-api" rev="latest.release" />
<dependency org="google" name="guava" rev="latest.release" />
<dependency org="google" name="protobuf-java" rev="latest.release" />
<dependency org="rsb" name="rsb" rev="latest.release" />
</dependencies>
</ivy-module>
<ivy-module version="2.0">
<info organisation="ipaaca" module="ipaaca-java"/>
<configurations>
<include file="${ivy.settings.dir}/configurations.xml"/>
</configurations>
<dependencies>
<dependency org="slf4j" name="slf4j-api" rev="latest.release" />
<dependency org="google" name="guava" rev="latest.release" />
<dependency org="google" name="protobuf-java" rev="latest.release" />
<dependency org="rsb" name="rsb" rev="latest.release" />
<dependency org="lombok" name="lombok" rev="latest.release" />
</dependencies>
</ivy-module>
......@@ -56,12 +56,18 @@ public class IUConverter implements Converter<ByteBuffer>
links.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
}
IU.AccessMode accessMode = IU.AccessMode.PUSH;
if(iua instanceof RemoteMessageIU || iua instanceof LocalMessageIU)
{
accessMode = IU.AccessMode.MESSAGE;
}
IU iu = IU.newBuilder().setUid(iua.getUid()).setRevision(iua.getRevision()).setCategory(iua.getCategory())
.setOwnerName(iua.getOwnerName()).setCommitted(iua.isCommitted()).setAccessMode(IU.AccessMode.PUSH) // TODO for other access modes (also in Python version)
.setOwnerName(iua.getOwnerName()).setCommitted(iua.isCommitted()).setAccessMode(accessMode)
.setReadOnly(iua.isReadOnly()).setPayloadType("MAP").addAllPayload(payloadItems).addAllLinks(links).build();
return new WireContents<ByteBuffer>(ByteBuffer.wrap(iu.toByteArray()), "ipaaca-iu");
}
@Override
public UserData<?> deserialize(String wireSchema, ByteBuffer buffer) throws ConversionException
{
......@@ -78,24 +84,36 @@ public class IUConverter implements Converter<ByteBuffer>
if (iu.getAccessMode() == IU.AccessMode.PUSH)
{
RemotePushIU iuout = new RemotePushIU(iu.getUid());
iuout.setCategory(iu.getCategory());
iuout.committed = iu.getCommitted();
iuout.setOwnerName(iu.getOwnerName());
iuout.setRevision(iu.getRevision());
iuout.setReadOnly(iu.getReadOnly());
iuout.payload = new Payload(iuout, iu.getPayloadList());
SetMultimap<String, String> links = HashMultimap.create();
for (LinkSet ls : iu.getLinksList())
{
links.putAll(ls.getType(), ls.getTargetsList());
}
iuout.setLinksLocally(links);
copyIU(iu, iuout);
return new UserData<RemotePushIU>(iuout, RemotePushIU.class);
}
else if(iu.getAccessMode() == IU.AccessMode.MESSAGE)
{
RemoteMessageIU iuout = new RemoteMessageIU(iu.getUid());
copyIU(iu,iuout);
return new UserData<RemoteMessageIU>(iuout, RemoteMessageIU.class);
}
else
{
throw new RuntimeException("We can only handle IUs with access mode 'PUSH' for now!");
throw new RuntimeException("Trying to deserialize IU with accesmode: "+iu.getAccessMode()+". " +
"We can only handle IUs with access mode 'PUSH' or 'MESSAGE' for now!");
}
}
private void copyIU(IU iu, AbstractIU iuout)
{
iuout.setCategory(iu.getCategory());
iuout.committed = iu.getCommitted();
iuout.setOwnerName(iu.getOwnerName());
iuout.setRevision(iu.getRevision());
iuout.setReadOnly(iu.getReadOnly());
iuout.payload = new Payload(iuout, iu.getPayloadList());
SetMultimap<String, String> links = HashMultimap.create();
for (LinkSet ls : iu.getLinksList())
{
links.putAll(ls.getType(), ls.getTargetsList());
}
iuout.setLinksLocally(links);
}
}
......@@ -25,7 +25,12 @@ public final class Initializer
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new IUConverter(new ConverterSignature("ipaaca-iu", RemotePushIU.class)));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new IUConverter(new ConverterSignature("ipaaca-localiu", LocalIU.class)));
new IUConverter(new ConverterSignature("ipaaca-localiu", LocalIU.class)));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new IUConverter(new ConverterSignature("ipaaca-messageiu", RemoteMessageIU.class)));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new IUConverter(new ConverterSignature("ipaaca-localmessageiu", LocalMessageIU.class)));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new PayloadConverter());
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new LinkUpdateConverter());
......
......@@ -32,6 +32,7 @@ public class InputBuffer extends Buffer
private Set<String> categoryInterests = new HashSet<String>();
private final static Logger logger = LoggerFactory.getLogger(InputBuffer.class.getName());
private IUStore<RemotePushIU> iuStore = new IUStore<RemotePushIU>();
private IUStore<RemoteMessageIU> messageStore = new IUStore<RemoteMessageIU>();
public void close()
{
......@@ -184,7 +185,14 @@ public class InputBuffer extends Buffer
*/
private void handleIUEvents(Event event)
{
if (event.getData() instanceof RemotePushIU)
if(event.getData() instanceof RemoteMessageIU)
{
RemoteMessageIU rm = (RemoteMessageIU) event.getData();
messageStore.put(rm.getUid(), rm);
callIuEventHandlers(rm.getUid(),false, IUEventType.ADDED, rm.getCategory());
messageStore.remove(rm.getUid());
}
else if (event.getData() instanceof RemotePushIU)
{
RemotePushIU rp = (RemotePushIU) event.getData();
// a new IU
......@@ -269,7 +277,14 @@ public class InputBuffer extends Buffer
@Override
public AbstractIU getIU(String iuid)
{
return iuStore.get(iuid);
if(iuStore.get(iuid)!=null)
{
return iuStore.get(iuid);
}
else
{
return messageStore.get(iuid);
}
}
public Collection<RemotePushIU> getIUs()
......
package ipaaca;
/**
* Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls.
* @author hvanwelbergen
*/
public class LocalMessageIU extends LocalIU
{
}
package ipaaca;
import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import com.google.common.collect.SetMultimap;
@Slf4j
public class RemoteMessageIU extends AbstractIU
{
public RemoteMessageIU(String uid)
{
super(uid);
payload = new Payload(this);
}
@Override
public void commit()
{
log.info("Info: committing to a RemoteMessage only has local effects");
committed = true;
}
@Override
public void commit(String writerName)
{
log.info("Info: committing to a RemoteMessage only has local effects");
committed = true;
}
@Override
void setPayload(List<PayloadItem> newItems, String writerName)
{
for(PayloadItem item:newItems)
{
payload.put(item.getKey(),item.getValue());
}
log.info("Info: modifying a RemoteMessage only has local effects");
}
@Override
void putIntoPayload(String key, String value, String writer)
{
payload.put(key,value);
log.info("Info: modifying a RemoteMessage only has local effects");
}
@Override
void removeFromPayload(Object key, String writer)
{
payload.remove(key);
log.info("Info: modifying a RemoteMessage only has local effects");
}
@Override
void handlePayloadSetting(List<PayloadItem> newPayload, String writerName)
{
}
@Override
void modifyLinks(boolean isDelta, SetMultimap<String, String> linksToAdd, SetMultimap<String, String> linksToRemove, String Writer)
{
log.info("Info: modifying a RemoteMessage only has local effects");
}
}
package ipaaca;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.EnumSet;
import java.util.Set;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
/**
* Test communication of the 'MESSAGE' type between IUs
* @author hvanwelbergen
*
*/
public class ComponentMessageCommunicationIntegrationTest
{
@BeforeClass
public static void setupStatic()
{
Initializer.initializeIpaacaRsb();
}
private OutputBuffer outBuffer;
private InputBuffer inBuffer;
private LocalMessageIU localIU;
private CountingEventHandler component1EventHandler;
private CountingEventHandler component2EventHandler;
private StoringEventHandler component1StoreHandler = new StoringEventHandler();
private StoringEventHandler component2StoreHandler = new StoringEventHandler();
private static final String CATEGORY = "category1";
@Before
public void setup()
{
outBuffer = new OutputBuffer("component1");
Set<String> categories = new ImmutableSet.Builder<String>().add(CATEGORY).build();
inBuffer = new InputBuffer("component2", categories);
EnumSet<IUEventType> types = EnumSet.of(IUEventType.ADDED,IUEventType.COMMITTED,IUEventType.UPDATED);
component2EventHandler = new CountingEventHandler();
component1EventHandler = new CountingEventHandler();
inBuffer.registerHandler(new IUEventHandler(component2EventHandler,types,categories));
outBuffer.registerHandler(new IUEventHandler(component1EventHandler,types,categories));
inBuffer.registerHandler(new IUEventHandler(component2StoreHandler,types,categories));
outBuffer.registerHandler(new IUEventHandler(component1StoreHandler,types,categories));
localIU = new LocalMessageIU();
localIU.setCategory(CATEGORY);
localIU.getPayload().put("key1", "item1");
localIU.addLinks("INIT", ImmutableSet.of("init1","init2"));
}
@After
public void tearDown()
{
inBuffer.close();
outBuffer.close();
}
@Test
public void testAddedIU() throws InterruptedException
{
outBuffer.add(localIU);
Thread.sleep(200);
AbstractIU iuIn = inBuffer.getIU(localIU.getUid());
assertNull(iuIn);
assertThat(localIU.getLinks("INIT"),containsInAnyOrder("init1","init2"));
assertEquals(1,component2EventHandler.getNumberOfAddEvents(localIU.getUid()));
assertEquals(0,component1EventHandler.getNumberOfAddEvents(localIU.getUid()));
assertEquals(1,component2EventHandler.getNumberOfAddEvents(localIU.getUid()));
assertEquals(0,component1EventHandler.getNumberOfAddEvents(localIU.getUid()));
assertEquals(localIU.getUid(), component2StoreHandler.getAddedIUs().get(0).getUid());
}
@Test
public void testIUCommit() throws InterruptedException
{
outBuffer.add(localIU);
localIU.commit();
Thread.sleep(200);
assertEquals(0,component1EventHandler.getNumberOfCommitEvents(localIU.getUid()));
assertEquals(0,component2EventHandler.getNumberOfCommitEvents(localIU.getUid()));
assertFalse(component2StoreHandler.getAddedIUs().get(0).isCommitted());
}
@Test
public void testIUCommitBeforePublish() throws InterruptedException
{
localIU.commit();
outBuffer.add(localIU);
Thread.sleep(200);
assertEquals(0,component1EventHandler.getNumberOfCommitEvents(localIU.getUid()));
assertEquals(0,component2EventHandler.getNumberOfCommitEvents(localIU.getUid()));
assertTrue(component2StoreHandler.getAddedIUs().get(0).isCommitted());
}
@Test
public void testIUCommitFromInputBuffer() throws InterruptedException
{
outBuffer.add(localIU);
Thread.sleep(200);
AbstractIU iuIn = component2StoreHandler.getAddedIUs().get(0);
iuIn.commit();
Thread.sleep(200);
assertFalse(localIU.isCommitted());
assertEquals(0,component1EventHandler.getNumberOfCommitEvents(localIU.getUid()));
assertEquals(0,component2EventHandler.getNumberOfCommitEvents(localIU.getUid()));
}
@Test
public void testIUUpdate() throws InterruptedException
{
outBuffer.add(localIU);
Thread.sleep(200);
AbstractIU iuIn = component2StoreHandler.getAddedIUs().get(0);
assertNull(iuIn.getPayload().get("key2"));
localIU.getPayload().put("key2", "value2");
Thread.sleep(200);
assertEquals(null, iuIn.getPayload().get("key2"));
assertEquals(0,component2EventHandler.getNumberOfUpdateEvents(localIU.getUid()));
assertEquals(0,component1EventHandler.getNumberOfUpdateEvents(localIU.getUid()));
}
@Test
public void testIUUpdateBeforePublish() throws InterruptedException
{
localIU.getPayload().put("key2", "value2");
outBuffer.add(localIU);
Thread.sleep(200);
AbstractIU iuIn = component2StoreHandler.getAddedIUs().get(0);
assertEquals("value2", iuIn.getPayload().get("key2"));
}
}
......@@ -3,8 +3,6 @@ package ipaaca;
import static org.junit.Assert.*;
import java.util.EnumSet;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.*;
......@@ -24,7 +22,7 @@ import static ipaaca.IUTestUtil.*;
* @author hvanwelbergen
*
*/
public class ComponentCommunicationIntegrationTest
public class ComponentPushCommunicationIntegrationTest
{
@BeforeClass
public static void setupStatic()
......@@ -35,66 +33,10 @@ public class ComponentCommunicationIntegrationTest
private OutputBuffer outBuffer;
private InputBuffer inBuffer;
private LocalIU localIU;
private MyEventHandler component1EventHandler;
private MyEventHandler component2EventHandler;
private CountingEventHandler component1EventHandler;
private CountingEventHandler component2EventHandler;
private static final String CATEGORY = "category1";
private static final class MyEventHandler implements HandlerFunctor
{
private Map<String,Integer> commitEvents = new HashMap<String,Integer>();
private Map<String,Integer> addEvents = new HashMap<String,Integer>();
private Map<String,Integer> updateEvents = new HashMap<String,Integer>();
private void updateEventMap(String key, Map<String,Integer> map)
{
int value = 0;
if(map.containsKey(key))
{
value = map.get(key);
}
value++;
map.put(key, value);
}
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
switch(type)
{
case ADDED: updateEventMap(iu.getUid(),addEvents); break;
case COMMITTED: updateEventMap(iu.getUid(),commitEvents); break;
case UPDATED: updateEventMap(iu.getUid(),updateEvents); break;
}
}
public int getNumberOfCommitEvents(String iu)
{
if(!commitEvents.containsKey(iu))
{
return 0;
}
return commitEvents.get(iu);
}
public int getNumberOfAddEvents(String iu)
{
if(!addEvents.containsKey(iu))
{
return 0;
}
return addEvents.get(iu);
}
public int getNumberOfUpdateEvents(String iu)
{
if(!updateEvents.containsKey(iu))
{
return 0;
}
return updateEvents.get(iu);
}
}
@Before
public void setup()
{
......@@ -103,8 +45,8 @@ public class ComponentCommunicationIntegrationTest
Set<String> categories = new ImmutableSet.Builder<String>().add(CATEGORY).build();
inBuffer = new InputBuffer("component2", categories);
EnumSet<IUEventType> types = EnumSet.of(IUEventType.ADDED,IUEventType.COMMITTED,IUEventType.UPDATED);
component2EventHandler = new MyEventHandler();
component1EventHandler = new MyEventHandler();
component2EventHandler = new CountingEventHandler();
component1EventHandler = new CountingEventHandler();
inBuffer.registerHandler(new IUEventHandler(component2EventHandler,types,categories));
outBuffer.registerHandler(new IUEventHandler(component1EventHandler,types,categories));
......
package ipaaca;
import java.util.HashMap;
import java.util.Map;
/**
* Counts how often and in what order certain events occur
* @author hvanwelbergen
*
*/
final class CountingEventHandler implements HandlerFunctor
{
private Map<String,Integer> commitEvents = new HashMap<String,Integer>();
private Map<String,Integer> addEvents = new HashMap<String,Integer>();
private Map<String,Integer> updateEvents = new HashMap<String,Integer>();
private void updateEventMap(String key, Map<String,Integer> map)
{
int value = 0;
if(map.containsKey(key))
{
value = map.get(key);
}
value++;
map.put(key, value);
}
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
switch(type)
{
case ADDED: updateEventMap(iu.getUid(),addEvents); break;
case COMMITTED: updateEventMap(iu.getUid(),commitEvents); break;
case UPDATED: updateEventMap(iu.getUid(),updateEvents); break;
case DELETED:
break;
case LINKSUPDATED:
break;
case RETRACTED:
break;
default:
break;
}
}
public int getNumberOfCommitEvents(String iu)
{
if(!commitEvents.containsKey(iu))
{
return 0;
}
return commitEvents.get(iu);
}
public int getNumberOfAddEvents(String iu)
{
if(!addEvents.containsKey(iu))
{
return 0;
}
return addEvents.get(iu);
}
public int getNumberOfUpdateEvents(String iu)
{
if(!updateEvents.containsKey(iu))
{
return 0;
}
return updateEvents.get(iu);
}
}
\ No newline at end of file
package ipaaca;
import static ipaaca.IUTestUtil.assertEqualIU;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import ipaaca.protobuf.Ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.IU.AccessMode;
......@@ -10,22 +17,17 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.*;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.InvalidProtocolBufferException;
import rsb.converter.ConversionException;
import rsb.converter.ConverterSignature;
import rsb.converter.UserData;
import rsb.converter.WireContents;
import rsb.patterns.RemoteServer;
import static ipaaca.IUTestUtil.*;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Unit test cases for the IUConverter
......@@ -50,7 +52,7 @@ public class IuConverterTest
@Test
public void testSerialize() throws ConversionException, InvalidProtocolBufferException
public void testSerializePushIU() throws ConversionException, InvalidProtocolBufferException
{
RemotePushIU rpIU = new RemotePushIU("iu1");
rpIU.setRevision(1);
......@@ -65,7 +67,28 @@ public class IuConverterTest
WireContents<ByteBuffer> wiu = converter.serialize(RemotePushIU.class,rpIU);
IU iu = IU.newBuilder().mergeFrom(wiu.getSerialization().array()).build();
assertEqualIU(iu, rpIU);
assertEqualIU(iu, rpIU);
assertEquals(IU.AccessMode.PUSH,iu.getAccessMode());
}
@Test
public void testSerializeMessageIU() throws ConversionException, InvalidProtocolBufferException
{
RemoteMessageIU rmIU = new RemoteMessageIU("iu1");
rmIU.setRevision(1);
rmIU.setOwnerName("owner");
rmIU.setCategory(CATEGORY);
rmIU.setBuffer(mockInputBuffer);
rmIU.getPayload().enforcedSetItem("key1", "value1");
rmIU.getPayload().enforcedSetItem("key2", "value2");
rmIU.getPayload().enforcedSetItem("key3", "value3");
rmIU.setLinksLocally("SAME_LEVEL",ImmutableSet.of("sibling1","sibling2"));
rmIU.setLinksLocally("GROUNDED_IN",ImmutableSet.of("parent1","parent2"));
WireContents<ByteBuffer> wiu = converter.serialize(RemoteMessageIU.class,rmIU);
IU iu = IU.newBuilder().mergeFrom(wiu.getSerialization().array()).build();
assertEqualIU(iu, rmIU);
assertEquals(IU.AccessMode.MESSAGE,iu.getAccessMode());
}
public PayloadItem createPayloadItem(String key, String value)
......@@ -77,8 +100,9 @@ public class IuConverterTest
.build();
}
@Test
public void testDeSerialize() throws ConversionException
public void testDeSerializePushIU() throws ConversionException
{
List<PayloadItem> payload = new ArrayList<PayloadItem>();
payload.add(createPayloadItem("key1","value1"));
......@@ -114,6 +138,47 @@ public class IuConverterTest
assertThat(data.getData(), instanceOf(RemotePushIU.class));
RemotePushIU rpIU = (RemotePushIU) data.getData();
assertEqualIU(iu, rpIU);
assertEqualIU(iu, rpIU);
}
@Test
public void testDeSerializeMessageIU() throws ConversionException
{
List<PayloadItem> payload = new ArrayList<PayloadItem>();
payload.add(createPayloadItem("key1","value1"));
payload.add(createPayloadItem("key2","value2"));
payload.add(createPayloadItem("key3","value3"));
List<LinkSet> links = new ArrayList<LinkSet>();
links.add(
LinkSet.newBuilder()
.addAllTargets(ImmutableSet.of("sibling1","sibling2"))
.setType("SAME_LEVEL")
.build()
);
links.add(
LinkSet.newBuilder()
.addAllTargets(ImmutableSet.of("parent1","parent2"))
.setType("GROUNDED_IN")
.build()
);
Ipaaca.IU iu = Ipaaca.IU.newBuilder()
.setUid("uid1")
.setRevision(1)
.setCommitted(false)
.setOwnerName("owner")
.setAccessMode(AccessMode.MESSAGE)
.setReadOnly(false)
.setCategory(CATEGORY)
.addAllPayload(payload)
.addAllLinks(links)
.setPayloadType("")
.build();
UserData<?> data = converter.deserialize("", ByteBuffer.wrap(iu.toByteArray()));
assertThat(data.getData(), instanceOf(RemoteMessageIU.class));
RemoteMessageIU rpIU = (RemoteMessageIU) data.getData();
assertEqualIU(iu, rpIU);
}
}
package ipaaca;
import java.util.ArrayList;
import java.util.List;
/**
* Stores ius for which add messages occured.
* @author hvanwelbergen
*
*/
public class StoringEventHandler implements HandlerFunctor
{
private List<AbstractIU> addedIUs = new ArrayList<>();
public List<AbstractIU> getAddedIUs()
{
return addedIUs;
}
@Override
public void handle(AbstractIU iu, IUEventType type, boolean local)
{
switch (type)
{
case ADDED:
addedIUs.add(iu);
break;
case COMMITTED:
break;
case UPDATED:
break;
case DELETED:
break;
case LINKSUPDATED:
break;
case RETRACTED:
break;
default:
break;
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment