diff --git a/java/src/ipaaca/AbstractIU.java b/java/src/ipaaca/AbstractIU.java index 0b4a84abd03d83d1a938afc7c156de5d084be563..ba80690024b2962935d981692525198b73d33d35 100644 --- a/java/src/ipaaca/AbstractIU.java +++ b/java/src/ipaaca/AbstractIU.java @@ -184,11 +184,6 @@ public abstract class AbstractIU return uid; } - public void setUid(String uid) - { - this.uid = uid; - } - public abstract void commit(); // XXX: might not be valid for all types of IUs diff --git a/java/src/ipaaca/LocalIU.java b/java/src/ipaaca/LocalIU.java index 8fa38d212386e7e41c8c4c6fc61dded71810ec22..6abb71b824c65bff5ccedbeffee963b4df001369 100644 --- a/java/src/ipaaca/LocalIU.java +++ b/java/src/ipaaca/LocalIU.java @@ -5,6 +5,7 @@ import ipaaca.protobuf.Ipaaca.IUPayloadUpdate; import ipaaca.protobuf.Ipaaca.LinkSet; import ipaaca.protobuf.Ipaaca.PayloadItem; +import java.rmi.server.UID; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -32,12 +33,7 @@ public class LocalIU extends AbstractIU public LocalIU() { - this(null); - } - - public LocalIU(String uid) - { - super(uid); + super(new UID().toString()); revision = 1; payload = new Payload(this); } diff --git a/java/src/ipaaca/OutputBuffer.java b/java/src/ipaaca/OutputBuffer.java index 60521414ba5b5959ee4c7e73ca13c343450b0c7e..c976f7c01f4900f1ecf0f30786a720548896bd3d 100644 --- a/java/src/ipaaca/OutputBuffer.java +++ b/java/src/ipaaca/OutputBuffer.java @@ -27,150 +27,147 @@ import com.google.common.collect.SetMultimap; * An OutputBuffer that holds local IUs. * @author hvanwelbergen */ -public class OutputBuffer extends Buffer{ - - private final LocalServer server; - private Map<String,Informer<Object>> informerStore = new HashMap<String,Informer<Object>>(); //category -> informer map - private final String idPrefix; - private int iuIdCounter = 0; - private final Object iuIdCounterLock = new Object(); - private final static Logger logger = LoggerFactory.getLogger(OutputBuffer.class.getName()); - private IUStore<LocalIU> iuStore = new IUStore<LocalIU>(); - -// def __init__(self, owning_component_name, participant_config=None): -// '''Create an Output Buffer. -// -// Keyword arguments: -// owning_component_name -- name of the entity that own this buffer -// participant_config -- RSB configuration -// ''' -// super(OutputBuffer, self).__init__(owning_component_name, participant_config) -// self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB' -// self._server = rsb.createServer(rsb.Scope(self._unique_name)) -// self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int) -// self._server.addMethod('commit', self._remote_commit, iuProtoBuf_pb2.IUCommission, int) -// self._informer_store = {} -// self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-' -// self.__iu_id_counter_lock = threading.Lock() -// self.__iu_id_counter = 0 - /** - * @param owningComponentName name of the entity that own this buffer - */ - public OutputBuffer(String owningComponentName) { - super(owningComponentName); - - uniqueName = "/ipaaca/component/" + owningComponentName + "ID" + uuid + "/OB"; - logger.debug("Creating server for {}",uniqueName); - server = Factory.getInstance().createLocalServer(uniqueName); - try { - server.addMethod("updatePayload", new RemoteUpdatePayload()); - server.addMethod("updateLinks", new RemoteUpdateLinks()); - server.addMethod("commit", new RemoteCommit()); - server.activate(); - } catch (InitializeException e) { - throw new RuntimeException(e); - } - - idPrefix = owningComponentName+uuid.toString()+"-IU-"; - } - - private final class RemoteUpdatePayload implements DataCallback<Integer,IUPayloadUpdate> - { +public class OutputBuffer extends Buffer +{ + + private final LocalServer server; + private Map<String, Informer<Object>> informerStore = new HashMap<String, Informer<Object>>(); // category -> informer map + private final static Logger logger = LoggerFactory.getLogger(OutputBuffer.class.getName()); + private IUStore<LocalIU> iuStore = new IUStore<LocalIU>(); + + // def __init__(self, owning_component_name, participant_config=None): + // '''Create an Output Buffer. + // + // Keyword arguments: + // owning_component_name -- name of the entity that own this buffer + // participant_config -- RSB configuration + // ''' + // super(OutputBuffer, self).__init__(owning_component_name, participant_config) + // self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB' + // self._server = rsb.createServer(rsb.Scope(self._unique_name)) + // self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int) + // self._server.addMethod('commit', self._remote_commit, iuProtoBuf_pb2.IUCommission, int) + // self._informer_store = {} + // self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-' + // self.__iu_id_counter_lock = threading.Lock() + // self.__iu_id_counter = 0 + /** + * @param owningComponentName name of the entity that own this buffer + */ + public OutputBuffer(String owningComponentName) + { + super(owningComponentName); + + uniqueName = "/ipaaca/component/" + owningComponentName + "ID" + uuid + "/OB"; + logger.debug("Creating server for {}", uniqueName); + server = Factory.getInstance().createLocalServer(uniqueName); + try + { + server.addMethod("updatePayload", new RemoteUpdatePayload()); + server.addMethod("updateLinks", new RemoteUpdateLinks()); + server.addMethod("commit", new RemoteCommit()); + server.activate(); + } + catch (InitializeException e) + { + throw new RuntimeException(e); + } + + } + + private final class RemoteUpdatePayload implements DataCallback<Integer, IUPayloadUpdate> + { @Override public Integer invoke(IUPayloadUpdate data) throws Throwable { logger.debug("remoteUpdate"); - return remoteUpdatePayload(data); + return remoteUpdatePayload(data); } - - } - - private final class RemoteUpdateLinks implements DataCallback<Integer,IULinkUpdate> + + } + + private final class RemoteUpdateLinks implements DataCallback<Integer, IULinkUpdate> { @Override public Integer invoke(IULinkUpdate data) throws Throwable { logger.debug("remoteUpdateLinks"); - return remoteUpdateLinks(data); + return remoteUpdateLinks(data); } - + } - - private final class RemoteCommit implements DataCallback<Integer,IUCommission> - { - @Override + + private final class RemoteCommit implements DataCallback<Integer, IUCommission> + { + @Override public Integer invoke(IUCommission data) throws Throwable { - logger.debug("remoteCommit"); - return remoteCommit(data); + logger.debug("remoteCommit"); + return remoteCommit(data); } - - } - - -// def _remote_update_payload(self, update): -// '''Apply a remotely requested update to one of the stored IUs.''' -// if update.uid not in self._iu_store: -// logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid)) -// return 0 -// iu = self._iu_store[update.uid] -// if (update.revision != 0) and (update.revision != iu.revision): -// # (0 means "do not pay attention to the revision number" -> "force update") -// logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid)) -// return 0 -// if update.is_delta: -// for k in update.keys_to_remove: -// iu.payload.__delitem__(k, writer_name=update.writer_name) -// for k,v in update.new_items.items(): -// iu.payload.__setitem__(k, v, writer_name=update.writer_name) -// else: -// iu._set_payload(update.new_items, writer_name=update.writer_name) -// self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category) -// return iu.revision - - /** - * Apply a remotely requested update to one of the stored IUs. - * @return 0 if not updated, IU version number otherwise - */ - int remoteUpdatePayload(IUPayloadUpdate update) - { - if (!iuStore.containsKey(update.getUid())) - { - logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}",update.getUid()); - return 0; - } - - AbstractIU iu = iuStore.get(update.getUid()); - if(update.getRevision()!=0 && update.getRevision() != iu.getRevision()) - { - //(0 means "do not pay attention to the revision number" -> "force update") - logger.warn("Remote write operation failed because request was out of date; IU {}",update.getUid()); - return 0; - } - if(update.getIsDelta()) - { - for(String k:update.getKeysToRemoveList()) - { - iu.getPayload().remove(k, update.getWriterName()); - } - for (PayloadItem pli:update.getNewItemsList()) - { - iu.getPayload().put(pli.getKey(), pli.getValue(), update.getWriterName()); - } - } - else - { - - iu.setPayload(update.getNewItemsList(), update.getWriterName()); - } - callIuEventHandlers(update.getUid(), true, IUEventType.UPDATED, iu.getCategory()); - return iu.revision; - } - - - - - /** + + } + + // def _remote_update_payload(self, update): + // '''Apply a remotely requested update to one of the stored IUs.''' + // if update.uid not in self._iu_store: + // logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid)) + // return 0 + // iu = self._iu_store[update.uid] + // if (update.revision != 0) and (update.revision != iu.revision): + // # (0 means "do not pay attention to the revision number" -> "force update") + // logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid)) + // return 0 + // if update.is_delta: + // for k in update.keys_to_remove: + // iu.payload.__delitem__(k, writer_name=update.writer_name) + // for k,v in update.new_items.items(): + // iu.payload.__setitem__(k, v, writer_name=update.writer_name) + // else: + // iu._set_payload(update.new_items, writer_name=update.writer_name) + // self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category) + // return iu.revision + + /** + * Apply a remotely requested update to one of the stored IUs. + * @return 0 if not updated, IU version number otherwise + */ + int remoteUpdatePayload(IUPayloadUpdate update) + { + if (!iuStore.containsKey(update.getUid())) + { + logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", update.getUid()); + return 0; + } + + AbstractIU iu = iuStore.get(update.getUid()); + if (update.getRevision() != 0 && update.getRevision() != iu.getRevision()) + { + // (0 means "do not pay attention to the revision number" -> "force update") + logger.warn("Remote write operation failed because request was out of date; IU {}", update.getUid()); + return 0; + } + if (update.getIsDelta()) + { + for (String k : update.getKeysToRemoveList()) + { + iu.getPayload().remove(k, update.getWriterName()); + } + for (PayloadItem pli : update.getNewItemsList()) + { + iu.getPayload().put(pli.getKey(), pli.getValue(), update.getWriterName()); + } + } + else + { + + iu.setPayload(update.getNewItemsList(), update.getWriterName()); + } + callIuEventHandlers(update.getUid(), true, IUEventType.UPDATED, iu.getCategory()); + return iu.revision; + } + + /** * Apply a remotely requested update to one of the stored IUs. * @return 0 if not updated, IU version number otherwise */ @@ -178,264 +175,256 @@ public class OutputBuffer extends Buffer{ { if (!iuStore.containsKey(update.getUid())) { - logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}",update.getUid()); + logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", update.getUid()); return 0; } - + AbstractIU iu = iuStore.get(update.getUid()); - if(update.getRevision()!=0 && update.getRevision() != iu.getRevision()) + if (update.getRevision() != 0 && update.getRevision() != iu.getRevision()) { - //(0 means "do not pay attention to the revision number" -> "force update") - logger.warn("Remote write operation failed because request was out of date; IU {}",update.getUid()); + // (0 means "do not pay attention to the revision number" -> "force update") + logger.warn("Remote write operation failed because request was out of date; IU {}", update.getUid()); return 0; - } - if(update.getIsDelta()) + } + if (update.getIsDelta()) { SetMultimap<String, String> newLinks = HashMultimap.create(); - for(LinkSet ls:update.getNewLinksList()) + for (LinkSet ls : update.getNewLinksList()) { - newLinks.putAll(ls.getType(),ls.getTargetsList()); + newLinks.putAll(ls.getType(), ls.getTargetsList()); } - + SetMultimap<String, String> removeLinks = HashMultimap.create(); - for(LinkSet ls:update.getLinksToRemoveList()) + for (LinkSet ls : update.getLinksToRemoveList()) { - removeLinks.putAll(ls.getType(),ls.getTargetsList()); + removeLinks.putAll(ls.getType(), ls.getTargetsList()); } iu.modifyLinks(newLinks, removeLinks); } else { SetMultimap<String, String> newLinks = HashMultimap.create(); - for(LinkSet ls:update.getNewLinksList()) + for (LinkSet ls : update.getNewLinksList()) { - newLinks.putAll(ls.getType(),ls.getTargetsList()); + newLinks.putAll(ls.getType(), ls.getTargetsList()); } - iu.setLinks(newLinks); + iu.setLinks(newLinks); } callIuEventHandlers(update.getUid(), true, IUEventType.LINKSUPDATED, iu.getCategory()); return iu.revision; } - -// -// def _generate_iu_uid(self): -// '''Generate a unique IU id of the form''' -// with self.__iu_id_counter_lock: -// self.__iu_id_counter += 1 -// number = self.__iu_id_counter -// return self._id_prefix + str(number) - private String generateIUUid() - { - synchronized(iuIdCounterLock) - { - iuIdCounter++; - return idPrefix+iuIdCounter; - } - } - -// -// def _remote_commit(self, iu_commission): -// '''Apply a remotely requested commit to one of the stored IUs.''' -// if iu_commission.uid not in self._iu_store: -// logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid)) -// return 0 -// iu = self._iu_store[iu_commission.uid] -// if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision): -// # (0 means "do not pay attention to the revision number" -> "force update") -// logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid)) -// return 0 -// if iu.committed: -// return 0 -// else: -// iu._internal_commit(writer_name=iu_commission.writer_name) -// self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category) -// return iu.revision - /** - * Apply a remotely requested commit to one of the stored IUs. - */ - private int remoteCommit(IUCommission iuc) - { - if(!iuStore.containsKey(iuc.getUid())) - { - logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", iuc.getUid()); - return 0; - } - AbstractIU iu = iuStore.get(iuc.getUid()); - if(iuc.getRevision()!=0 && iuc.getRevision()!=iu.getRevision()) - { - // (0 means "do not pay attention to the revision number" -> "force update") - logger.warn("Remote write operation failed because request was out of date; IU {}",iuc.getUid()); - return 0; - } - if(iu.isCommitted()) - { - return 0; - } - else - { - iu.commit(iuc.getWriterName()); - callIuEventHandlers(iuc.getUid(), true, IUEventType.COMMITTED, iu.getCategory()); - return iu.getRevision(); - } - } - -// def _get_informer(self, iu_category): -// '''Return (or create, store and return) an informer object for IUs of the specified category.''' -// if iu_category in self._informer_store: -// return self._informer_store[iu_category] -// informer_iu = rsb.createInformer( -// rsb.Scope("/ipaaca/category/"+str(iu_category)), -// config=self._participant_config, -// dataType=object) -// self._informer_store[iu_category] = informer_iu #new_tuple -// logger.info("Added informer on "+iu_category) -// return informer_iu #return new_tuple - - /** - * Return (or create, store and return) an informer object for IUs of the specified category. - */ - private Informer<Object> getInformer(String category) - { - if(informerStore.containsKey(category)) - { - return informerStore.get(category); - } - Informer<Object> informer = Factory.getInstance().createInformer("/ipaaca/category/"+category); - - informerStore.put(category, informer); - logger.info("Added informer on "+category); - - //XXX new in java version, apperently informers need activation and deactivation - try { - informer.activate(); - } catch (InitializeException e) { - throw new RuntimeException(e); - } - return informer; - } - - - -// def add(self, iu): -// '''Add an IU to the IU store, assign an ID and publish it.''' -// if iu._uid is not None: -// raise IUPublishedError(iu) -// iu.uid = self._generate_iu_uid() -// self._iu_store[iu._uid] = iu -// iu.buffer = self -// self._publish_iu(iu) - /** - * Add an IU to the IU store, assign an ID and publish it. - */ - public void add(LocalIU iu) - { - if (iu.getUid()!=null) - { - throw new IUPublishedException(iu); - } - iu.setUid(generateIUUid()); - iuStore.put(iu.getUid(), iu); - iu.setBuffer(this); - publishIU(iu); - } - -// def _publish_iu(self, iu): -// '''Publish an IU.''' -// informer = self._get_informer(iu._category) -// informer.publishData(iu) - public void publishIU(AbstractIU iu) - { - Informer<Object> informer = getInformer(iu.getCategory()); - try { - informer.send(iu); - } catch (RSBException e) { - throw new RuntimeException(e); - } - } - -// def _send_iu_commission(self, iu, writer_name): -// '''Send IU commission. -// -// Keyword arguments: -// iu -- the IU that has been committed to -// writer_name -- name of the Buffer that initiated this commit, necessary -// to enable remote components to filter out updates that originated -// from their own operations -// ''' -// # a raw Protobuf object for IUCommission is produced -// # (unlike updates, where we have an intermediate class) -// iu_commission = iuProtoBuf_pb2.IUCommission() -// iu_commission.uid = iu.uid -// iu_commission.revision = iu.revision -// iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name -// # print('sending IU commission event') -// informer = self._get_informer(iu._category) -// informer.publishData(iu_commission) - /** - * @param iu the IU that has been committed to - * @param writerName name of the Buffer that initiated this commit, necessary - * to enable remote components to filter out updates that originated - * from their own operations - */ - public void sendIUCommission(AbstractIU iu, String writerName) - { - IUCommission iuc = Ipaaca.IUCommission.newBuilder() - .setUid(iu.getUid()) - .setRevision(iu.getRevision()) - .setWriterName(iu.getOwnerName()!=null?iu.getOwnerName():writerName) - .build(); - Informer<Object> informer = getInformer(iu.getCategory()); - try { - informer.send(iuc); - } catch (RSBException e) { - throw new RuntimeException(e); - } - } - - -// def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"): -// '''Send an IU payload update. -// -// Keyword arguments: -// iu -- the IU being updated -// is_delta -- whether the update concerns only a single payload item or -// the whole payload dictionary -// revision -- the new revision number -// new_items -- a dictionary of new payload items -// keys_to_remove -- a list of the keys that shall be removed from the -// payload -// writer_name -- name of the Buffer that initiated this update, necessary -// to enable remote components to filter out updates that originate d -// from their own operations -// ''' -// if new_items is None: -// new_items = {} -// if keys_to_remove is None: -// keys_to_remove = [] -// payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision) -// payload_update.new_items = new_items -// if is_delta: -// payload_update.keys_to_remove = keys_to_remove -// payload_update.writer_name = writer_name -// informer = self._get_informer(iu._category) -// informer.publishData(payload_update) - - public void sendIUPayloadUpdate(AbstractIU iu, IUPayloadUpdate update) - { - Informer<Object> informer = getInformer(iu.getCategory()); - try { - informer.send(update); - } catch (RSBException e) { - throw new RuntimeException(e); - } - } - - public void sendIULinkUpdate(AbstractIU iu, IULinkUpdate update) + + // + // def _remote_commit(self, iu_commission): + // '''Apply a remotely requested commit to one of the stored IUs.''' + // if iu_commission.uid not in self._iu_store: + // logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid)) + // return 0 + // iu = self._iu_store[iu_commission.uid] + // if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision): + // # (0 means "do not pay attention to the revision number" -> "force update") + // logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid)) + // return 0 + // if iu.committed: + // return 0 + // else: + // iu._internal_commit(writer_name=iu_commission.writer_name) + // self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category) + // return iu.revision + /** + * Apply a remotely requested commit to one of the stored IUs. + */ + private int remoteCommit(IUCommission iuc) { + if (!iuStore.containsKey(iuc.getUid())) + { + logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", iuc.getUid()); + return 0; + } + AbstractIU iu = iuStore.get(iuc.getUid()); + if (iuc.getRevision() != 0 && iuc.getRevision() != iu.getRevision()) + { + // (0 means "do not pay attention to the revision number" -> "force update") + logger.warn("Remote write operation failed because request was out of date; IU {}", iuc.getUid()); + return 0; + } + if (iu.isCommitted()) + { + return 0; + } + else + { + iu.commit(iuc.getWriterName()); + callIuEventHandlers(iuc.getUid(), true, IUEventType.COMMITTED, iu.getCategory()); + return iu.getRevision(); + } + } + + // def _get_informer(self, iu_category): + // '''Return (or create, store and return) an informer object for IUs of the specified category.''' + // if iu_category in self._informer_store: + // return self._informer_store[iu_category] + // informer_iu = rsb.createInformer( + // rsb.Scope("/ipaaca/category/"+str(iu_category)), + // config=self._participant_config, + // dataType=object) + // self._informer_store[iu_category] = informer_iu #new_tuple + // logger.info("Added informer on "+iu_category) + // return informer_iu #return new_tuple + + /** + * Return (or create, store and return) an informer object for IUs of the specified category. + */ + private Informer<Object> getInformer(String category) + { + if (informerStore.containsKey(category)) + { + return informerStore.get(category); + } + Informer<Object> informer = Factory.getInstance().createInformer("/ipaaca/category/" + category); + + informerStore.put(category, informer); + logger.info("Added informer on " + category); + + // XXX new in java version, apperently informers need activation and deactivation + try + { + informer.activate(); + } + catch (InitializeException e) + { + throw new RuntimeException(e); + } + return informer; + } + + // def add(self, iu): + // '''Add an IU to the IU store, assign an ID and publish it.''' + // if iu._uid is not None: + // raise IUPublishedError(iu) + // iu.uid = self._generate_iu_uid() + // self._iu_store[iu._uid] = iu + // iu.buffer = self + // self._publish_iu(iu) + /** + * Add an IU to the IU store, assign an ID and publish it. + */ + public void add(LocalIU iu) + { + if (iuStore.get(iu.getUid()) != null) + { + throw new IUPublishedException(iu); + } + iuStore.put(iu.getUid(), iu); + iu.setBuffer(this); + publishIU(iu); + } + + // def _publish_iu(self, iu): + // '''Publish an IU.''' + // informer = self._get_informer(iu._category) + // informer.publishData(iu) + public void publishIU(AbstractIU iu) + { + Informer<Object> informer = getInformer(iu.getCategory()); + try + { + informer.send(iu); + } + catch (RSBException e) + { + throw new RuntimeException(e); + } + } + + // def _send_iu_commission(self, iu, writer_name): + // '''Send IU commission. + // + // Keyword arguments: + // iu -- the IU that has been committed to + // writer_name -- name of the Buffer that initiated this commit, necessary + // to enable remote components to filter out updates that originated + // from their own operations + // ''' + // # a raw Protobuf object for IUCommission is produced + // # (unlike updates, where we have an intermediate class) + // iu_commission = iuProtoBuf_pb2.IUCommission() + // iu_commission.uid = iu.uid + // iu_commission.revision = iu.revision + // iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name + // # print('sending IU commission event') + // informer = self._get_informer(iu._category) + // informer.publishData(iu_commission) + /** + * @param iu the IU that has been committed to + * @param writerName name of the Buffer that initiated this commit, necessary + * to enable remote components to filter out updates that originated + * from their own operations + */ + public void sendIUCommission(AbstractIU iu, String writerName) + { + IUCommission iuc = Ipaaca.IUCommission.newBuilder().setUid(iu.getUid()).setRevision(iu.getRevision()) + .setWriterName(iu.getOwnerName() != null ? iu.getOwnerName() : writerName).build(); Informer<Object> informer = getInformer(iu.getCategory()); - try { + try + { + informer.send(iuc); + } + catch (RSBException e) + { + throw new RuntimeException(e); + } + } + + // def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"): + // '''Send an IU payload update. + // + // Keyword arguments: + // iu -- the IU being updated + // is_delta -- whether the update concerns only a single payload item or + // the whole payload dictionary + // revision -- the new revision number + // new_items -- a dictionary of new payload items + // keys_to_remove -- a list of the keys that shall be removed from the + // payload + // writer_name -- name of the Buffer that initiated this update, necessary + // to enable remote components to filter out updates that originate d + // from their own operations + // ''' + // if new_items is None: + // new_items = {} + // if keys_to_remove is None: + // keys_to_remove = [] + // payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision) + // payload_update.new_items = new_items + // if is_delta: + // payload_update.keys_to_remove = keys_to_remove + // payload_update.writer_name = writer_name + // informer = self._get_informer(iu._category) + // informer.publishData(payload_update) + + public void sendIUPayloadUpdate(AbstractIU iu, IUPayloadUpdate update) + { + Informer<Object> informer = getInformer(iu.getCategory()); + try + { informer.send(update); - } catch (RSBException e) { + } + catch (RSBException e) + { + throw new RuntimeException(e); + } + } + + public void sendIULinkUpdate(AbstractIU iu, IULinkUpdate update) + { + Informer<Object> informer = getInformer(iu.getCategory()); + try + { + informer.send(update); + } + catch (RSBException e) + { throw new RuntimeException(e); } } @@ -445,11 +434,11 @@ public class OutputBuffer extends Buffer{ { return iuStore.get(iuid); } - + public void close() { server.deactivate(); - for(Informer<?> informer: informerStore.values()) + for (Informer<?> informer : informerStore.values()) { informer.deactivate(); } diff --git a/java/src/ipaacademo/TextPrinter.java b/java/src/ipaacademo/TextPrinter.java index 61d820df4fabb10eb0b96215dc95c8e247624e95..ee494c8152e227a6f53a9ab7f4d7a37e8f18713f 100644 --- a/java/src/ipaacademo/TextPrinter.java +++ b/java/src/ipaacademo/TextPrinter.java @@ -1,13 +1,16 @@ package ipaacademo; +import ipaaca.AbstractIU; +import ipaaca.HandlerFunctor; +import ipaaca.IUEventHandler; +import ipaaca.IUEventType; +import ipaaca.Initializer; +import ipaaca.InputBuffer; +import ipaaca.OutputBuffer; +import ipaaca.RemotePushIU; + import java.util.EnumSet; -import java.util.Map; -import java.util.HashMap; import java.util.Set; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - -import ipaaca.*; import javax.swing.JFrame; import javax.swing.JLabel; diff --git a/java/test/src/ipaaca/ComponentCommunicationIntegrationTest.java b/java/test/src/ipaaca/ComponentCommunicationIntegrationTest.java index 02c3958554a711eed00a4345e2ca746bd55bb1ba..3881b957470f2ff930b0e869d4a5b03bc355210c 100644 --- a/java/test/src/ipaaca/ComponentCommunicationIntegrationTest.java +++ b/java/test/src/ipaaca/ComponentCommunicationIntegrationTest.java @@ -361,5 +361,30 @@ public class ComponentCommunicationIntegrationTest assertThat(iuIn.getLinks("SAME_LEVEL"),containsInAnyOrder("iu7")); } + @Test + public void testPublishLinks() throws InterruptedException + { + LocalIU localIU2 = new LocalIU(); + localIU2.setCategory(CATEGORY); + localIU2.getPayload().put("key1", "item2"); + localIU.addLinks("SAME_LEVEL", ImmutableSet.of(localIU2.getUid())); + outBuffer.add(localIU); + outBuffer.add(localIU2); + Thread.sleep(200); + assertThat(localIU.getLinks("SAME_LEVEL"),containsInAnyOrder(localIU2.getUid())); + } + @Test + public void testPublishLinksRemote() throws InterruptedException + { + LocalIU localIU2 = new LocalIU(); + localIU2.setCategory(CATEGORY); + localIU2.getPayload().put("key1", "item2"); + localIU.addLinks("SAME_LEVEL", ImmutableSet.of(localIU2.getUid())); + outBuffer.add(localIU); + outBuffer.add(localIU2); + Thread.sleep(200); + AbstractIU iuIn = inBuffer.getIU(localIU.getUid()); + assertThat(iuIn.getLinks("SAME_LEVEL"),containsInAnyOrder(localIU2.getUid())); + } } diff --git a/java/test/src/ipaaca/LocalIUTest.java b/java/test/src/ipaaca/LocalIUTest.java index eb58b6400437c909c9af68bb63ea38317ae17d4d..f131997e69b4359a9b8d2951d9e98a085fac8a56 100644 --- a/java/test/src/ipaaca/LocalIUTest.java +++ b/java/test/src/ipaaca/LocalIUTest.java @@ -21,7 +21,7 @@ public class LocalIUTest @Test public void testCommit() { - LocalIU liu = new LocalIU("iu1"); + LocalIU liu = new LocalIU(); liu.getPayload().put("key1", "item1"); liu.setBuffer(mockBuffer); liu.commit("commitWriter"); @@ -33,7 +33,7 @@ public class LocalIUTest @Test public void testSetPayloadOnUnpublishedIU() { - LocalIU liu = new LocalIU("iu1"); + LocalIU liu = new LocalIU(); liu.getPayload().put("key1", "item1"); assertEquals("item1", liu.getPayload().get("key1")); } @@ -41,7 +41,7 @@ public class LocalIUTest @Test public void testSetPayloadOnPublishedIU() { - LocalIU liu = new LocalIU("iu1"); + LocalIU liu = new LocalIU(); liu.setBuffer(mockBuffer); liu.getPayload().put("key1", "item1"); assertEquals("item1", liu.getPayload().get("key1"));