Skip to content
Snippets Groups Projects
Commit f260d97a authored by Ramin Yaghoubzadeh's avatar Ramin Yaghoubzadeh
Browse files
Conflicts:
	cpp/build.xml
parents 0f3dae74 9ce99d0c
No related branches found
No related tags found
No related merge requests found
Showing
with 536 additions and 508 deletions
......@@ -13,4 +13,6 @@
.*.sw[a-z]
*.un~
Session.vim
**/manifest.mf
**/*.*~
<project name="ipaaca" default="build" basedir=".">
<project name="ipaaca-all" default="build" basedir=".">
<target name="resolve">
<subant target="resolve" genericantfile="build.xml">
<fileset dir="." includes="*/build.xml"/>
......
language=java
resolve.status=beta
resource.path=
publish.resolver=soa.core.repository
#resource.path=${shared.repository}/Humanoids;${shared.repository}/3dmodels;${shared.repository}/HMI/HmiElckerlyc/resources;${shared.repository}/logbackconfig;${shared.repository}/shaders;
run.jvmargs= -Xms128m -Xmx512m -Xss5M
rebuild.list=
<?xml version="1.0" encoding="UTF-8"?>
<project name="IpaacaJava" default="run">
<import file="../../soashared/ant/build.xml" />
<project name="ipaaca-java" default="run">
<target name="-pre-compilation">
<echo message="Compiling protobuf file" />
<mkdir dir="generatedsrc"/>
......@@ -11,4 +10,7 @@
<arg value="--java_out=generatedsrc/" />
</exec>
</target>
<!--import file="../../soashared/ant/build.xml" /-->
<import file="../../HmiBuild/build.xml" />
</project>
<ivy-module version="2.0">
<info organisation="ipaaca" module="ipaaca"/>
<info organisation="ipaaca" module="ipaaca-java"/>
<configurations>
<include file="${ivy.settings.dir}/configurations.xml"/>
</configurations>
......
Manifest-Version: 1.0
Main-Class: ipaaca.Info
Name: ipaaca
Specification-Title: ipaaca
Main-Class: main
Name: IpaacaJava
Specification-Title: IpaacaJava
Specification-Version: 0.1
Specification-Vendor: ipaaca
Implementation-Title: ipaaca
Implementation-Version: February 21 2012 05:23 PM
Implementation-Vendor: ipaaca
Implementation-Title: IpaacaJava
Implementation-Version: July 02 2012 01:46 PM
Implementation-Vendor: ipaaca
\ No newline at end of file
......@@ -7,82 +7,82 @@ import java.util.UUID;
/**
* Base class for InputBuffer and OutputBuffer.
*/
public abstract class Buffer {
private final String owningComponentName;
private List<IUEventHandler> eventHandlers = new ArrayList<IUEventHandler>();
protected final String uuid = UUID.randomUUID().toString().replaceAll("-","");
protected String uniqueName;
public String getUniqueName() {
return uniqueName;
}
public abstract class Buffer
{
private final String owningComponentName;
public String getOwningComponentName() {
return owningComponentName;
}
// def __init__(self, owning_component_name, participant_config=None):
// '''Create a Buffer.
//
// Keyword arguments:
// owning_compontent_name --
// participant_config -- RSB configuration
// '''
// super(Buffer, self).__init__()
// self._owning_component_name = owning_component_name
// self._participant_config = participant_config #rsb.ParticipantConfig.fromDefaultSources() if participant_config is None else participant_config
// self._uuid = str(uuid.uuid4())[0:8]
// # Initialise with a temporary, but already unique, name
// self._unique_name = "undef-"+self._uuid
// self._iu_store = IUStore()
// self._iu_event_handlers = []
/**
* @param owningComponentName name of the entity that owns this Buffer
* @param participantConfig RSB configuration
*/
public Buffer(String owningComponentName)
{
this.owningComponentName = owningComponentName;
uniqueName = "undef-"+uuid;
}
private List<IUEventHandler> eventHandlers = new ArrayList<IUEventHandler>();
protected final String uuid = UUID.randomUUID().toString().replaceAll("-", "");
protected String uniqueName;
// def register_handler(self, handler_function, for_event_types=None, for_categories=None):
// """Register a new IU event handler function.
//
// Keyword arguments:
// handler_function -- a function with the signature (IU, event_type, local)
// for_event_types -- a list of event types or None if handler should
// be called for all event types
// for_categories -- a list of category names or None if handler should
// be called for all categoires
//
// """
// handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories)
// self._iu_event_handlers.append(handler)
public void registerHandler(IUEventHandler handler)
{
eventHandlers.add(handler);
}
// def call_iu_event_handlers(self, uid, local, event_type, category):
// """Call registered IU event handler functions registered for this event_type and category."""
// for h in self._iu_event_handlers:
// # print('calling an update handler for '+event_type+' -> '+str(h))
// h.call(self, uid, local=local, event_type=event_type, category=category)
/**
* Call registered IU event handler functions registered for this event_type and category.
*/
public void callIuEventHandlers(String uid, boolean local, IUEventType type, String category)
{
for(IUEventHandler h:eventHandlers)
{
h.call(this, uid, local, type, category);
}
}
public abstract AbstractIU getIU(String iuid);
public String getUniqueName()
{
return uniqueName;
}
public String getOwningComponentName()
{
return owningComponentName;
}
// def __init__(self, owning_component_name, participant_config=None):
// '''Create a Buffer.
//
// Keyword arguments:
// owning_compontent_name --
// participant_config -- RSB configuration
// '''
// super(Buffer, self).__init__()
// self._owning_component_name = owning_component_name
// self._participant_config = participant_config #rsb.ParticipantConfig.fromDefaultSources() if participant_config is None else participant_config
// self._uuid = str(uuid.uuid4())[0:8]
// # Initialise with a temporary, but already unique, name
// self._unique_name = "undef-"+self._uuid
// self._iu_store = IUStore()
// self._iu_event_handlers = []
/**
* @param owningComponentName name of the entity that owns this Buffer
* @param participantConfig RSB configuration
*/
public Buffer(String owningComponentName)
{
this.owningComponentName = owningComponentName;
uniqueName = "undef-" + uuid;
}
// def register_handler(self, handler_function, for_event_types=None, for_categories=None):
// """Register a new IU event handler function.
//
// Keyword arguments:
// handler_function -- a function with the signature (IU, event_type, local)
// for_event_types -- a list of event types or None if handler should
// be called for all event types
// for_categories -- a list of category names or None if handler should
// be called for all categoires
//
// """
// handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories)
// self._iu_event_handlers.append(handler)
public void registerHandler(IUEventHandler handler)
{
eventHandlers.add(handler);
}
// def call_iu_event_handlers(self, uid, local, event_type, category):
// """Call registered IU event handler functions registered for this event_type and category."""
// for h in self._iu_event_handlers:
// # print('calling an update handler for '+event_type+' -> '+str(h))
// h.call(self, uid, local=local, event_type=event_type, category=category)
/**
* Call registered IU event handler functions registered for this event_type and category.
*/
public void callIuEventHandlers(String uid, boolean local, IUEventType type, String category)
{
for (IUEventHandler h : eventHandlers)
{
h.call(this, uid, local, type, category);
}
}
public abstract AbstractIU getIU(String iuid);
}
package ipaaca;
public interface HandlerFunctor {
void handle(AbstractIU iu, IUEventType type, boolean local);
public interface HandlerFunctor
{
void handle(AbstractIU iu, IUEventType type, boolean local);
}
package ipaaca;
public enum IUAccessMode {
PUSH,REMOTE,MESSAGE;
public enum IUAccessMode
{
PUSH, REMOTE, MESSAGE;
}
......@@ -3,19 +3,21 @@ package ipaaca;
/**
* Error indicating that an IU is immutable because it has been committed to.
* @author hvanwelbergen
*
*
*/
public class IUCommittedException extends RuntimeException{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU() {
return iu;
}
public IUCommittedException(AbstractIU iu)
{
super("Writing to IU " + iu.getUid() + " failed -- it has been committed to.");
this.iu = iu;
}
public class IUCommittedException extends RuntimeException
{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU()
{
return iu;
}
public IUCommittedException(AbstractIU iu)
{
super("Writing to IU " + iu.getUid() + " failed -- it has been committed to.");
this.iu = iu;
}
}
package ipaaca;
import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.LinkSet;
import ipaaca.protobuf.Ipaaca.PayloadItem;
......@@ -22,17 +23,17 @@ import rsb.converter.WireContents;
/**
* Serializes AbstractIUs into protocolbuffer IUs and vice versa.
* @author hvanwelbergen
*
*
*/
public class IUConverter implements Converter<ByteBuffer>
{
private final ConverterSignature signature;
public IUConverter(ConverterSignature signature)
{
this.signature = signature;
}
@Override
public ConverterSignature getSignature()
{
......@@ -42,36 +43,23 @@ public class IUConverter implements Converter<ByteBuffer>
@Override
public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException
{
AbstractIU iua = (AbstractIU)obj;
AbstractIU iua = (AbstractIU) obj;
List<PayloadItem> payloadItems = new ArrayList<PayloadItem>();
for(Entry<String, String> entry:iua.getPayload().entrySet())
for (Entry<String, String> entry : iua.getPayload().entrySet())
{
payloadItems.add(PayloadItem.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.setType("")
.build());
payloadItems.add(PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("").build());
}
List<LinkSet> links = new ArrayList<LinkSet>();
for (Entry<String, Collection<String>> entry:iua.getAllLinks().asMap().entrySet())
for (Entry<String, Collection<String>> entry : iua.getAllLinks().asMap().entrySet())
{
links.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
}
IU iu = IU.newBuilder()
.setUid(iua.getUid())
.setRevision(iua.getRevision())
.setCategory(iua.getCategory())
.setOwnerName(iua.getOwnerName())
.setCommitted(iua.isCommitted())
.setAccessMode(IU.AccessMode.PUSH) //TODO for other access modes (also in Python version)
.setReadOnly(iua.isReadOnly())
.setPayloadType("MAP")
.addAllPayload(payloadItems)
.addAllLinks(links)
.build();
return new WireContents<ByteBuffer>(ByteBuffer.wrap(iu.toByteArray()),"ipaaca-iu");
IU iu = IU.newBuilder().setUid(iua.getUid()).setRevision(iua.getRevision()).setCategory(iua.getCategory())
.setOwnerName(iua.getOwnerName()).setCommitted(iua.isCommitted()).setAccessMode(IU.AccessMode.PUSH) // TODO for other access modes (also in Python version)
.setReadOnly(iua.isReadOnly()).setPayloadType("MAP").addAllPayload(payloadItems).addAllLinks(links).build();
return new WireContents<ByteBuffer>(ByteBuffer.wrap(iu.toByteArray()), "ipaaca-iu");
}
@Override
......@@ -87,22 +75,22 @@ public class IUConverter implements Converter<ByteBuffer>
throw new RuntimeException(e);
}
if(iu.getAccessMode() == IU.AccessMode.PUSH)
if (iu.getAccessMode() == IU.AccessMode.PUSH)
{
RemotePushIU iuout = new RemotePushIU(iu.getUid());
iuout.setCategory(iu.getCategory());
iuout.committed = iu.getCommitted();
iuout.setOwnerName(iu.getOwnerName());
iuout.setRevision(iu.getRevision());
iuout.setReadOnly(iu.getReadOnly());
iuout.payload = new Payload(iuout,iu.getPayloadList());
iuout.setReadOnly(iu.getReadOnly());
iuout.payload = new Payload(iuout, iu.getPayloadList());
SetMultimap<String, String> links = HashMultimap.create();
for(LinkSet ls: iu.getLinksList())
for (LinkSet ls : iu.getLinksList())
{
links.putAll(ls.getType(),ls.getTargetsList());
links.putAll(ls.getType(), ls.getTargetsList());
}
iuout.setLinksLocally(links);
return new UserData<RemotePushIU>(iuout, RemotePushIU.class);
return new UserData<RemotePushIU>(iuout, RemotePushIU.class);
}
else
{
......
......@@ -7,76 +7,78 @@ import java.util.Set;
* Wrapper for IU event handling functions.
* @author hvanwelbergen
*/
public class IUEventHandler {
private final EnumSet<IUEventType> eventTypes;
private Set<String>categories;
private final HandlerFunctor handleFunctor;
// def __init__(self, handler_function, for_event_types=None, for_categories=None):
// """Create an IUEventHandler.
//
// Keyword arguments:
// handler_function -- the handler function with the signature
// (IU, event_type, local)
// for_event_types -- a list of event types or None if handler should
// be called for all event types
// for_categories -- a list of category names or None if handler should
// be called for all categoires
// """
// super(IUEventHandler, self).__init__()
// self._handler_function = handler_function
// self._for_event_types = (
// None if for_event_types is None else
// (for_event_types[:] if hasattr(for_event_types, '__iter__') else [for_event_types]))
// self._for_categories = (
// None if for_categories is None else
// (for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories]))
public IUEventHandler(HandlerFunctor func, EnumSet<IUEventType> eventTypes, Set<String>categories)
{
this.eventTypes = eventTypes;
this.categories = categories;
this.handleFunctor = func;
}
// def condition_met(self, event_type, category):
// """Check whether this IUEventHandler should be called.
//
// Keyword arguments:
// event_type -- type of the IU event
// category -- category of the IU which triggered the event
// """
// type_condition_met = (self._for_event_types is None or event_type in self._for_event_types)
// cat_condition_met = (self._for_categories is None or category in self._for_categories)
// return type_condition_met and cat_condition_met
/**
* Check whether this IUEventHandler should be called.
* @param type type of the IU event
* @param category category of the IU which triggered the event
*/
private boolean conditionMet(IUEventType type, String category)
{
return eventTypes.contains(type) && categories.contains(category);
}
// def call(self, buffer, iu_uid, local, event_type, category):
// """Call this IUEventHandler's function, if it applies.
//
// Keyword arguments:
// buffer -- the buffer in which the IU is stored
// iu_uid -- the uid of the IU
// local -- is the IU local or remote to this component? @RAMIN: Is this correct?
// event_type -- IU event type
// category -- category of the IU
// """
// if self.condition_met(event_type, category):
// iu = buffer._iu_store[iu_uid]
// self._handler_function(iu, event_type, local)
public void call(Buffer buf, String iuUid, boolean local, IUEventType type, String category)
{
if(conditionMet(type,category))
{
AbstractIU iu = buf.getIU(iuUid);
handleFunctor.handle(iu, type, local);
}
}
public class IUEventHandler
{
private final EnumSet<IUEventType> eventTypes;
private Set<String> categories;
private final HandlerFunctor handleFunctor;
// def __init__(self, handler_function, for_event_types=None, for_categories=None):
// """Create an IUEventHandler.
//
// Keyword arguments:
// handler_function -- the handler function with the signature
// (IU, event_type, local)
// for_event_types -- a list of event types or None if handler should
// be called for all event types
// for_categories -- a list of category names or None if handler should
// be called for all categoires
// """
// super(IUEventHandler, self).__init__()
// self._handler_function = handler_function
// self._for_event_types = (
// None if for_event_types is None else
// (for_event_types[:] if hasattr(for_event_types, '__iter__') else [for_event_types]))
// self._for_categories = (
// None if for_categories is None else
// (for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories]))
public IUEventHandler(HandlerFunctor func, EnumSet<IUEventType> eventTypes, Set<String> categories)
{
this.eventTypes = eventTypes;
this.categories = categories;
this.handleFunctor = func;
}
// def condition_met(self, event_type, category):
// """Check whether this IUEventHandler should be called.
//
// Keyword arguments:
// event_type -- type of the IU event
// category -- category of the IU which triggered the event
// """
// type_condition_met = (self._for_event_types is None or event_type in self._for_event_types)
// cat_condition_met = (self._for_categories is None or category in self._for_categories)
// return type_condition_met and cat_condition_met
/**
* Check whether this IUEventHandler should be called.
* @param type type of the IU event
* @param category category of the IU which triggered the event
*/
private boolean conditionMet(IUEventType type, String category)
{
return eventTypes.contains(type) && categories.contains(category);
}
// def call(self, buffer, iu_uid, local, event_type, category):
// """Call this IUEventHandler's function, if it applies.
//
// Keyword arguments:
// buffer -- the buffer in which the IU is stored
// iu_uid -- the uid of the IU
// local -- is the IU local or remote to this component? @RAMIN: Is this correct?
// event_type -- IU event type
// category -- category of the IU
// """
// if self.condition_met(event_type, category):
// iu = buffer._iu_store[iu_uid]
// self._handler_function(iu, event_type, local)
public void call(Buffer buf, String iuUid, boolean local, IUEventType type, String category)
{
if (conditionMet(type, category))
{
AbstractIU iu = buf.getIU(iuUid);
handleFunctor.handle(iu, type, local);
}
}
}
package ipaaca;
public enum IUEventType {
ADDED, COMMITTED, DELETED, RETRACTED, UPDATED,LINKSUPDATED;
public enum IUEventType
{
ADDED, COMMITTED, DELETED, RETRACTED, UPDATED, LINKSUPDATED;
}
package ipaaca;
public class IUPublishedException extends RuntimeException{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU() {
return iu;
}
public IUPublishedException(AbstractIU iu)
{
super("IU " + iu.getUid() + " is already present in the output buffer.");
this.iu = iu;
}
/**
* IUPublishedException exceptions occur when publishing (=putting it in an output buffer) an already published IU.
* @author hvanwelbergen
*
*/
public class IUPublishedException extends RuntimeException
{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU()
{
return iu;
}
public IUPublishedException(AbstractIU iu)
{
super("IU " + iu.getUid() + " is already present in the output buffer.");
this.iu = iu;
}
}
package ipaaca;
public class IUReadOnlyException extends RuntimeException{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU() {
return iu;
}
public IUReadOnlyException(AbstractIU iu)
{
super("Writing to IU " + iu.getUid() + " failed -- it is read-only.");
this.iu = iu;
}
/**
* IUReadOnlyException's occur when writing to a read-only IU
* @author hvanwelbergen
*
*/
public class IUReadOnlyException extends RuntimeException
{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU()
{
return iu;
}
public IUReadOnlyException(AbstractIU iu)
{
super("Writing to IU " + iu.getUid() + " failed -- it is read-only.");
this.iu = iu;
}
}
package ipaaca;
import java.util.HashMap;
import java.util.HashMap;
public class IUStore<X extends AbstractIU> extends HashMap<String,X>{
/**
* An IUStore maps an IUid to an IU
* @author hvanwelbergen
*
* @param <X> type of AbstractIU stored in the store
*/
public class IUStore<X extends AbstractIU> extends HashMap<String, X>
{
private static final long serialVersionUID = 1L;
}
package ipaaca;
public class IUUpdateFailedException extends RuntimeException{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU() {
return iu;
}
public IUUpdateFailedException(AbstractIU iu)
{
super("Remote update failed for IU " + iu.getUid() + ".");
this.iu = iu;
}
/**
* Indicates that a remote update failed
* @author hvanwelbergen
*
*/
public class IUUpdateFailedException extends RuntimeException
{
private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU()
{
return iu;
}
public IUUpdateFailedException(AbstractIU iu)
{
super("Remote update failed for IU " + iu.getUid() + ".");
this.iu = iu;
}
}
......@@ -5,35 +5,29 @@ import rsb.converter.ConverterSignature;
import rsb.converter.DefaultConverterRepository;
import rsb.converter.ProtocolBufferConverter;
public final class Initializer {
// def initialize_ipaaca_rsb():#{{{
// rsb.transport.converter.registerGlobalConverter(
// IntConverter(wireSchema="int32", dataType=int))
// rsb.transport.converter.registerGlobalConverter(
// IUConverter(wireSchema="ipaaca-iu", dataType=IU))
// rsb.transport.converter.registerGlobalConverter(
// IUPayloadUpdateConverter(
// wireSchema="ipaaca-iu-payload-update",
// dataType=IUPayloadUpdate))
// rsb.transport.converter.registerGlobalConverter(
// rsb.transport.converter.ProtocolBufferConverter(
// messageClass=iuProtoBuf_pb2.IUCommission))
// rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
// #}}}
public static void initializeIpaacaRsb()
{
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new IntConverter());
DefaultConverterRepository.getDefaultConverterRepository()
.addConverter(new ProtocolBufferConverter<IUCommission>(IUCommission.getDefaultInstance()));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new IUConverter(new ConverterSignature("ipaaca-iu", RemotePushIU.class)));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
/**
* Hooks up the ipaaca converters, call initializeIpaacaRsb() before using ipaaca.
* @author hvanwelbergen
*
*/
public final class Initializer
{
private Initializer()
{
}
public static void initializeIpaacaRsb()
{
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new IntConverter());
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new ProtocolBufferConverter<IUCommission>(IUCommission.getDefaultInstance()));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new IUConverter(new ConverterSignature("ipaaca-iu", RemotePushIU.class)));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new IUConverter(new ConverterSignature("ipaaca-localiu", LocalIU.class)));
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new PayloadConverter());
DefaultConverterRepository.getDefaultConverterRepository().addConverter(
new LinkUpdateConverter());
}
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new PayloadConverter());
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new LinkUpdateConverter());
}
}
......@@ -25,68 +25,69 @@ import rsb.patterns.RemoteServer;
* An InputBuffer that holds remote IUs.
* @author hvanwelbergen
*/
public class InputBuffer extends Buffer{
private Map<String,RemoteServer> remoteServerStore = new HashMap<String,RemoteServer>();
private Map<String,Listener> listenerStore = new HashMap<String,Listener>();
private Set<String> categoryInterests = new HashSet<String>();
private final static Logger logger = LoggerFactory.getLogger(InputBuffer.class.getName());
private IUStore<RemotePushIU> iuStore = new IUStore<RemotePushIU>();
public void close()
{
for(Listener listener: listenerStore.values())
{
listener.deactivate();
}
for(RemoteServer remServer: remoteServerStore.values())
{
remServer.deactivate();
}
}
// def __init__(self, owning_component_name, category_interests=None, participant_config=None):
// '''Create an InputBuffer.
//
// Keyword arguments:
// owning_compontent_name -- name of the entity that owns this InputBuffer
// category_interests -- list of IU categories this Buffer is interested in
// participant_config = RSB configuration
// '''
// super(InputBuffer, self).__init__(owning_component_name, participant_config)
// self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB'
// self._listener_store = {} # one per IU category
// self._remote_server_store = {} # one per remote-IU-owning Component
// self._category_interests = []
// if category_interests is not None:
// for cat in category_interests:
// self._create_category_listener_if_needed(cat)
public InputBuffer(String owningComponentName, Set<String>categoryInterests)
{
super(owningComponentName);
uniqueName = "/ipaaca/component/"+ owningComponentName +"ID"+uuid+"/IB";
for (String cat:categoryInterests)
{
createCategoryListenerIfNeeded(cat);
}
}
// def _get_remote_server(self, iu):
// '''Return (or create, store and return) a remote server.'''
// if iu.owner_name in self._remote_server_store:
// return self._remote_server_store[iu.owner_name]
// remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name)))
// self._remote_server_store[iu.owner_name] = remote_server
// return remote_server
public RemoteServer getRemoteServer(AbstractIU iu)
{
if(remoteServerStore.containsKey(iu.getOwnerName()))
{
return remoteServerStore.get(iu.getOwnerName());
}
logger.debug("Getting remote server for {}",iu.getOwnerName());
RemoteServer remoteServer = Factory.getInstance().createRemoteServer(new Scope(iu.getOwnerName()));
try
public class InputBuffer extends Buffer
{
private Map<String, RemoteServer> remoteServerStore = new HashMap<String, RemoteServer>();
private Map<String, Listener> listenerStore = new HashMap<String, Listener>();
private Set<String> categoryInterests = new HashSet<String>();
private final static Logger logger = LoggerFactory.getLogger(InputBuffer.class.getName());
private IUStore<RemotePushIU> iuStore = new IUStore<RemotePushIU>();
public void close()
{
for (Listener listener : listenerStore.values())
{
listener.deactivate();
}
for (RemoteServer remServer : remoteServerStore.values())
{
remServer.deactivate();
}
}
// def __init__(self, owning_component_name, category_interests=None, participant_config=None):
// '''Create an InputBuffer.
//
// Keyword arguments:
// owning_compontent_name -- name of the entity that owns this InputBuffer
// category_interests -- list of IU categories this Buffer is interested in
// participant_config = RSB configuration
// '''
// super(InputBuffer, self).__init__(owning_component_name, participant_config)
// self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB'
// self._listener_store = {} # one per IU category
// self._remote_server_store = {} # one per remote-IU-owning Component
// self._category_interests = []
// if category_interests is not None:
// for cat in category_interests:
// self._create_category_listener_if_needed(cat)
public InputBuffer(String owningComponentName, Set<String> categoryInterests)
{
super(owningComponentName);
uniqueName = "/ipaaca/component/" + owningComponentName + "ID" + uuid + "/IB";
for (String cat : categoryInterests)
{
createCategoryListenerIfNeeded(cat);
}
}
// def _get_remote_server(self, iu):
// '''Return (or create, store and return) a remote server.'''
// if iu.owner_name in self._remote_server_store:
// return self._remote_server_store[iu.owner_name]
// remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name)))
// self._remote_server_store[iu.owner_name] = remote_server
// return remote_server
public RemoteServer getRemoteServer(AbstractIU iu)
{
if (remoteServerStore.containsKey(iu.getOwnerName()))
{
return remoteServerStore.get(iu.getOwnerName());
}
logger.debug("Getting remote server for {}", iu.getOwnerName());
RemoteServer remoteServer = Factory.getInstance().createRemoteServer(new Scope(iu.getOwnerName()));
try
{
remoteServer.activate();
}
......@@ -94,183 +95,186 @@ public class InputBuffer extends Buffer{
{
throw new RuntimeException(e);
}
remoteServerStore.put(iu.getOwnerName(), remoteServer);
return remoteServer;
}
// def _create_category_listener_if_needed(self, iu_category):
// '''Return (or create, store and return) a category listener.'''
// if iu_category in self._listener_store: return self._informer_store[iu_category]
// cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
// cat_listener.addHandler(self._handle_iu_events)
// self._listener_store[iu_category] = cat_listener
// self._category_interests.append(iu_category)
// logger.info("Added category listener for "+iu_category)
// return cat_listener
private Listener createCategoryListenerIfNeeded(String category)
{
if(listenerStore.containsKey(category))
{
return listenerStore.get(category);
}
Listener listener = Factory.getInstance().createListener(new Scope("/ipaaca/category/"+category));
listenerStore.put(category,listener);
categoryInterests.add(category);
listener.addHandler(new InputHandler(), true);
logger.info("Added category listener for {}",category);
try
remoteServerStore.put(iu.getOwnerName(), remoteServer);
return remoteServer;
}
// def _create_category_listener_if_needed(self, iu_category):
// '''Return (or create, store and return) a category listener.'''
// if iu_category in self._listener_store: return self._informer_store[iu_category]
// cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
// cat_listener.addHandler(self._handle_iu_events)
// self._listener_store[iu_category] = cat_listener
// self._category_interests.append(iu_category)
// logger.info("Added category listener for "+iu_category)
// return cat_listener
private Listener createCategoryListenerIfNeeded(String category)
{
if (listenerStore.containsKey(category))
{
return listenerStore.get(category);
}
Listener listener = Factory.getInstance().createListener(new Scope("/ipaaca/category/" + category));
listenerStore.put(category, listener);
categoryInterests.add(category);
listener.addHandler(new InputHandler(), true);
logger.info("Added category listener for {}", category);
try
{
listener.activate();
}
catch (InitializeException e)
{
throw new RuntimeException(e);
}
return listener;
}
class InputHandler implements Handler
{
}
return listener;
}
@Override
public void internalNotify(Event ev) {
handleIUEvents(ev);
}
}
// def _handle_iu_events(self, event):
// '''Dispatch incoming IU events.
//
// Adds incoming IU's to the store, applies payload and commit updates to
// IU, calls IU event handlers.'
//
// Keyword arguments:
// event -- a converted RSB event
// '''
// if type(event.data) is RemotePushIU:
// # a new IU
// if event.data.uid in self._iu_store:
// # already in our store
// pass
// else:
// self._iu_store[ event.data.uid ] = event.data
// event.data.buffer = self
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category)
// else:
// # an update to an existing IU
// if event.data.writer_name == self.unique_name:
// # Discard updates that originate from this buffer
// return
// if event.data.uid not in self._iu_store:
// # TODO: we should request the IU's owner to send us the IU
// logger.warning("Update message for IU which we did not fully receive before.")
// return
// if type(event.data) is iuProtoBuf_pb2.IUCommission:
// # IU commit
// iu = self._iu_store[event.data.uid]
// iu._apply_commission()
// iu._revision = event.data.revision
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
// elif type(event.data) is IUPayloadUpdate:
// # IU payload update
// iu = self._iu_store[event.data.uid]
// iu._apply_update(event.data)
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
/**
* Dispatch incoming IU events.
*/
private void handleIUEvents(Event event)
{
if(event.getData() instanceof RemotePushIU)
{
RemotePushIU rp = (RemotePushIU)event.getData();
//a new IU
if(iuStore.containsKey(rp.getUid()))
{
// already in our store
return;
}
else
{
iuStore.put(rp.getUid(), rp);
rp.setBuffer(this);
this.callIuEventHandlers(rp.getUid(), false, IUEventType.ADDED, rp.getCategory());
}
}
else
{
if (event.getData() instanceof IULinkUpdate)
class InputHandler implements Handler
{
@Override
public void internalNotify(Event ev)
{
handleIUEvents(ev);
}
}
// def _handle_iu_events(self, event):
// '''Dispatch incoming IU events.
//
// Adds incoming IU's to the store, applies payload and commit updates to
// IU, calls IU event handlers.'
//
// Keyword arguments:
// event -- a converted RSB event
// '''
// if type(event.data) is RemotePushIU:
// # a new IU
// if event.data.uid in self._iu_store:
// # already in our store
// pass
// else:
// self._iu_store[ event.data.uid ] = event.data
// event.data.buffer = self
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category)
// else:
// # an update to an existing IU
// if event.data.writer_name == self.unique_name:
// # Discard updates that originate from this buffer
// return
// if event.data.uid not in self._iu_store:
// # TODO: we should request the IU's owner to send us the IU
// logger.warning("Update message for IU which we did not fully receive before.")
// return
// if type(event.data) is iuProtoBuf_pb2.IUCommission:
// # IU commit
// iu = self._iu_store[event.data.uid]
// iu._apply_commission()
// iu._revision = event.data.revision
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
// elif type(event.data) is IUPayloadUpdate:
// # IU payload update
// iu = self._iu_store[event.data.uid]
// iu._apply_update(event.data)
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
/**
* Dispatch incoming IU events.
*/
private void handleIUEvents(Event event)
{
if (event.getData() instanceof RemotePushIU)
{
RemotePushIU rp = (RemotePushIU) event.getData();
// a new IU
if (iuStore.containsKey(rp.getUid()))
{
// already in our store
return;
}
else
{
iuStore.put(rp.getUid(), rp);
rp.setBuffer(this);
this.callIuEventHandlers(rp.getUid(), false, IUEventType.ADDED, rp.getCategory());
}
}
else
{
if (event.getData() instanceof IULinkUpdate)
{
IULinkUpdate iuLinkUpdate = (IULinkUpdate)event.getData();
if(iuLinkUpdate.getWriterName().equals(this.getUniqueName()))
IULinkUpdate iuLinkUpdate = (IULinkUpdate) event.getData();
if (iuLinkUpdate.getWriterName().equals(this.getUniqueName()))
{
//Discard updates that originate from this buffer
// Discard updates that originate from this buffer
return;
}
if(!iuStore.containsKey(iuLinkUpdate.getUid()))
if (!iuStore.containsKey(iuLinkUpdate.getUid()))
{
logger.warn("Link update message for IU which we did not fully receive before.");
return;
}
RemotePushIU iu = this.iuStore.get(iuLinkUpdate.getUid());
RemotePushIU iu = this.iuStore.get(iuLinkUpdate.getUid());
iu.applyLinkUpdate(iuLinkUpdate);
callIuEventHandlers(iu.getUid(), false, IUEventType.LINKSUPDATED, iu.category);
}
if (event.getData() instanceof IUPayloadUpdate)
{
IUPayloadUpdate iuUpdate = (IUPayloadUpdate)event.getData();
logger.debug("handleIUEvents invoked with an IUPayloadUpdate: {}", iuUpdate);
if(iuUpdate.getWriterName().equals(this.getUniqueName()))
{
//Discard updates that originate from this buffer
return;
}
if(!iuStore.containsKey(iuUpdate.getUid()))
{
logger.warn("Update message for IU which we did not fully receive before.");
return;
}
RemotePushIU iu = this.iuStore.get(iuUpdate.getUid());
iu.applyUpdate(iuUpdate);
callIuEventHandlers(iu.getUid(), false, IUEventType.UPDATED, iu.category);
}
if (event.getData() instanceof IUCommission)
{
IUCommission iuc = (IUCommission)event.getData();
logger.debug("handleIUEvents invoked with an IUCommission: {}", iuc);
logger.debug("{}, {}",iuc.getWriterName(), this.getUniqueName());
if(iuc.getWriterName().equals(this.getUniqueName()))
{
//Discard updates that originate from this buffer
return;
}
if(!iuStore.containsKey(iuc.getUid()))
{
logger.warn("Update message for IU which we did not fully receive before.");
return;
}
RemotePushIU iu = this.iuStore.get(iuc.getUid());
iu.applyCommmision();
iu.setRevision(iuc.getRevision());
callIuEventHandlers(iuc.getUid(), false, IUEventType.COMMITTED, iu.getCategory());
}
}
}
public InputBuffer(String owningComponentName) {
super(owningComponentName);
}
if (event.getData() instanceof IUPayloadUpdate)
{
IUPayloadUpdate iuUpdate = (IUPayloadUpdate) event.getData();
logger.debug("handleIUEvents invoked with an IUPayloadUpdate: {}", iuUpdate);
if (iuUpdate.getWriterName().equals(this.getUniqueName()))
{
// Discard updates that originate from this buffer
return;
}
if (!iuStore.containsKey(iuUpdate.getUid()))
{
logger.warn("Update message for IU which we did not fully receive before.");
return;
}
RemotePushIU iu = this.iuStore.get(iuUpdate.getUid());
iu.applyUpdate(iuUpdate);
callIuEventHandlers(iu.getUid(), false, IUEventType.UPDATED, iu.category);
}
if (event.getData() instanceof IUCommission)
{
IUCommission iuc = (IUCommission) event.getData();
logger.debug("handleIUEvents invoked with an IUCommission: {}", iuc);
logger.debug("{}, {}", iuc.getWriterName(), this.getUniqueName());
if (iuc.getWriterName().equals(this.getUniqueName()))
{
// Discard updates that originate from this buffer
return;
}
if (!iuStore.containsKey(iuc.getUid()))
{
logger.warn("Update message for IU which we did not fully receive before.");
return;
}
RemotePushIU iu = this.iuStore.get(iuc.getUid());
iu.applyCommmision();
iu.setRevision(iuc.getRevision());
callIuEventHandlers(iuc.getUid(), false, IUEventType.COMMITTED, iu.getCategory());
}
}
}
public InputBuffer(String owningComponentName)
{
super(owningComponentName);
}
@Override
public AbstractIU getIU(String iuid)
{
return iuStore.get(iuid);
}
public Collection<RemotePushIU> getIUs()
{
return iuStore.values();
}
public Collection<RemotePushIU> getIUs()
{
return iuStore.values();
}
}
......@@ -11,25 +11,27 @@ import rsb.converter.WireContents;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Serializer/deserializer for ints
* @author hvanwelbergen
*
*/
public class IntConverter implements Converter<ByteBuffer>
{
@Override
public ConverterSignature getSignature()
{
return new ConverterSignature("int32",Integer.class);
return new ConverterSignature("int32", Integer.class);
}
@Override
public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException
{
Integer intVal = (Integer)obj;
IntMessage message = IntMessage.newBuilder()
.setValue(intVal)
.build();
return new WireContents<ByteBuffer>(ByteBuffer.wrap(message.toByteArray()),"int32");
Integer intVal = (Integer) obj;
IntMessage message = IntMessage.newBuilder().setValue(intVal).build();
return new WireContents<ByteBuffer>(ByteBuffer.wrap(message.toByteArray()), "int32");
}
@Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment