Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Show changes
package ipaaca;
/**
* Error indicating that an IU is immutable because it has been committed to.
* @author hvanwelbergen
*
*/
public class IUCommittedException extends RuntimeException{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU() {
return iu;
}
public IUCommittedException(AbstractIU iu)
{
super("Writing to IU " + iu.getUid() + " failed -- it has been committed to.");
this.iu = iu;
}
}
package ipaaca;
import java.util.EnumSet;
import java.util.Set;
/**
* Wrapper for IU event handling functions.
* @author hvanwelbergen
*/
public class IUEventHandler {
private final EnumSet<IUEventType> eventTypes;
private Set<String>categories;
private final HandlerFunctor handleFunctor;
// def __init__(self, handler_function, for_event_types=None, for_categories=None):
// """Create an IUEventHandler.
//
// Keyword arguments:
// handler_function -- the handler function with the signature
// (IU, event_type, local)
// for_event_types -- a list of event types or None if handler should
// be called for all event types
// for_categories -- a list of category names or None if handler should
// be called for all categoires
// """
// super(IUEventHandler, self).__init__()
// self._handler_function = handler_function
// self._for_event_types = (
// None if for_event_types is None else
// (for_event_types[:] if hasattr(for_event_types, '__iter__') else [for_event_types]))
// self._for_categories = (
// None if for_categories is None else
// (for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories]))
public IUEventHandler(HandlerFunctor func, EnumSet<IUEventType> eventTypes, Set<String>categories)
{
this.eventTypes = eventTypes;
this.categories = categories;
this.handleFunctor = func;
}
// def condition_met(self, event_type, category):
// """Check whether this IUEventHandler should be called.
//
// Keyword arguments:
// event_type -- type of the IU event
// category -- category of the IU which triggered the event
// """
// type_condition_met = (self._for_event_types is None or event_type in self._for_event_types)
// cat_condition_met = (self._for_categories is None or category in self._for_categories)
// return type_condition_met and cat_condition_met
/**
* Check whether this IUEventHandler should be called.
* @param type type of the IU event
* @param category category of the IU which triggered the event
*/
private boolean conditionMet(IUEventType type, String category)
{
return eventTypes.contains(type) && categories.contains(category);
}
// def call(self, buffer, iu_uid, local, event_type, category):
// """Call this IUEventHandler's function, if it applies.
//
// Keyword arguments:
// buffer -- the buffer in which the IU is stored
// iu_uid -- the uid of the IU
// local -- is the IU local or remote to this component? @RAMIN: Is this correct?
// event_type -- IU event type
// category -- category of the IU
// """
// if self.condition_met(event_type, category):
// iu = buffer._iu_store[iu_uid]
// self._handler_function(iu, event_type, local)
public void call(Buffer buf, String iuUid, boolean local, IUEventType type, String category)
{
if(conditionMet(type,category))
{
AbstractIU iu = buf.getIU(iuUid);
handleFunctor.handle(iu, type, local);
}
}
}
package ipaaca;
public enum IUEventType {
ADDED, COMMITTED, DELETED, RETRACTED, UPDATED,LINKSUPDATED;
}
package ipaaca;
public class IUPublishedException extends RuntimeException{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU() {
return iu;
}
public IUPublishedException(AbstractIU iu)
{
super("IU " + iu.getUid() + " is already present in the output buffer.");
this.iu = iu;
}
}
package ipaaca;
public class IUReadOnlyException extends RuntimeException{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU() {
return iu;
}
public IUReadOnlyException(AbstractIU iu)
{
super("Writing to IU " + iu.getUid() + " failed -- it is read-only.");
this.iu = iu;
}
}
package ipaaca;
import java.util.HashMap;
public class IUStore<X extends AbstractIU> extends HashMap<String,X>{
private static final long serialVersionUID = 1L;
}
package ipaaca;
public class IUUpdateFailedException extends RuntimeException{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU() {
return iu;
}
public IUUpdateFailedException(AbstractIU iu)
{
super("Remote update failed for IU " + iu.getUid() + ".");
this.iu = iu;
}
}
/*******************************************************************************
* Copyright (C) 2009 Human Media Interaction, University of Twente, the Netherlands
*
* This file is part of the Elckerlyc BML realizer.
*
* Elckerlyc is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Elckerlyc is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Elckerlyc. If not, see http://www.gnu.org/licenses/.
******************************************************************************/
/**
* @(#) Info.java
* @version 1.0 3/09/2007
* @author Job Zwiers
*/
package ipaaca; // change this line for other packages
import javax.swing.JOptionPane;
/**
* The Info class is intended to be used as "Main class" when the package is
* jarred. Running java -jar <packageJarFile> will print some package
* information. Note that some of this information is only available from the
* Manifest.mf file, that is included in the jar file, and not when running
* directly from compiled classes.
*/
public final class Info
{
private Info()
{
}
private static Package pack = new Info().getClass().getPackage();
public final static String packageName = pack.getName();
/**
* Yields a String containing manifest file info. When not running from a
* jar file, only the package name is included.
*/
public static String manifestInfo()
{
StringBuilder buf = new StringBuilder();
buf.append("Package: ");
buf.append(packageName);
buf.append("\n");
if (pack.getSpecificationTitle() != null)
{
buf.append("Specification-Title: " + pack.getSpecificationTitle() + "\n");
}
if (pack.getSpecificationVersion() != null)
{
buf.append("Specification-Version: " + pack.getSpecificationVersion() + "\n");
}
if (pack.getSpecificationVendor() != null)
{
buf.append("Specification-Vendor: " + pack.getSpecificationVendor() + "\n");
}
if (pack.getImplementationTitle() != null)
{
buf.append("Implementation-Title: " + pack.getImplementationTitle() + "\n");
}
if (pack.getImplementationVersion() != null)
{
buf.append("Implementation-Version: " + pack.getImplementationVersion() + "\n");
}
if (pack.getImplementationVendor() != null)
{
buf.append("Implementation-Vendor: " + pack.getImplementationVendor() + "\n");
}
return buf.toString();
}
/**
* Checks whether the current specification version meets the specified
* required version; if not, a RuntimeException is thrown. No check is
* performed when manifest info is not available.
*/
public static void requireVersion(String requiredVersion)
{
if (pack.getSpecificationVersion() == null) return; // no check possible, so assume ok
if (pack.isCompatibleWith(requiredVersion)) return;
String msg = "Package " + packageName + " Version " + pack.getSpecificationVersion() + " does not meet the required version "
+ requiredVersion;
JOptionPane.showMessageDialog(null, msg, "Package Info", JOptionPane.PLAIN_MESSAGE);
throw new RuntimeException(msg);
}
/**
* Returns the specification version from the Manifest.mf file, if
* available, or else an empty String.
*/
public static String getVersion()
{
String result = pack.getSpecificationVersion();
return (result == null) ? "" : result;
}
/**
* checks whether the package specification version is compatible with a
* certain desired version. &quot;isCompatibleWith(desiredversion)&quot;
* return true iff the desiredVersion is smaller or equal than the package
* specification version, where "smaller than" is determined by the
* lexicographic order on major, minor, and micro version numbers. (Missing
* numbers are considered to be 0). For instance, when the package
* specification version would be 2.1, then some examples of compatible
* desired versions are: 1, 1.0, 1.6, 2.0.4, 2.1, 2.1.0, whereas desired
* versions like 2.2, 3.0, or 2.1.1 would not be compatible.
*/
public static boolean isCompatibleWith(String desiredVersion)
{
String specificationVersion = pack.getSpecificationVersion();
if (specificationVersion == null) return true; // no spec version available, so assume ok
String[] desiredNums = desiredVersion.split("[.]");
String[] specificationNums = specificationVersion.split("[.]");
int desired, specified;
try
{
for (int vn = 0; vn < desiredNums.length; vn++)
{
// System.out.println(" desired num " + vn + ": " +
// desiredNums[vn] + " specification num: " +
// specificationNums[vn]);
desired = Integer.valueOf(desiredNums[vn]);
if (vn < specificationNums.length)
{
specified = Integer.valueOf(specificationNums[vn]);
}
else
{
specified = 0;
}
if (desired < specified) return true;
if (desired > specified) return false;
}
return true;
}
catch (NumberFormatException e)
{
System.out.println(packageName + ".Info.isCompatibelWith method: illegal version numbers: " + desiredVersion + " / "
+ specificationVersion);
return false;
}
}
/*
* Show some package information
*/
public static void main(String[] arg)
{
JOptionPane.showMessageDialog(null, Info.manifestInfo(), "Package Info", JOptionPane.PLAIN_MESSAGE);
}
}
package ipaaca;
import ipaaca.Ipaaca.IUCommission;
import ipaaca.Ipaaca.IULinkUpdate;
import ipaaca.Ipaaca.IUPayloadUpdate;
import rsb.converter.ConverterSignature;
import rsb.converter.DefaultConverterRepository;
import rsb.converter.ProtocolBufferConverter;
public final class Initializer {
// def initialize_ipaaca_rsb():#{{{
// rsb.transport.converter.registerGlobalConverter(
// IntConverter(wireSchema="int32", dataType=int))
// rsb.transport.converter.registerGlobalConverter(
// IUConverter(wireSchema="ipaaca-iu", dataType=IU))
// rsb.transport.converter.registerGlobalConverter(
// IUPayloadUpdateConverter(
// wireSchema="ipaaca-iu-payload-update",
// dataType=IUPayloadUpdate))
// rsb.transport.converter.registerGlobalConverter(
// rsb.transport.converter.ProtocolBufferConverter(
// messageClass=iuProtoBuf_pb2.IUCommission))
// rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
// #}}}
public static void initializeIpaacaRsb()
{
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new IntConverter());
DefaultConverterRepository.getDefaultConverterRepository()
.addConverter(new ProtocolBufferConverter<IUCommission>(IUCommission.getDefaultInstance()));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new IUConverter(new ConverterSignature("ipaaca-iu", RemotePushIU.class)));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new IUConverter(new ConverterSignature("ipaaca-localiu", LocalIU.class)));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new PayloadConverter());
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new LinkUpdateConverter());
}
}
package ipaaca;
import ipaaca.Ipaaca.IUCommission;
import ipaaca.Ipaaca.IULinkUpdate;
import ipaaca.Ipaaca.IUPayloadUpdate;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rsb.Event;
import rsb.Factory;
import rsb.Handler;
import rsb.InitializeException;
import rsb.Listener;
import rsb.Scope;
import rsb.patterns.RemoteServer;
/**
* An InputBuffer that holds remote IUs.
* @author hvanwelbergen
*/
public class InputBuffer extends Buffer{
private Map<String,RemoteServer> remoteServerStore = new HashMap<String,RemoteServer>();
private Map<String,Listener> listenerStore = new HashMap<String,Listener>();
private Set<String> categoryInterests = new HashSet<String>();
private final static Logger logger = LoggerFactory.getLogger(InputBuffer.class.getName());
private IUStore<RemotePushIU> iuStore = new IUStore<RemotePushIU>();
public void close()
{
for(Listener listener: listenerStore.values())
{
listener.deactivate();
}
for(RemoteServer remServer: remoteServerStore.values())
{
remServer.deactivate();
}
}
// def __init__(self, owning_component_name, category_interests=None, participant_config=None):
// '''Create an InputBuffer.
//
// Keyword arguments:
// owning_compontent_name -- name of the entity that owns this InputBuffer
// category_interests -- list of IU categories this Buffer is interested in
// participant_config = RSB configuration
// '''
// super(InputBuffer, self).__init__(owning_component_name, participant_config)
// self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB'
// self._listener_store = {} # one per IU category
// self._remote_server_store = {} # one per remote-IU-owning Component
// self._category_interests = []
// if category_interests is not None:
// for cat in category_interests:
// self._create_category_listener_if_needed(cat)
public InputBuffer(String owningComponentName, Set<String>categoryInterests)
{
super(owningComponentName);
uniqueName = "/ipaaca/component/"+ owningComponentName +"ID"+uuid+"/IB";
for (String cat:categoryInterests)
{
createCategoryListenerIfNeeded(cat);
}
}
// def _get_remote_server(self, iu):
// '''Return (or create, store and return) a remote server.'''
// if iu.owner_name in self._remote_server_store:
// return self._remote_server_store[iu.owner_name]
// remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name)))
// self._remote_server_store[iu.owner_name] = remote_server
// return remote_server
public RemoteServer getRemoteServer(AbstractIU iu)
{
if(remoteServerStore.containsKey(iu.getOwnerName()))
{
return remoteServerStore.get(iu.getOwnerName());
}
logger.debug("Getting remote server for {}",iu.getOwnerName());
RemoteServer remoteServer = Factory.getInstance().createRemoteServer(new Scope(iu.getOwnerName()));
try
{
remoteServer.activate();
}
catch (InitializeException e)
{
throw new RuntimeException(e);
}
remoteServerStore.put(iu.getOwnerName(), remoteServer);
return remoteServer;
}
// def _create_category_listener_if_needed(self, iu_category):
// '''Return (or create, store and return) a category listener.'''
// if iu_category in self._listener_store: return self._informer_store[iu_category]
// cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
// cat_listener.addHandler(self._handle_iu_events)
// self._listener_store[iu_category] = cat_listener
// self._category_interests.append(iu_category)
// logger.info("Added category listener for "+iu_category)
// return cat_listener
private Listener createCategoryListenerIfNeeded(String category)
{
if(listenerStore.containsKey(category))
{
return listenerStore.get(category);
}
Listener listener = Factory.getInstance().createListener(new Scope("/ipaaca/category/"+category));
listenerStore.put(category,listener);
categoryInterests.add(category);
listener.addHandler(new InputHandler(), true);
logger.info("Added category listener for {}",category);
try
{
listener.activate();
}
catch (InitializeException e)
{
throw new RuntimeException(e);
}
return listener;
}
class InputHandler implements Handler
{
@Override
public void internalNotify(Event ev) {
handleIUEvents(ev);
}
}
// def _handle_iu_events(self, event):
// '''Dispatch incoming IU events.
//
// Adds incoming IU's to the store, applies payload and commit updates to
// IU, calls IU event handlers.'
//
// Keyword arguments:
// event -- a converted RSB event
// '''
// if type(event.data) is RemotePushIU:
// # a new IU
// if event.data.uid in self._iu_store:
// # already in our store
// pass
// else:
// self._iu_store[ event.data.uid ] = event.data
// event.data.buffer = self
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category)
// else:
// # an update to an existing IU
// if event.data.writer_name == self.unique_name:
// # Discard updates that originate from this buffer
// return
// if event.data.uid not in self._iu_store:
// # TODO: we should request the IU's owner to send us the IU
// logger.warning("Update message for IU which we did not fully receive before.")
// return
// if type(event.data) is iuProtoBuf_pb2.IUCommission:
// # IU commit
// iu = self._iu_store[event.data.uid]
// iu._apply_commission()
// iu._revision = event.data.revision
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
// elif type(event.data) is IUPayloadUpdate:
// # IU payload update
// iu = self._iu_store[event.data.uid]
// iu._apply_update(event.data)
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
/**
* Dispatch incoming IU events.
*/
private void handleIUEvents(Event event)
{
if(event.getData() instanceof RemotePushIU)
{
RemotePushIU rp = (RemotePushIU)event.getData();
//a new IU
if(iuStore.containsKey(rp.getUid()))
{
// already in our store
return;
}
else
{
iuStore.put(rp.getUid(), rp);
rp.setBuffer(this);
this.callIuEventHandlers(rp.getUid(), false, IUEventType.ADDED, rp.getCategory());
}
}
else
{
if (event.getData() instanceof IULinkUpdate)
{
IULinkUpdate iuLinkUpdate = (IULinkUpdate)event.getData();
if(iuLinkUpdate.getWriterName().equals(this.getUniqueName()))
{
//Discard updates that originate from this buffer
return;
}
if(!iuStore.containsKey(iuLinkUpdate.getUid()))
{
logger.warn("Link update message for IU which we did not fully receive before.");
return;
}
RemotePushIU iu = this.iuStore.get(iuLinkUpdate.getUid());
iu.applyLinkUpdate(iuLinkUpdate);
callIuEventHandlers(iu.getUid(), false, IUEventType.LINKSUPDATED, iu.category);
}
if (event.getData() instanceof IUPayloadUpdate)
{
IUPayloadUpdate iuUpdate = (IUPayloadUpdate)event.getData();
logger.debug("handleIUEvents invoked with an IUPayloadUpdate: {}", iuUpdate);
if(iuUpdate.getWriterName().equals(this.getUniqueName()))
{
//Discard updates that originate from this buffer
return;
}
if(!iuStore.containsKey(iuUpdate.getUid()))
{
logger.warn("Update message for IU which we did not fully receive before.");
return;
}
RemotePushIU iu = this.iuStore.get(iuUpdate.getUid());
iu.applyUpdate(iuUpdate);
callIuEventHandlers(iu.getUid(), false, IUEventType.UPDATED, iu.category);
}
if (event.getData() instanceof IUCommission)
{
IUCommission iuc = (IUCommission)event.getData();
logger.debug("handleIUEvents invoked with an IUCommission: {}", iuc);
logger.debug("{}, {}",iuc.getWriterName(), this.getUniqueName());
if(iuc.getWriterName().equals(this.getUniqueName()))
{
//Discard updates that originate from this buffer
return;
}
if(!iuStore.containsKey(iuc.getUid()))
{
logger.warn("Update message for IU which we did not fully receive before.");
return;
}
RemotePushIU iu = this.iuStore.get(iuc.getUid());
iu.applyCommmision();
iu.setRevision(iuc.getRevision());
callIuEventHandlers(iuc.getUid(), false, IUEventType.COMMITTED, iu.getCategory());
}
}
}
public InputBuffer(String owningComponentName) {
super(owningComponentName);
}
@Override
public AbstractIU getIU(String iuid)
{
return iuStore.get(iuid);
}
public Collection<RemotePushIU> getIUs()
{
return iuStore.values();
}
}
Source diff could not be displayed: it is too large. Options to address this: view the blob.
package ipaaca;
import rsb.Factory;
import rsb.Informer;
import rsb.InitializeException;
import rsb.RSBException;
import rsb.patterns.DataCallback;
import rsb.patterns.LocalServer;
import ipaaca.Ipaaca;
import ipaaca.Ipaaca.IUCommission;
import ipaaca.Ipaaca.IULinkUpdate;
import ipaaca.Ipaaca.IUPayloadUpdate;
import ipaaca.Ipaaca.LinkSet;
import ipaaca.Ipaaca.PayloadItem;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
/**
* An OutputBuffer that holds local IUs.
* @author hvanwelbergen
*/
public class OutputBuffer extends Buffer{
private final LocalServer server;
private Map<String,Informer<Object>> informerStore = new HashMap<String,Informer<Object>>(); //category -> informer map
private final String idPrefix;
private int iuIdCounter = 0;
private final Object iuIdCounterLock = new Object();
private final static Logger logger = LoggerFactory.getLogger(OutputBuffer.class.getName());
private IUStore<LocalIU> iuStore = new IUStore<LocalIU>();
// def __init__(self, owning_component_name, participant_config=None):
// '''Create an Output Buffer.
//
// Keyword arguments:
// owning_component_name -- name of the entity that own this buffer
// participant_config -- RSB configuration
// '''
// super(OutputBuffer, self).__init__(owning_component_name, participant_config)
// self._unique_name = '/ipaaca/component/' + str(owning_component_name) + 'ID' + self._uuid + '/OB'
// self._server = rsb.createServer(rsb.Scope(self._unique_name))
// self._server.addMethod('updatePayload', self._remote_update_payload, IUPayloadUpdate, int)
// self._server.addMethod('commit', self._remote_commit, iuProtoBuf_pb2.IUCommission, int)
// self._informer_store = {}
// self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-'
// self.__iu_id_counter_lock = threading.Lock()
// self.__iu_id_counter = 0
/**
* @param owningComponentName name of the entity that own this buffer
*/
public OutputBuffer(String owningComponentName) {
super(owningComponentName);
uniqueName = "/ipaaca/component/" + owningComponentName + "ID" + uuid + "/OB";
logger.debug("Creating server for {}",uniqueName);
server = Factory.getInstance().createLocalServer(uniqueName);
try {
server.addMethod("updatePayload", new RemoteUpdatePayload());
server.addMethod("updateLinks", new RemoteUpdateLinks());
server.addMethod("commit", new RemoteCommit());
server.activate();
} catch (InitializeException e) {
throw new RuntimeException(e);
}
idPrefix = owningComponentName+uuid.toString()+"-IU-";
}
private final class RemoteUpdatePayload implements DataCallback<Integer,IUPayloadUpdate>
{
@Override
public Integer invoke(IUPayloadUpdate data) throws Throwable
{
logger.debug("remoteUpdate");
return remoteUpdatePayload(data);
}
}
private final class RemoteUpdateLinks implements DataCallback<Integer,IULinkUpdate>
{
@Override
public Integer invoke(IULinkUpdate data) throws Throwable
{
logger.debug("remoteUpdateLinks");
return remoteUpdateLinks(data);
}
}
private final class RemoteCommit implements DataCallback<Integer,IUCommission>
{
@Override
public Integer invoke(IUCommission data) throws Throwable
{
logger.debug("remoteCommit");
return remoteCommit(data);
}
}
// def _remote_update_payload(self, update):
// '''Apply a remotely requested update to one of the stored IUs.'''
// if update.uid not in self._iu_store:
// logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
// return 0
// iu = self._iu_store[update.uid]
// if (update.revision != 0) and (update.revision != iu.revision):
// # (0 means "do not pay attention to the revision number" -> "force update")
// logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
// return 0
// if update.is_delta:
// for k in update.keys_to_remove:
// iu.payload.__delitem__(k, writer_name=update.writer_name)
// for k,v in update.new_items.items():
// iu.payload.__setitem__(k, v, writer_name=update.writer_name)
// else:
// iu._set_payload(update.new_items, writer_name=update.writer_name)
// self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category)
// return iu.revision
/**
* Apply a remotely requested update to one of the stored IUs.
* @return 0 if not updated, IU version number otherwise
*/
int remoteUpdatePayload(IUPayloadUpdate update)
{
if (!iuStore.containsKey(update.getUid()))
{
logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}",update.getUid());
return 0;
}
AbstractIU iu = iuStore.get(update.getUid());
if(update.getRevision()!=0 && update.getRevision() != iu.getRevision())
{
//(0 means "do not pay attention to the revision number" -> "force update")
logger.warn("Remote write operation failed because request was out of date; IU {}",update.getUid());
return 0;
}
if(update.getIsDelta())
{
for(String k:update.getKeysToRemoveList())
{
iu.getPayload().remove(k, update.getWriterName());
}
for (PayloadItem pli:update.getNewItemsList())
{
iu.getPayload().put(pli.getKey(), pli.getValue(), update.getWriterName());
}
}
else
{
iu.setPayload(update.getNewItemsList(), update.getWriterName());
}
callIuEventHandlers(update.getUid(), true, IUEventType.UPDATED, iu.getCategory());
return iu.revision;
}
/**
* Apply a remotely requested update to one of the stored IUs.
* @return 0 if not updated, IU version number otherwise
*/
int remoteUpdateLinks(IULinkUpdate update)
{
if (!iuStore.containsKey(update.getUid()))
{
logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}",update.getUid());
return 0;
}
AbstractIU iu = iuStore.get(update.getUid());
if(update.getRevision()!=0 && update.getRevision() != iu.getRevision())
{
//(0 means "do not pay attention to the revision number" -> "force update")
logger.warn("Remote write operation failed because request was out of date; IU {}",update.getUid());
return 0;
}
if(update.getIsDelta())
{
SetMultimap<String, String> newLinks = HashMultimap.create();
for(LinkSet ls:update.getNewLinksList())
{
newLinks.putAll(ls.getType(),ls.getTargetsList());
}
SetMultimap<String, String> removeLinks = HashMultimap.create();
for(LinkSet ls:update.getLinksToRemoveList())
{
removeLinks.putAll(ls.getType(),ls.getTargetsList());
}
iu.modifyLinks(newLinks, removeLinks);
}
else
{
SetMultimap<String, String> newLinks = HashMultimap.create();
for(LinkSet ls:update.getNewLinksList())
{
newLinks.putAll(ls.getType(),ls.getTargetsList());
}
iu.setLinks(newLinks);
}
callIuEventHandlers(update.getUid(), true, IUEventType.LINKSUPDATED, iu.getCategory());
return iu.revision;
}
//
// def _generate_iu_uid(self):
// '''Generate a unique IU id of the form'''
// with self.__iu_id_counter_lock:
// self.__iu_id_counter += 1
// number = self.__iu_id_counter
// return self._id_prefix + str(number)
private String generateIUUid()
{
synchronized(iuIdCounterLock)
{
iuIdCounter++;
return idPrefix+iuIdCounter;
}
}
//
// def _remote_commit(self, iu_commission):
// '''Apply a remotely requested commit to one of the stored IUs.'''
// if iu_commission.uid not in self._iu_store:
// logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
// return 0
// iu = self._iu_store[iu_commission.uid]
// if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
// # (0 means "do not pay attention to the revision number" -> "force update")
// logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
// return 0
// if iu.committed:
// return 0
// else:
// iu._internal_commit(writer_name=iu_commission.writer_name)
// self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category)
// return iu.revision
/**
* Apply a remotely requested commit to one of the stored IUs.
*/
private int remoteCommit(IUCommission iuc)
{
if(!iuStore.containsKey(iuc.getUid()))
{
logger.warn("Remote InBuffer tried to spuriously write non-existent IU {}", iuc.getUid());
return 0;
}
AbstractIU iu = iuStore.get(iuc.getUid());
if(iuc.getRevision()!=0 && iuc.getRevision()!=iu.getRevision())
{
// (0 means "do not pay attention to the revision number" -> "force update")
logger.warn("Remote write operation failed because request was out of date; IU {}",iuc.getUid());
return 0;
}
if(iu.isCommitted())
{
return 0;
}
else
{
iu.commit(iuc.getWriterName());
callIuEventHandlers(iuc.getUid(), true, IUEventType.COMMITTED, iu.getCategory());
return iu.getRevision();
}
}
// def _get_informer(self, iu_category):
// '''Return (or create, store and return) an informer object for IUs of the specified category.'''
// if iu_category in self._informer_store:
// return self._informer_store[iu_category]
// informer_iu = rsb.createInformer(
// rsb.Scope("/ipaaca/category/"+str(iu_category)),
// config=self._participant_config,
// dataType=object)
// self._informer_store[iu_category] = informer_iu #new_tuple
// logger.info("Added informer on "+iu_category)
// return informer_iu #return new_tuple
/**
* Return (or create, store and return) an informer object for IUs of the specified category.
*/
private Informer<Object> getInformer(String category)
{
if(informerStore.containsKey(category))
{
return informerStore.get(category);
}
Informer<Object> informer = Factory.getInstance().createInformer("/ipaaca/category/"+category);
informerStore.put(category, informer);
logger.info("Added informer on "+category);
//XXX new in java version, apperently informers need activation and deactivation
try {
informer.activate();
} catch (InitializeException e) {
throw new RuntimeException(e);
}
return informer;
}
// def add(self, iu):
// '''Add an IU to the IU store, assign an ID and publish it.'''
// if iu._uid is not None:
// raise IUPublishedError(iu)
// iu.uid = self._generate_iu_uid()
// self._iu_store[iu._uid] = iu
// iu.buffer = self
// self._publish_iu(iu)
/**
* Add an IU to the IU store, assign an ID and publish it.
*/
public void add(LocalIU iu)
{
if (iu.getUid()!=null)
{
throw new IUPublishedException(iu);
}
iu.setUid(generateIUUid());
iuStore.put(iu.getUid(), iu);
iu.setBuffer(this);
publishIU(iu);
}
// def _publish_iu(self, iu):
// '''Publish an IU.'''
// informer = self._get_informer(iu._category)
// informer.publishData(iu)
public void publishIU(AbstractIU iu)
{
Informer<Object> informer = getInformer(iu.getCategory());
try {
informer.send(iu);
} catch (RSBException e) {
throw new RuntimeException(e);
}
}
// def _send_iu_commission(self, iu, writer_name):
// '''Send IU commission.
//
// Keyword arguments:
// iu -- the IU that has been committed to
// writer_name -- name of the Buffer that initiated this commit, necessary
// to enable remote components to filter out updates that originated
// from their own operations
// '''
// # a raw Protobuf object for IUCommission is produced
// # (unlike updates, where we have an intermediate class)
// iu_commission = iuProtoBuf_pb2.IUCommission()
// iu_commission.uid = iu.uid
// iu_commission.revision = iu.revision
// iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name
// # print('sending IU commission event')
// informer = self._get_informer(iu._category)
// informer.publishData(iu_commission)
/**
* @param iu the IU that has been committed to
* @param writerName name of the Buffer that initiated this commit, necessary
* to enable remote components to filter out updates that originated
* from their own operations
*/
public void sendIUCommission(AbstractIU iu, String writerName)
{
IUCommission iuc = Ipaaca.IUCommission.newBuilder()
.setUid(iu.getUid())
.setRevision(iu.getRevision())
.setWriterName(iu.getOwnerName()!=null?iu.getOwnerName():writerName)
.build();
Informer<Object> informer = getInformer(iu.getCategory());
try {
informer.send(iuc);
} catch (RSBException e) {
throw new RuntimeException(e);
}
}
// def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
// '''Send an IU payload update.
//
// Keyword arguments:
// iu -- the IU being updated
// is_delta -- whether the update concerns only a single payload item or
// the whole payload dictionary
// revision -- the new revision number
// new_items -- a dictionary of new payload items
// keys_to_remove -- a list of the keys that shall be removed from the
// payload
// writer_name -- name of the Buffer that initiated this update, necessary
// to enable remote components to filter out updates that originate d
// from their own operations
// '''
// if new_items is None:
// new_items = {}
// if keys_to_remove is None:
// keys_to_remove = []
// payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision)
// payload_update.new_items = new_items
// if is_delta:
// payload_update.keys_to_remove = keys_to_remove
// payload_update.writer_name = writer_name
// informer = self._get_informer(iu._category)
// informer.publishData(payload_update)
public void sendIUPayloadUpdate(AbstractIU iu, IUPayloadUpdate update)
{
Informer<Object> informer = getInformer(iu.getCategory());
try {
informer.send(update);
} catch (RSBException e) {
throw new RuntimeException(e);
}
}
public void sendIULinkUpdate(AbstractIU iu, IULinkUpdate update)
{
Informer<Object> informer = getInformer(iu.getCategory());
try {
informer.send(update);
} catch (RSBException e) {
throw new RuntimeException(e);
}
}
@Override
public AbstractIU getIU(String iuid)
{
return iuStore.get(iuid);
}
public void close()
{
server.deactivate();
for(Informer<?> informer: informerStore.values())
{
informer.deactivate();
}
}
}
language=proto
resolve.status=beta
resource.path=
rebuild.list=