Skip to content
Snippets Groups Projects
Commit 25c2d47e authored by Hendrik Buschmeier's avatar Hendrik Buschmeier
Browse files
parents 6b753ba4 1c1f7575
No related branches found
No related tags found
No related merge requests found
...@@ -184,11 +184,6 @@ public abstract class AbstractIU ...@@ -184,11 +184,6 @@ public abstract class AbstractIU
return uid; return uid;
} }
public void setUid(String uid)
{
this.uid = uid;
}
public abstract void commit(); public abstract void commit();
// XXX: might not be valid for all types of IUs // XXX: might not be valid for all types of IUs
......
...@@ -5,6 +5,7 @@ import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; ...@@ -5,6 +5,7 @@ import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;
import ipaaca.protobuf.Ipaaca.LinkSet; import ipaaca.protobuf.Ipaaca.LinkSet;
import ipaaca.protobuf.Ipaaca.PayloadItem; import ipaaca.protobuf.Ipaaca.PayloadItem;
import java.rmi.server.UID;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
...@@ -32,12 +33,7 @@ public class LocalIU extends AbstractIU ...@@ -32,12 +33,7 @@ public class LocalIU extends AbstractIU
public LocalIU() public LocalIU()
{ {
this(null); super(new UID().toString());
}
public LocalIU(String uid)
{
super(uid);
revision = 1; revision = 1;
payload = new Payload(this); payload = new Payload(this);
} }
......
...@@ -27,150 +27,147 @@ import com.google.common.collect.SetMultimap; ...@@ -27,150 +27,147 @@ import com.google.common.collect.SetMultimap;
* An OutputBuffer that holds local IUs. * An OutputBuffer that holds local IUs.
* @author hvanwelbergen * @author hvanwelbergen
*/ */
public class OutputBuffer extends Buffer{ public class OutputBuffer extends Buffer
{
private final LocalServer server;
private Map<String,Informer<Object>> informerStore = new HashMap<String,Informer<Object>>(); //category -> informer map private final LocalServer server;
private final String idPrefix; private Map<String, Informer<Object>> informerStore = new HashMap<String, Informer<Object>>(); // category -> informer map
private int iuIdCounter = 0; private final static Logger logger = LoggerFactory.getLogger(OutputBuffer.class.getName());
private final Object iuIdCounterLock = new Object(); private IUStore<LocalIU> iuStore = new IUStore<LocalIU>();
private final static Logger logger = LoggerFactory.getLogger(OutputBuffer.class.getName());
private IUStore<LocalIU> iuStore = new IUStore<LocalIU>(); // def __init__(self, owning_component_name, participant_config=None):
// '''Create an Output Buffer.
// def __init__(self, owning_component_name, participant_config=None): //
// '''Create an Output Buffer. // Keyword arguments:
// // owning_component_name -- name of the entity that own this buffer
// Keyword arguments: // participant_config -- RSB configuration
// owning_component_name -- name of the entity that own this buffer // '''
// participant_config -- RSB configuration // super(OutputBuffer, self).__init__(owning_component_name, participant_config)
// ''' // self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB'
// super(OutputBuffer, self).__init__(owning_component_name, participant_config) // self._server = rsb.createServer(rsb.Scope(self._unique_name))
// self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB' // self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int)
// self._server = rsb.createServer(rsb.Scope(self._unique_name)) // self._server.addMethod('commit', self._remote_commit, iuProtoBuf_pb2.IUCommission, int)
// self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int) // self._informer_store = {}
// self._server.addMethod('commit', self._remote_commit, iuProtoBuf_pb2.IUCommission, int) // self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-'
// self._informer_store = {} // self.__iu_id_counter_lock = threading.Lock()
// self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-' // self.__iu_id_counter = 0
// self.__iu_id_counter_lock = threading.Lock() /**
// self.__iu_id_counter = 0 * @param owningComponentName name of the entity that own this buffer
/** */
* @param owningComponentName name of the entity that own this buffer public OutputBuffer(String owningComponentName)
*/ {
public OutputBuffer(String owningComponentName) { super(owningComponentName);
super(owningComponentName);
uniqueName = "/ipaaca/component/" + owningComponentName + "ID" + uuid + "/OB";
uniqueName = "/ipaaca/component/" + owningComponentName + "ID" + uuid + "/OB"; logger.debug("Creating server for {}", uniqueName);
logger.debug("Creating server for {}",uniqueName); server = Factory.getInstance().createLocalServer(uniqueName);
server = Factory.getInstance().createLocalServer(uniqueName); try
try { {
server.addMethod("updatePayload", new RemoteUpdatePayload()); server.addMethod("updatePayload", new RemoteUpdatePayload());
server.addMethod("updateLinks", new RemoteUpdateLinks()); server.addMethod("updateLinks", new RemoteUpdateLinks());
server.addMethod("commit", new RemoteCommit()); server.addMethod("commit", new RemoteCommit());
server.activate(); server.activate();
} catch (InitializeException e) { }
throw new RuntimeException(e); catch (InitializeException e)
} {
throw new RuntimeException(e);
idPrefix = owningComponentName+uuid.toString()+"-IU-"; }
}
}
private final class RemoteUpdatePayload implements DataCallback<Integer,IUPayloadUpdate>
{ private final class RemoteUpdatePayload implements DataCallback<Integer, IUPayloadUpdate>
{
@Override @Override
public Integer invoke(IUPayloadUpdate data) throws Throwable public Integer invoke(IUPayloadUpdate data) throws Throwable
{ {
logger.debug("remoteUpdate"); logger.debug("remoteUpdate");
return remoteUpdatePayload(data); return remoteUpdatePayload(data);
} }
} }
private final class RemoteUpdateLinks implements DataCallback<Integer,IULinkUpdate> private final class RemoteUpdateLinks implements DataCallback<Integer, IULinkUpdate>
{ {
@Override @Override
public Integer invoke(IULinkUpdate data) throws Throwable public Integer invoke(IULinkUpdate data) throws Throwable
{ {
logger.debug("remoteUpdateLinks"); logger.debug("remoteUpdateLinks");
return remoteUpdateLinks(data); return remoteUpdateLinks(data);
} }
} }
private final class RemoteCommit implements DataCallback<Integer,IUCommission> private final class RemoteCommit implements DataCallback<Integer, IUCommission>
{ {
@Override @Override
public Integer invoke(IUCommission data) throws Throwable public Integer invoke(IUCommission data) throws Throwable
{ {
logger.debug("remoteCommit"); logger.debug("remoteCommit");
return remoteCommit(data); return remoteCommit(data);
} }
} }
// def _remote_update_payload(self, update):
// def _remote_update_payload(self, update): // '''Apply a remotely requested update to one of the stored IUs.'''
// '''Apply a remotely requested update to one of the stored IUs.''' // if update.uid not in self._iu_store:
// if update.uid not in self._iu_store: // logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
// logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid)) // return 0
// return 0 // iu = self._iu_store[update.uid]
// iu = self._iu_store[update.uid] // if (update.revision != 0) and (update.revision != iu.revision):
// if (update.revision != 0) and (update.revision != iu.revision): // # (0 means "do not pay attention to the revision number" -> "force update")
// # (0 means "do not pay attention to the revision number" -> "force update") // logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
// logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid)) // return 0
// return 0 // if update.is_delta:
// if update.is_delta: // for k in update.keys_to_remove:
// for k in update.keys_to_remove: // iu.payload.__delitem__(k, writer_name=update.writer_name)
// iu.payload.__delitem__(k, writer_name=update.writer_name) // for k,v in update.new_items.items():
// for k,v in update.new_items.items(): // iu.payload.__setitem__(k, v, writer_name=update.writer_name)
// iu.payload.__setitem__(k, v, writer_name=update.writer_name) // else:
// else: // iu._set_payload(update.new_items, writer_name=update.writer_name)
// iu._set_payload(update.new_items, writer_name=update.writer_name) // self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category)
// self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category) // return iu.revision
// return iu.revision
/**
/** * Apply a remotely requested update to one of the stored IUs.
* Apply a remotely requested update to one of the stored IUs. * @return 0 if not updated, IU version number otherwise
* @return 0 if not updated, IU version number otherwise */
*/ int remoteUpdatePayload(IUPayloadUpdate update)
int remoteUpdatePayload(IUPayloadUpdate update) {
{ if (!iuStore.containsKey(update.getUid()))
if (!iuStore.containsKey(update.getUid())) {
{ logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", update.getUid());
logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}",update.getUid()); return 0;
return 0; }
}
AbstractIU iu = iuStore.get(update.getUid());
AbstractIU iu = iuStore.get(update.getUid()); if (update.getRevision() != 0 && update.getRevision() != iu.getRevision())
if(update.getRevision()!=0 && update.getRevision() != iu.getRevision()) {
{ // (0 means "do not pay attention to the revision number" -> "force update")
//(0 means "do not pay attention to the revision number" -> "force update") logger.warn("Remote write operation failed because request was out of date; IU {}", update.getUid());
logger.warn("Remote write operation failed because request was out of date; IU {}",update.getUid()); return 0;
return 0; }
} if (update.getIsDelta())
if(update.getIsDelta()) {
{ for (String k : update.getKeysToRemoveList())
for(String k:update.getKeysToRemoveList()) {
{ iu.getPayload().remove(k, update.getWriterName());
iu.getPayload().remove(k, update.getWriterName()); }
} for (PayloadItem pli : update.getNewItemsList())
for (PayloadItem pli:update.getNewItemsList()) {
{ iu.getPayload().put(pli.getKey(), pli.getValue(), update.getWriterName());
iu.getPayload().put(pli.getKey(), pli.getValue(), update.getWriterName()); }
} }
} else
else {
{
iu.setPayload(update.getNewItemsList(), update.getWriterName());
iu.setPayload(update.getNewItemsList(), update.getWriterName()); }
} callIuEventHandlers(update.getUid(), true, IUEventType.UPDATED, iu.getCategory());
callIuEventHandlers(update.getUid(), true, IUEventType.UPDATED, iu.getCategory()); return iu.revision;
return iu.revision; }
}
/**
/**
* Apply a remotely requested update to one of the stored IUs. * Apply a remotely requested update to one of the stored IUs.
* @return 0 if not updated, IU version number otherwise * @return 0 if not updated, IU version number otherwise
*/ */
...@@ -178,264 +175,256 @@ public class OutputBuffer extends Buffer{ ...@@ -178,264 +175,256 @@ public class OutputBuffer extends Buffer{
{ {
if (!iuStore.containsKey(update.getUid())) if (!iuStore.containsKey(update.getUid()))
{ {
logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}",update.getUid()); logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", update.getUid());
return 0; return 0;
} }
AbstractIU iu = iuStore.get(update.getUid()); AbstractIU iu = iuStore.get(update.getUid());
if(update.getRevision()!=0 && update.getRevision() != iu.getRevision()) if (update.getRevision() != 0 && update.getRevision() != iu.getRevision())
{ {
//(0 means "do not pay attention to the revision number" -> "force update") // (0 means "do not pay attention to the revision number" -> "force update")
logger.warn("Remote write operation failed because request was out of date; IU {}",update.getUid()); logger.warn("Remote write operation failed because request was out of date; IU {}", update.getUid());
return 0; return 0;
} }
if(update.getIsDelta()) if (update.getIsDelta())
{ {
SetMultimap<String, String> newLinks = HashMultimap.create(); SetMultimap<String, String> newLinks = HashMultimap.create();
for(LinkSet ls:update.getNewLinksList()) for (LinkSet ls : update.getNewLinksList())
{ {
newLinks.putAll(ls.getType(),ls.getTargetsList()); newLinks.putAll(ls.getType(), ls.getTargetsList());
} }
SetMultimap<String, String> removeLinks = HashMultimap.create(); SetMultimap<String, String> removeLinks = HashMultimap.create();
for(LinkSet ls:update.getLinksToRemoveList()) for (LinkSet ls : update.getLinksToRemoveList())
{ {
removeLinks.putAll(ls.getType(),ls.getTargetsList()); removeLinks.putAll(ls.getType(), ls.getTargetsList());
} }
iu.modifyLinks(newLinks, removeLinks); iu.modifyLinks(newLinks, removeLinks);
} }
else else
{ {
SetMultimap<String, String> newLinks = HashMultimap.create(); SetMultimap<String, String> newLinks = HashMultimap.create();
for(LinkSet ls:update.getNewLinksList()) for (LinkSet ls : update.getNewLinksList())
{ {
newLinks.putAll(ls.getType(),ls.getTargetsList()); newLinks.putAll(ls.getType(), ls.getTargetsList());
} }
iu.setLinks(newLinks); iu.setLinks(newLinks);
} }
callIuEventHandlers(update.getUid(), true, IUEventType.LINKSUPDATED, iu.getCategory()); callIuEventHandlers(update.getUid(), true, IUEventType.LINKSUPDATED, iu.getCategory());
return iu.revision; return iu.revision;
} }
// //
// def _generate_iu_uid(self): // def _remote_commit(self, iu_commission):
// '''Generate a unique IU id of the form''' // '''Apply a remotely requested commit to one of the stored IUs.'''
// with self.__iu_id_counter_lock: // if iu_commission.uid not in self._iu_store:
// self.__iu_id_counter += 1 // logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
// number = self.__iu_id_counter // return 0
// return self._id_prefix + str(number) // iu = self._iu_store[iu_commission.uid]
private String generateIUUid() // if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
{ // # (0 means "do not pay attention to the revision number" -> "force update")
synchronized(iuIdCounterLock) // logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
{ // return 0
iuIdCounter++; // if iu.committed:
return idPrefix+iuIdCounter; // return 0
} // else:
} // iu._internal_commit(writer_name=iu_commission.writer_name)
// self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category)
// // return iu.revision
// def _remote_commit(self, iu_commission): /**
// '''Apply a remotely requested commit to one of the stored IUs.''' * Apply a remotely requested commit to one of the stored IUs.
// if iu_commission.uid not in self._iu_store: */
// logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid)) private int remoteCommit(IUCommission iuc)
// return 0
// iu = self._iu_store[iu_commission.uid]
// if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
// # (0 means "do not pay attention to the revision number" -> "force update")
// logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
// return 0
// if iu.committed:
// return 0
// else:
// iu._internal_commit(writer_name=iu_commission.writer_name)
// self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category)
// return iu.revision
/**
* Apply a remotely requested commit to one of the stored IUs.
*/
private int remoteCommit(IUCommission iuc)
{
if(!iuStore.containsKey(iuc.getUid()))
{
logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", iuc.getUid());
return 0;
}
AbstractIU iu = iuStore.get(iuc.getUid());
if(iuc.getRevision()!=0 && iuc.getRevision()!=iu.getRevision())
{
// (0 means "do not pay attention to the revision number" -> "force update")
logger.warn("Remote write operation failed because request was out of date; IU {}",iuc.getUid());
return 0;
}
if(iu.isCommitted())
{
return 0;
}
else
{
iu.commit(iuc.getWriterName());
callIuEventHandlers(iuc.getUid(), true, IUEventType.COMMITTED, iu.getCategory());
return iu.getRevision();
}
}
// def _get_informer(self, iu_category):
// '''Return (or create, store and return) an informer object for IUs of the specified category.'''
// if iu_category in self._informer_store:
// return self._informer_store[iu_category]
// informer_iu = rsb.createInformer(
// rsb.Scope("/ipaaca/category/"+str(iu_category)),
// config=self._participant_config,
// dataType=object)
// self._informer_store[iu_category] = informer_iu #new_tuple
// logger.info("Added informer on "+iu_category)
// return informer_iu #return new_tuple
/**
* Return (or create, store and return) an informer object for IUs of the specified category.
*/
private Informer<Object> getInformer(String category)
{
if(informerStore.containsKey(category))
{
return informerStore.get(category);
}
Informer<Object> informer = Factory.getInstance().createInformer("/ipaaca/category/"+category);
informerStore.put(category, informer);
logger.info("Added informer on "+category);
//XXX new in java version, apperently informers need activation and deactivation
try {
informer.activate();
} catch (InitializeException e) {
throw new RuntimeException(e);
}
return informer;
}
// def add(self, iu):
// '''Add an IU to the IU store, assign an ID and publish it.'''
// if iu._uid is not None:
// raise IUPublishedError(iu)
// iu.uid = self._generate_iu_uid()
// self._iu_store[iu._uid] = iu
// iu.buffer = self
// self._publish_iu(iu)
/**
* Add an IU to the IU store, assign an ID and publish it.
*/
public void add(LocalIU iu)
{
if (iu.getUid()!=null)
{
throw new IUPublishedException(iu);
}
iu.setUid(generateIUUid());
iuStore.put(iu.getUid(), iu);
iu.setBuffer(this);
publishIU(iu);
}
// def _publish_iu(self, iu):
// '''Publish an IU.'''
// informer = self._get_informer(iu._category)
// informer.publishData(iu)
public void publishIU(AbstractIU iu)
{
Informer<Object> informer = getInformer(iu.getCategory());
try {
informer.send(iu);
} catch (RSBException e) {
throw new RuntimeException(e);
}
}
// def _send_iu_commission(self, iu, writer_name):
// '''Send IU commission.
//
// Keyword arguments:
// iu -- the IU that has been committed to
// writer_name -- name of the Buffer that initiated this commit, necessary
// to enable remote components to filter out updates that originated
// from their own operations
// '''
// # a raw Protobuf object for IUCommission is produced
// # (unlike updates, where we have an intermediate class)
// iu_commission = iuProtoBuf_pb2.IUCommission()
// iu_commission.uid = iu.uid
// iu_commission.revision = iu.revision
// iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name
// # print('sending IU commission event')
// informer = self._get_informer(iu._category)
// informer.publishData(iu_commission)
/**
* @param iu the IU that has been committed to
* @param writerName name of the Buffer that initiated this commit, necessary
* to enable remote components to filter out updates that originated
* from their own operations
*/
public void sendIUCommission(AbstractIU iu, String writerName)
{
IUCommission iuc = Ipaaca.IUCommission.newBuilder()
.setUid(iu.getUid())
.setRevision(iu.getRevision())
.setWriterName(iu.getOwnerName()!=null?iu.getOwnerName():writerName)
.build();
Informer<Object> informer = getInformer(iu.getCategory());
try {
informer.send(iuc);
} catch (RSBException e) {
throw new RuntimeException(e);
}
}
// def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
// '''Send an IU payload update.
//
// Keyword arguments:
// iu -- the IU being updated
// is_delta -- whether the update concerns only a single payload item or
// the whole payload dictionary
// revision -- the new revision number
// new_items -- a dictionary of new payload items
// keys_to_remove -- a list of the keys that shall be removed from the
// payload
// writer_name -- name of the Buffer that initiated this update, necessary
// to enable remote components to filter out updates that originate d
// from their own operations
// '''
// if new_items is None:
// new_items = {}
// if keys_to_remove is None:
// keys_to_remove = []
// payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision)
// payload_update.new_items = new_items
// if is_delta:
// payload_update.keys_to_remove = keys_to_remove
// payload_update.writer_name = writer_name
// informer = self._get_informer(iu._category)
// informer.publishData(payload_update)
public void sendIUPayloadUpdate(AbstractIU iu, IUPayloadUpdate update)
{
Informer<Object> informer = getInformer(iu.getCategory());
try {
informer.send(update);
} catch (RSBException e) {
throw new RuntimeException(e);
}
}
public void sendIULinkUpdate(AbstractIU iu, IULinkUpdate update)
{ {
if (!iuStore.containsKey(iuc.getUid()))
{
logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", iuc.getUid());
return 0;
}
AbstractIU iu = iuStore.get(iuc.getUid());
if (iuc.getRevision() != 0 && iuc.getRevision() != iu.getRevision())
{
// (0 means "do not pay attention to the revision number" -> "force update")
logger.warn("Remote write operation failed because request was out of date; IU {}", iuc.getUid());
return 0;
}
if (iu.isCommitted())
{
return 0;
}
else
{
iu.commit(iuc.getWriterName());
callIuEventHandlers(iuc.getUid(), true, IUEventType.COMMITTED, iu.getCategory());
return iu.getRevision();
}
}
// def _get_informer(self, iu_category):
// '''Return (or create, store and return) an informer object for IUs of the specified category.'''
// if iu_category in self._informer_store:
// return self._informer_store[iu_category]
// informer_iu = rsb.createInformer(
// rsb.Scope("/ipaaca/category/"+str(iu_category)),
// config=self._participant_config,
// dataType=object)
// self._informer_store[iu_category] = informer_iu #new_tuple
// logger.info("Added informer on "+iu_category)
// return informer_iu #return new_tuple
/**
* Return (or create, store and return) an informer object for IUs of the specified category.
*/
private Informer<Object> getInformer(String category)
{
if (informerStore.containsKey(category))
{
return informerStore.get(category);
}
Informer<Object> informer = Factory.getInstance().createInformer("/ipaaca/category/" + category);
informerStore.put(category, informer);
logger.info("Added informer on " + category);
// XXX new in java version, apperently informers need activation and deactivation
try
{
informer.activate();
}
catch (InitializeException e)
{
throw new RuntimeException(e);
}
return informer;
}
// def add(self, iu):
// '''Add an IU to the IU store, assign an ID and publish it.'''
// if iu._uid is not None:
// raise IUPublishedError(iu)
// iu.uid = self._generate_iu_uid()
// self._iu_store[iu._uid] = iu
// iu.buffer = self
// self._publish_iu(iu)
/**
* Add an IU to the IU store, assign an ID and publish it.
*/
public void add(LocalIU iu)
{
if (iuStore.get(iu.getUid()) != null)
{
throw new IUPublishedException(iu);
}
iuStore.put(iu.getUid(), iu);
iu.setBuffer(this);
publishIU(iu);
}
// def _publish_iu(self, iu):
// '''Publish an IU.'''
// informer = self._get_informer(iu._category)
// informer.publishData(iu)
public void publishIU(AbstractIU iu)
{
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.send(iu);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
}
// def _send_iu_commission(self, iu, writer_name):
// '''Send IU commission.
//
// Keyword arguments:
// iu -- the IU that has been committed to
// writer_name -- name of the Buffer that initiated this commit, necessary
// to enable remote components to filter out updates that originated
// from their own operations
// '''
// # a raw Protobuf object for IUCommission is produced
// # (unlike updates, where we have an intermediate class)
// iu_commission = iuProtoBuf_pb2.IUCommission()
// iu_commission.uid = iu.uid
// iu_commission.revision = iu.revision
// iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name
// # print('sending IU commission event')
// informer = self._get_informer(iu._category)
// informer.publishData(iu_commission)
/**
* @param iu the IU that has been committed to
* @param writerName name of the Buffer that initiated this commit, necessary
* to enable remote components to filter out updates that originated
* from their own operations
*/
public void sendIUCommission(AbstractIU iu, String writerName)
{
IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(iu.getUid()).setRevision(iu.getRevision())
.setWriterName(iu.getOwnerName() != null ? iu.getOwnerName() : writerName).build();
Informer<Object> informer = getInformer(iu.getCategory()); Informer<Object> informer = getInformer(iu.getCategory());
try { try
{
informer.send(iuc);
}
catch (RSBException e)
{
throw new RuntimeException(e);
}
}
// def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
// '''Send an IU payload update.
//
// Keyword arguments:
// iu -- the IU being updated
// is_delta -- whether the update concerns only a single payload item or
// the whole payload dictionary
// revision -- the new revision number
// new_items -- a dictionary of new payload items
// keys_to_remove -- a list of the keys that shall be removed from the
// payload
// writer_name -- name of the Buffer that initiated this update, necessary
// to enable remote components to filter out updates that originate d
// from their own operations
// '''
// if new_items is None:
// new_items = {}
// if keys_to_remove is None:
// keys_to_remove = []
// payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision)
// payload_update.new_items = new_items
// if is_delta:
// payload_update.keys_to_remove = keys_to_remove
// payload_update.writer_name = writer_name
// informer = self._get_informer(iu._category)
// informer.publishData(payload_update)
public void sendIUPayloadUpdate(AbstractIU iu, IUPayloadUpdate update)
{
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.send(update); informer.send(update);
} catch (RSBException e) { }
catch (RSBException e)
{
throw new RuntimeException(e);
}
}
public void sendIULinkUpdate(AbstractIU iu, IULinkUpdate update)
{
Informer<Object> informer = getInformer(iu.getCategory());
try
{
informer.send(update);
}
catch (RSBException e)
{
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
...@@ -445,11 +434,11 @@ public class OutputBuffer extends Buffer{ ...@@ -445,11 +434,11 @@ public class OutputBuffer extends Buffer{
{ {
return iuStore.get(iuid); return iuStore.get(iuid);
} }
public void close() public void close()
{ {
server.deactivate(); server.deactivate();
for(Informer<?> informer: informerStore.values()) for (Informer<?> informer : informerStore.values())
{ {
informer.deactivate(); informer.deactivate();
} }
......
package ipaacademo; package ipaacademo;
import ipaaca.AbstractIU;
import ipaaca.HandlerFunctor;
import ipaaca.IUEventHandler;
import ipaaca.IUEventType;
import ipaaca.Initializer;
import ipaaca.InputBuffer;
import ipaaca.OutputBuffer;
import ipaaca.RemotePushIU;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Map;
import java.util.HashMap;
import java.util.Set; import java.util.Set;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import ipaaca.*;
import javax.swing.JFrame; import javax.swing.JFrame;
import javax.swing.JLabel; import javax.swing.JLabel;
......
...@@ -361,5 +361,30 @@ public class ComponentCommunicationIntegrationTest ...@@ -361,5 +361,30 @@ public class ComponentCommunicationIntegrationTest
assertThat(iuIn.getLinks("SAME_LEVEL"),containsInAnyOrder("iu7")); assertThat(iuIn.getLinks("SAME_LEVEL"),containsInAnyOrder("iu7"));
} }
@Test
public void testPublishLinks() throws InterruptedException
{
LocalIU localIU2 = new LocalIU();
localIU2.setCategory(CATEGORY);
localIU2.getPayload().put("key1", "item2");
localIU.addLinks("SAME_LEVEL", ImmutableSet.of(localIU2.getUid()));
outBuffer.add(localIU);
outBuffer.add(localIU2);
Thread.sleep(200);
assertThat(localIU.getLinks("SAME_LEVEL"),containsInAnyOrder(localIU2.getUid()));
}
@Test
public void testPublishLinksRemote() throws InterruptedException
{
LocalIU localIU2 = new LocalIU();
localIU2.setCategory(CATEGORY);
localIU2.getPayload().put("key1", "item2");
localIU.addLinks("SAME_LEVEL", ImmutableSet.of(localIU2.getUid()));
outBuffer.add(localIU);
outBuffer.add(localIU2);
Thread.sleep(200);
AbstractIU iuIn = inBuffer.getIU(localIU.getUid());
assertThat(iuIn.getLinks("SAME_LEVEL"),containsInAnyOrder(localIU2.getUid()));
}
} }
...@@ -21,7 +21,7 @@ public class LocalIUTest ...@@ -21,7 +21,7 @@ public class LocalIUTest
@Test @Test
public void testCommit() public void testCommit()
{ {
LocalIU liu = new LocalIU("iu1"); LocalIU liu = new LocalIU();
liu.getPayload().put("key1", "item1"); liu.getPayload().put("key1", "item1");
liu.setBuffer(mockBuffer); liu.setBuffer(mockBuffer);
liu.commit("commitWriter"); liu.commit("commitWriter");
...@@ -33,7 +33,7 @@ public class LocalIUTest ...@@ -33,7 +33,7 @@ public class LocalIUTest
@Test @Test
public void testSetPayloadOnUnpublishedIU() public void testSetPayloadOnUnpublishedIU()
{ {
LocalIU liu = new LocalIU("iu1"); LocalIU liu = new LocalIU();
liu.getPayload().put("key1", "item1"); liu.getPayload().put("key1", "item1");
assertEquals("item1", liu.getPayload().get("key1")); assertEquals("item1", liu.getPayload().get("key1"));
} }
...@@ -41,7 +41,7 @@ public class LocalIUTest ...@@ -41,7 +41,7 @@ public class LocalIUTest
@Test @Test
public void testSetPayloadOnPublishedIU() public void testSetPayloadOnPublishedIU()
{ {
LocalIU liu = new LocalIU("iu1"); LocalIU liu = new LocalIU();
liu.setBuffer(mockBuffer); liu.setBuffer(mockBuffer);
liu.getPayload().put("key1", "item1"); liu.getPayload().put("key1", "item1");
assertEquals("item1", liu.getPayload().get("key1")); assertEquals("item1", liu.getPayload().get("key1"));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment