Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Select Git revision
Show changes
package ipaaca;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import com.google.protobuf.InvalidProtocolBufferException;
import rsb.converter.ConversionException;
import rsb.converter.Converter;
import rsb.converter.ConverterSignature;
import rsb.converter.UserData;
import rsb.converter.WireContents;
import ipaaca.Ipaaca.IU;
import ipaaca.Ipaaca.PayloadItem;
/**
* Serializes AbstractIUs into protocolbuffer IUs and vice versa.
* @author hvanwelbergen
*
*/
public class IUConverter implements Converter<ByteBuffer>
{
private final ConverterSignature signature;
public IUConverter(ConverterSignature signature)
{
this.signature = signature;
}
@Override
public ConverterSignature getSignature()
{
return signature;
}
@Override
public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException
{
AbstractIU iua = (AbstractIU)obj;
List<PayloadItem> payloadItems = new ArrayList<PayloadItem>();
for(Entry<String, String> entry:iua.getPayload().entrySet())
{
payloadItems.add(PayloadItem.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.setType("")
.build());
}
IU iu = IU.newBuilder()
.setUid(iua.getUid())
.setRevision(iua.getRevision())
.setCategory(iua.getCategory())
.setType("")
.setOwnerName(iua.getOwnerName())
.setCommitted(iua.isCommitted())
.setAccessMode(IU.AccessMode.PUSH) //TODO for other access modes (also in Python version)
.setReadOnly(iua.isReadOnly())
.addAllPayload(payloadItems)
.build();
return new WireContents<ByteBuffer>(ByteBuffer.wrap(iu.toByteArray()),"ipaaca-remotepushiu");
}
@Override
public UserData<?> deserialize(String wireSchema, ByteBuffer buffer) throws ConversionException
{
IU iu;
try
{
iu = IU.newBuilder().mergeFrom(buffer.array()).build();
}
catch (InvalidProtocolBufferException e)
{
throw new RuntimeException(e);
}
if(iu.getAccessMode() == IU.AccessMode.PUSH)
{
RemotePushIU iuout = new RemotePushIU(iu.getUid());
iuout.setCategory(iu.getCategory());
iuout.committed = iu.getCommitted();
iuout.setOwnerName(iu.getOwnerName());
iuout.setRevision(iu.getRevision());
iuout.setReadOnly(iu.getReadOnly());
iuout.payload = new Payload(iuout,iu.getPayloadList());
return new UserData<RemotePushIU>(iuout, RemotePushIU.class);
}
else
{
throw new RuntimeException("We can only handle IUs with access mode 'PUSH' for now!");
}
}
}
package ipaaca;
import java.util.EnumSet;
import java.util.Set;
/**
* Wrapper for IU event handling functions.
* @author hvanwelbergen
*/
public class IUEventHandler {
private final EnumSet<IUEventType> eventTypes;
private Set<String>categories;
private final HandlerFunctor handleFunctor;
// def __init__(self, handler_function, for_event_types=None, for_categories=None):
// """Create an IUEventHandler.
//
// Keyword arguments:
// handler_function -- the handler function with the signature
// (IU, event_type, local)
// for_event_types -- a list of event types or None if handler should
// be called for all event types
// for_categories -- a list of category names or None if handler should
// be called for all categoires
// """
// super(IUEventHandler, self).__init__()
// self._handler_function = handler_function
// self._for_event_types = (
// None if for_event_types is None else
// (for_event_types[:] if hasattr(for_event_types, '__iter__') else [for_event_types]))
// self._for_categories = (
// None if for_categories is None else
// (for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories]))
public IUEventHandler(HandlerFunctor func, EnumSet<IUEventType> eventTypes, Set<String>categories)
{
this.eventTypes = eventTypes;
this.categories = categories;
this.handleFunctor = func;
}
// def condition_met(self, event_type, category):
// """Check whether this IUEventHandler should be called.
//
// Keyword arguments:
// event_type -- type of the IU event
// category -- category of the IU which triggered the event
// """
// type_condition_met = (self._for_event_types is None or event_type in self._for_event_types)
// cat_condition_met = (self._for_categories is None or category in self._for_categories)
// return type_condition_met and cat_condition_met
/**
* Check whether this IUEventHandler should be called.
* @param type type of the IU event
* @param category category of the IU which triggered the event
*/
private boolean conditionMet(IUEventType type, String category)
{
return eventTypes.contains(type) && categories.contains(category);
}
// def call(self, buffer, iu_uid, local, event_type, category):
// """Call this IUEventHandler's function, if it applies.
//
// Keyword arguments:
// buffer -- the buffer in which the IU is stored
// iu_uid -- the uid of the IU
// local -- is the IU local or remote to this component? @RAMIN: Is this correct?
// event_type -- IU event type
// category -- category of the IU
// """
// if self.condition_met(event_type, category):
// iu = buffer._iu_store[iu_uid]
// self._handler_function(iu, event_type, local)
public void call(Buffer buf, String iuUid, boolean local, IUEventType type, String category)
{
if(conditionMet(type,category))
{
AbstractIU iu = buf.getIU(iuUid);
handleFunctor.handle(iu, type, local);
}
}
}
package ipaaca;
public enum IUEventType {
ADDED, COMMITTED, DELETED, RETRACTED, UPDATED;
}
package ipaaca;
public class IUPublishedException extends RuntimeException{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU() {
return iu;
}
public IUPublishedException(AbstractIU iu)
{
super("IU " + iu.getUid() + " is already present in the output buffer.");
this.iu = iu;
}
}
package ipaaca;
public class IUReadOnlyException extends RuntimeException{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU() {
return iu;
}
public IUReadOnlyException(AbstractIU iu)
{
super("Writing to IU " + iu.getUid() + " failed -- it is read-only.");
this.iu = iu;
}
}
package ipaaca;
import java.util.HashMap;
public class IUStore<X extends AbstractIU> extends HashMap<String,X>{
private static final long serialVersionUID = 1L;
}
package ipaaca;
public class IUUpdateFailedException extends RuntimeException{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU() {
return iu;
}
public IUUpdateFailedException(AbstractIU iu)
{
super("Remote update failed for IU " + iu.getUid() + ".");
this.iu = iu;
}
}
package ipaaca;
import ipaaca.Ipaaca.IUCommission;
import ipaaca.Ipaaca.IUPayloadUpdate;
import rsb.converter.ConverterSignature;
import rsb.converter.DefaultConverterRepository;
import rsb.converter.ProtocolBufferConverter;
public final class Initializer {
// def initialize_ipaaca_rsb():#{{{
// rsb.transport.converter.registerGlobalConverter(
// IntConverter(wireSchema="int32", dataType=int))
// rsb.transport.converter.registerGlobalConverter(
// IUConverter(wireSchema="ipaaca-iu", dataType=IU))
// rsb.transport.converter.registerGlobalConverter(
// IUPayloadUpdateConverter(
// wireSchema="ipaaca-iu-payload-update",
// dataType=IUPayloadUpdate))
// rsb.transport.converter.registerGlobalConverter(
// rsb.transport.converter.ProtocolBufferConverter(
// messageClass=iuProtoBuf_pb2.IUCommission))
// rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
// #}}}
public static void initializeIpaacaRsb()
{
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new IntConverter());
DefaultConverterRepository.getDefaultConverterRepository()
.addConverter(new ProtocolBufferConverter<IUCommission>(IUCommission.getDefaultInstance()));
DefaultConverterRepository.getDefaultConverterRepository()
.addConverter(new ProtocolBufferConverter<IUPayloadUpdate>(IUPayloadUpdate.getDefaultInstance()));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new IUConverter(new ConverterSignature("ipaaca-remotepushiu", RemotePushIU.class)));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new IUConverter(new ConverterSignature("ipaaca-localiu", LocalIU.class)));
}
}
package ipaaca;
import ipaaca.Ipaaca.IUCommission;
import ipaaca.Ipaaca.IUPayloadUpdate;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rsb.Event;
import rsb.Factory;
import rsb.Handler;
import rsb.InitializeException;
import rsb.Listener;
import rsb.Scope;
import rsb.patterns.RemoteServer;
/**
* An InputBuffer that holds remote IUs.
* @author hvanwelbergen
*/
public class InputBuffer extends Buffer{
private Map<String,RemoteServer> remoteServerStore = new HashMap<String,RemoteServer>();
private Map<String,Listener> listenerStore = new HashMap<String,Listener>();
private Set<String> categoryInterests = new HashSet<String>();
private final static Logger logger = LoggerFactory.getLogger(InputBuffer.class.getName());
private IUStore<RemotePushIU> iuStore = new IUStore<RemotePushIU>();
public void close()
{
for(Listener listener: listenerStore.values())
{
listener.deactivate();
}
for(RemoteServer remServer: remoteServerStore.values())
{
remServer.deactivate();
}
}
// def __init__(self, owning_component_name, category_interests=None, participant_config=None):
// '''Create an InputBuffer.
//
// Keyword arguments:
// owning_compontent_name -- name of the entity that owns this InputBuffer
// category_interests -- list of IU categories this Buffer is interested in
// participant_config = RSB configuration
// '''
// super(InputBuffer, self).__init__(owning_component_name, participant_config)
// self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB'
// self._listener_store = {} # one per IU category
// self._remote_server_store = {} # one per remote-IU-owning Component
// self._category_interests = []
// if category_interests is not None:
// for cat in category_interests:
// self._create_category_listener_if_needed(cat)
public InputBuffer(String owningComponentName, Set<String>categoryInterests)
{
super(owningComponentName);
uniqueName = "/ipaaca/component/"+ owningComponentName +"ID"+uuid+"/IB";
for (String cat:categoryInterests)
{
createCategoryListenerIfNeeded(cat);
}
}
// def _get_remote_server(self, iu):
// '''Return (or create, store and return) a remote server.'''
// if iu.owner_name in self._remote_server_store:
// return self._remote_server_store[iu.owner_name]
// remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name)))
// self._remote_server_store[iu.owner_name] = remote_server
// return remote_server
public RemoteServer getRemoteServer(AbstractIU iu)
{
if(remoteServerStore.containsKey(iu.getOwnerName()))
{
return remoteServerStore.get(iu.getOwnerName());
}
logger.debug("Getting remote server for {}",iu.getOwnerName());
RemoteServer remoteServer = Factory.getInstance().createRemoteServer(new Scope(iu.getOwnerName()));
try
{
remoteServer.activate();
}
catch (InitializeException e)
{
throw new RuntimeException(e);
}
remoteServerStore.put(iu.getOwnerName(), remoteServer);
return remoteServer;
}
// def _create_category_listener_if_needed(self, iu_category):
// '''Return (or create, store and return) a category listener.'''
// if iu_category in self._listener_store: return self._informer_store[iu_category]
// cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
// cat_listener.addHandler(self._handle_iu_events)
// self._listener_store[iu_category] = cat_listener
// self._category_interests.append(iu_category)
// logger.info("Added category listener for "+iu_category)
// return cat_listener
private Listener createCategoryListenerIfNeeded(String category)
{
if(listenerStore.containsKey(category))
{
return listenerStore.get(category);
}
Listener listener = Factory.getInstance().createListener(new Scope("/ipaaca/category/"+category));
listenerStore.put(category,listener);
categoryInterests.add(category);
listener.addHandler(new InputHandler(), true);
logger.info("Added category listener for {}",category);
try
{
listener.activate();
}
catch (InitializeException e)
{
throw new RuntimeException(e);
}
return listener;
}
class InputHandler implements Handler
{
@Override
public void internalNotify(Event ev) {
handleIUEvents(ev);
}
}
// def _handle_iu_events(self, event):
// '''Dispatch incoming IU events.
//
// Adds incoming IU's to the store, applies payload and commit updates to
// IU, calls IU event handlers.'
//
// Keyword arguments:
// event -- a converted RSB event
// '''
// if type(event.data) is RemotePushIU:
// # a new IU
// if event.data.uid in self._iu_store:
// # already in our store
// pass
// else:
// self._iu_store[ event.data.uid ] = event.data
// event.data.buffer = self
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category)
// else:
// # an update to an existing IU
// if event.data.writer_name == self.unique_name:
// # Discard updates that originate from this buffer
// return
// if event.data.uid not in self._iu_store:
// # TODO: we should request the IU's owner to send us the IU
// logger.warning("Update message for IU which we did not fully receive before.")
// return
// if type(event.data) is iuProtoBuf_pb2.IUCommission:
// # IU commit
// iu = self._iu_store[event.data.uid]
// iu._apply_commission()
// iu._revision = event.data.revision
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
// elif type(event.data) is IUPayloadUpdate:
// # IU payload update
// iu = self._iu_store[event.data.uid]
// iu._apply_update(event.data)
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
/**
* Dispatch incoming IU events.
*/
private void handleIUEvents(Event event)
{
if(event.getData() instanceof RemotePushIU)
{
RemotePushIU rp = (RemotePushIU)event.getData();
//a new IU
if(iuStore.containsKey(rp.getUid()))
{
// already in our store
return;
}
else
{
iuStore.put(rp.getUid(), rp);
rp.setBuffer(this);
this.callIuEventHandlers(rp.getUid(), false, IUEventType.ADDED, rp.getCategory());
}
}
else
{
if (event.getData() instanceof IUPayloadUpdate)
{
IUPayloadUpdate iuUpdate = (IUPayloadUpdate)event.getData();
logger.debug("handleIUEvents invoked with an IUPayloadUpdate: {}", iuUpdate);
if(iuUpdate.getWriterName().equals(this.getUniqueName()))
{
//Discard updates that originate from this buffer
return;
}
if(!iuStore.containsKey(iuUpdate.getUid()))
{
logger.warn("Update message for IU which we did not fully receive before.");
return;
}
RemotePushIU iu = this.iuStore.get(iuUpdate.getUid());
iu.applyUpdate(iuUpdate);
callIuEventHandlers(iu.getUid(), false, IUEventType.UPDATED, iu.category);
}
if (event.getData() instanceof IUCommission)
{
IUCommission iuc = (IUCommission)event.getData();
logger.debug("handleIUEvents invoked with an IUCommission: {}", iuc);
logger.debug("{}, {}",iuc.getWriterName(), this.getUniqueName());
if(iuc.getWriterName().equals(this.getUniqueName()))
{
//Discard updates that originate from this buffer
return;
}
if(!iuStore.containsKey(iuc.getUid()))
{
logger.warn("Update message for IU which we did not fully receive before.");
return;
}
RemotePushIU iu = this.iuStore.get(iuc.getUid());
iu.applyCommmision();
iu.setRevision(iuc.getRevision());
callIuEventHandlers(iuc.getUid(), false, IUEventType.COMMITTED, iu.getCategory());
}
}
}
public InputBuffer(String owningComponentName) {
super(owningComponentName);
}
@Override
public AbstractIU getIU(String iuid)
{
return iuStore.get(iuid);
}
}