Skip to content
Snippets Groups Projects
Commit 4553dd38 authored by hvanwelbergen's avatar hvanwelbergen
Browse files

Fixes some checkstyle formatting issues

parent f0da184e
No related branches found
No related tags found
No related merge requests found
Showing
with 470 additions and 476 deletions
...@@ -7,82 +7,82 @@ import java.util.UUID; ...@@ -7,82 +7,82 @@ import java.util.UUID;
/** /**
* Base class for InputBuffer and OutputBuffer. * Base class for InputBuffer and OutputBuffer.
*/ */
public abstract class Buffer { public abstract class Buffer
private final String owningComponentName; {
private final String owningComponentName;
private List<IUEventHandler> eventHandlers = new ArrayList<IUEventHandler>();
protected final String uuid = UUID.randomUUID().toString().replaceAll("-","");
protected String uniqueName;
public String getUniqueName() {
return uniqueName;
}
public String getOwningComponentName() { private List<IUEventHandler> eventHandlers = new ArrayList<IUEventHandler>();
return owningComponentName; protected final String uuid = UUID.randomUUID().toString().replaceAll("-", "");
} protected String uniqueName;
// def __init__(self, owning_component_name, participant_config=None):
// '''Create a Buffer.
//
// Keyword arguments:
// owning_compontent_name --
// participant_config -- RSB configuration
// '''
// super(Buffer, self).__init__()
// self._owning_component_name = owning_component_name
// self._participant_config = participant_config #rsb.ParticipantConfig.fromDefaultSources() if participant_config is None else participant_config
// self._uuid = str(uuid.uuid4())[0:8]
// # Initialise with a temporary, but already unique, name
// self._unique_name = "undef-"+self._uuid
// self._iu_store = IUStore()
// self._iu_event_handlers = []
/**
* @param owningComponentName name of the entity that owns this Buffer
* @param participantConfig RSB configuration
*/
public Buffer(String owningComponentName)
{
this.owningComponentName = owningComponentName;
uniqueName = "undef-"+uuid;
}
public String getUniqueName()
{
// def register_handler(self, handler_function, for_event_types=None, for_categories=None): return uniqueName;
// """Register a new IU event handler function. }
//
// Keyword arguments: public String getOwningComponentName()
// handler_function -- a function with the signature (IU, event_type, local) {
// for_event_types -- a list of event types or None if handler should return owningComponentName;
// be called for all event types }
// for_categories -- a list of category names or None if handler should
// be called for all categoires // def __init__(self, owning_component_name, participant_config=None):
// // '''Create a Buffer.
// """ //
// handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories) // Keyword arguments:
// self._iu_event_handlers.append(handler) // owning_compontent_name --
public void registerHandler(IUEventHandler handler) // participant_config -- RSB configuration
{ // '''
eventHandlers.add(handler); // super(Buffer, self).__init__()
} // self._owning_component_name = owning_component_name
// self._participant_config = participant_config #rsb.ParticipantConfig.fromDefaultSources() if participant_config is None else participant_config
// def call_iu_event_handlers(self, uid, local, event_type, category): // self._uuid = str(uuid.uuid4())[0:8]
// """Call registered IU event handler functions registered for this event_type and category.""" // # Initialise with a temporary, but already unique, name
// for h in self._iu_event_handlers: // self._unique_name = "undef-"+self._uuid
// # print('calling an update handler for '+event_type+' -> '+str(h)) // self._iu_store = IUStore()
// h.call(self, uid, local=local, event_type=event_type, category=category) // self._iu_event_handlers = []
/** /**
* Call registered IU event handler functions registered for this event_type and category. * @param owningComponentName name of the entity that owns this Buffer
*/ * @param participantConfig RSB configuration
public void callIuEventHandlers(String uid, boolean local, IUEventType type, String category) */
{ public Buffer(String owningComponentName)
for(IUEventHandler h:eventHandlers) {
{ this.owningComponentName = owningComponentName;
h.call(this, uid, local, type, category); uniqueName = "undef-" + uuid;
} }
}
// def register_handler(self, handler_function, for_event_types=None, for_categories=None):
public abstract AbstractIU getIU(String iuid); // """Register a new IU event handler function.
//
// Keyword arguments:
// handler_function -- a function with the signature (IU, event_type, local)
// for_event_types -- a list of event types or None if handler should
// be called for all event types
// for_categories -- a list of category names or None if handler should
// be called for all categoires
//
// """
// handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories)
// self._iu_event_handlers.append(handler)
public void registerHandler(IUEventHandler handler)
{
eventHandlers.add(handler);
}
// def call_iu_event_handlers(self, uid, local, event_type, category):
// """Call registered IU event handler functions registered for this event_type and category."""
// for h in self._iu_event_handlers:
// # print('calling an update handler for '+event_type+' -> '+str(h))
// h.call(self, uid, local=local, event_type=event_type, category=category)
/**
* Call registered IU event handler functions registered for this event_type and category.
*/
public void callIuEventHandlers(String uid, boolean local, IUEventType type, String category)
{
for (IUEventHandler h : eventHandlers)
{
h.call(this, uid, local, type, category);
}
}
public abstract AbstractIU getIU(String iuid);
} }
package ipaaca; package ipaaca;
public interface HandlerFunctor { public interface HandlerFunctor
void handle(AbstractIU iu, IUEventType type, boolean local); {
void handle(AbstractIU iu, IUEventType type, boolean local);
} }
package ipaaca; package ipaaca;
public enum IUAccessMode { public enum IUAccessMode
PUSH,REMOTE,MESSAGE; {
PUSH, REMOTE, MESSAGE;
} }
...@@ -3,19 +3,21 @@ package ipaaca; ...@@ -3,19 +3,21 @@ package ipaaca;
/** /**
* Error indicating that an IU is immutable because it has been committed to. * Error indicating that an IU is immutable because it has been committed to.
* @author hvanwelbergen * @author hvanwelbergen
* *
*/ */
public class IUCommittedException extends RuntimeException{ public class IUCommittedException extends RuntimeException
private static final long serialVersionUID = 1L; {
private final AbstractIU iu; private static final long serialVersionUID = 1L;
private final AbstractIU iu;
public AbstractIU getIU() {
return iu; public AbstractIU getIU()
} {
return iu;
public IUCommittedException(AbstractIU iu) }
{
super("Writing to IU " + iu.getUid() + " failed -- it has been committed to."); public IUCommittedException(AbstractIU iu)
this.iu = iu; {
} super("Writing to IU " + iu.getUid() + " failed -- it has been committed to.");
this.iu = iu;
}
} }
package ipaaca; package ipaaca;
import ipaaca.protobuf.Ipaaca.IU; import ipaaca.protobuf.Ipaaca.IU;
import ipaaca.protobuf.Ipaaca.LinkSet; import ipaaca.protobuf.Ipaaca.LinkSet;
import ipaaca.protobuf.Ipaaca.PayloadItem; import ipaaca.protobuf.Ipaaca.PayloadItem;
...@@ -22,17 +23,17 @@ import rsb.converter.WireContents; ...@@ -22,17 +23,17 @@ import rsb.converter.WireContents;
/** /**
* Serializes AbstractIUs into protocolbuffer IUs and vice versa. * Serializes AbstractIUs into protocolbuffer IUs and vice versa.
* @author hvanwelbergen * @author hvanwelbergen
* *
*/ */
public class IUConverter implements Converter<ByteBuffer> public class IUConverter implements Converter<ByteBuffer>
{ {
private final ConverterSignature signature; private final ConverterSignature signature;
public IUConverter(ConverterSignature signature) public IUConverter(ConverterSignature signature)
{ {
this.signature = signature; this.signature = signature;
} }
@Override @Override
public ConverterSignature getSignature() public ConverterSignature getSignature()
{ {
...@@ -42,36 +43,23 @@ public class IUConverter implements Converter<ByteBuffer> ...@@ -42,36 +43,23 @@ public class IUConverter implements Converter<ByteBuffer>
@Override @Override
public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException
{ {
AbstractIU iua = (AbstractIU)obj; AbstractIU iua = (AbstractIU) obj;
List<PayloadItem> payloadItems = new ArrayList<PayloadItem>(); List<PayloadItem> payloadItems = new ArrayList<PayloadItem>();
for(Entry<String, String> entry:iua.getPayload().entrySet()) for (Entry<String, String> entry : iua.getPayload().entrySet())
{ {
payloadItems.add(PayloadItem.newBuilder() payloadItems.add(PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("").build());
.setKey(entry.getKey())
.setValue(entry.getValue())
.setType("")
.build());
} }
List<LinkSet> links = new ArrayList<LinkSet>(); List<LinkSet> links = new ArrayList<LinkSet>();
for (Entry<String, Collection<String>> entry:iua.getAllLinks().asMap().entrySet()) for (Entry<String, Collection<String>> entry : iua.getAllLinks().asMap().entrySet())
{ {
links.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build()); links.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
} }
IU iu = IU.newBuilder() IU iu = IU.newBuilder().setUid(iua.getUid()).setRevision(iua.getRevision()).setCategory(iua.getCategory())
.setUid(iua.getUid()) .setOwnerName(iua.getOwnerName()).setCommitted(iua.isCommitted()).setAccessMode(IU.AccessMode.PUSH) // TODO for other access modes (also in Python version)
.setRevision(iua.getRevision()) .setReadOnly(iua.isReadOnly()).setPayloadType("MAP").addAllPayload(payloadItems).addAllLinks(links).build();
.setCategory(iua.getCategory()) return new WireContents<ByteBuffer>(ByteBuffer.wrap(iu.toByteArray()), "ipaaca-iu");
.setOwnerName(iua.getOwnerName())
.setCommitted(iua.isCommitted())
.setAccessMode(IU.AccessMode.PUSH) //TODO for other access modes (also in Python version)
.setReadOnly(iua.isReadOnly())
.setPayloadType("MAP")
.addAllPayload(payloadItems)
.addAllLinks(links)
.build();
return new WireContents<ByteBuffer>(ByteBuffer.wrap(iu.toByteArray()),"ipaaca-iu");
} }
@Override @Override
...@@ -87,22 +75,22 @@ public class IUConverter implements Converter<ByteBuffer> ...@@ -87,22 +75,22 @@ public class IUConverter implements Converter<ByteBuffer>
throw new RuntimeException(e); throw new RuntimeException(e);
} }
if(iu.getAccessMode() == IU.AccessMode.PUSH) if (iu.getAccessMode() == IU.AccessMode.PUSH)
{ {
RemotePushIU iuout = new RemotePushIU(iu.getUid()); RemotePushIU iuout = new RemotePushIU(iu.getUid());
iuout.setCategory(iu.getCategory()); iuout.setCategory(iu.getCategory());
iuout.committed = iu.getCommitted(); iuout.committed = iu.getCommitted();
iuout.setOwnerName(iu.getOwnerName()); iuout.setOwnerName(iu.getOwnerName());
iuout.setRevision(iu.getRevision()); iuout.setRevision(iu.getRevision());
iuout.setReadOnly(iu.getReadOnly()); iuout.setReadOnly(iu.getReadOnly());
iuout.payload = new Payload(iuout,iu.getPayloadList()); iuout.payload = new Payload(iuout, iu.getPayloadList());
SetMultimap<String, String> links = HashMultimap.create(); SetMultimap<String, String> links = HashMultimap.create();
for(LinkSet ls: iu.getLinksList()) for (LinkSet ls : iu.getLinksList())
{ {
links.putAll(ls.getType(),ls.getTargetsList()); links.putAll(ls.getType(), ls.getTargetsList());
} }
iuout.setLinksLocally(links); iuout.setLinksLocally(links);
return new UserData<RemotePushIU>(iuout, RemotePushIU.class); return new UserData<RemotePushIU>(iuout, RemotePushIU.class);
} }
else else
{ {
......
...@@ -7,76 +7,78 @@ import java.util.Set; ...@@ -7,76 +7,78 @@ import java.util.Set;
* Wrapper for IU event handling functions. * Wrapper for IU event handling functions.
* @author hvanwelbergen * @author hvanwelbergen
*/ */
public class IUEventHandler { public class IUEventHandler
private final EnumSet<IUEventType> eventTypes; {
private Set<String>categories; private final EnumSet<IUEventType> eventTypes;
private final HandlerFunctor handleFunctor; private Set<String> categories;
// def __init__(self, handler_function, for_event_types=None, for_categories=None): private final HandlerFunctor handleFunctor;
// """Create an IUEventHandler.
// // def __init__(self, handler_function, for_event_types=None, for_categories=None):
// Keyword arguments: // """Create an IUEventHandler.
// handler_function -- the handler function with the signature //
// (IU, event_type, local) // Keyword arguments:
// for_event_types -- a list of event types or None if handler should // handler_function -- the handler function with the signature
// be called for all event types // (IU, event_type, local)
// for_categories -- a list of category names or None if handler should // for_event_types -- a list of event types or None if handler should
// be called for all categoires // be called for all event types
// """ // for_categories -- a list of category names or None if handler should
// super(IUEventHandler, self).__init__() // be called for all categoires
// self._handler_function = handler_function // """
// self._for_event_types = ( // super(IUEventHandler, self).__init__()
// None if for_event_types is None else // self._handler_function = handler_function
// (for_event_types[:] if hasattr(for_event_types, '__iter__') else [for_event_types])) // self._for_event_types = (
// self._for_categories = ( // None if for_event_types is None else
// None if for_categories is None else // (for_event_types[:] if hasattr(for_event_types, '__iter__') else [for_event_types]))
// (for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories])) // self._for_categories = (
// None if for_categories is None else
public IUEventHandler(HandlerFunctor func, EnumSet<IUEventType> eventTypes, Set<String>categories) // (for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories]))
{
this.eventTypes = eventTypes; public IUEventHandler(HandlerFunctor func, EnumSet<IUEventType> eventTypes, Set<String> categories)
this.categories = categories; {
this.handleFunctor = func; this.eventTypes = eventTypes;
} this.categories = categories;
this.handleFunctor = func;
// def condition_met(self, event_type, category): }
// """Check whether this IUEventHandler should be called.
// // def condition_met(self, event_type, category):
// Keyword arguments: // """Check whether this IUEventHandler should be called.
// event_type -- type of the IU event //
// category -- category of the IU which triggered the event // Keyword arguments:
// """ // event_type -- type of the IU event
// type_condition_met = (self._for_event_types is None or event_type in self._for_event_types) // category -- category of the IU which triggered the event
// cat_condition_met = (self._for_categories is None or category in self._for_categories) // """
// return type_condition_met and cat_condition_met // type_condition_met = (self._for_event_types is None or event_type in self._for_event_types)
/** // cat_condition_met = (self._for_categories is None or category in self._for_categories)
* Check whether this IUEventHandler should be called. // return type_condition_met and cat_condition_met
* @param type type of the IU event /**
* @param category category of the IU which triggered the event * Check whether this IUEventHandler should be called.
*/ * @param type type of the IU event
private boolean conditionMet(IUEventType type, String category) * @param category category of the IU which triggered the event
{ */
return eventTypes.contains(type) && categories.contains(category); private boolean conditionMet(IUEventType type, String category)
} {
return eventTypes.contains(type) && categories.contains(category);
// def call(self, buffer, iu_uid, local, event_type, category): }
// """Call this IUEventHandler's function, if it applies.
// // def call(self, buffer, iu_uid, local, event_type, category):
// Keyword arguments: // """Call this IUEventHandler's function, if it applies.
// buffer -- the buffer in which the IU is stored //
// iu_uid -- the uid of the IU // Keyword arguments:
// local -- is the IU local or remote to this component? @RAMIN: Is this correct? // buffer -- the buffer in which the IU is stored
// event_type -- IU event type // iu_uid -- the uid of the IU
// category -- category of the IU // local -- is the IU local or remote to this component? @RAMIN: Is this correct?
// """ // event_type -- IU event type
// if self.condition_met(event_type, category): // category -- category of the IU
// iu = buffer._iu_store[iu_uid] // """
// self._handler_function(iu, event_type, local) // if self.condition_met(event_type, category):
public void call(Buffer buf, String iuUid, boolean local, IUEventType type, String category) // iu = buffer._iu_store[iu_uid]
{ // self._handler_function(iu, event_type, local)
if(conditionMet(type,category)) public void call(Buffer buf, String iuUid, boolean local, IUEventType type, String category)
{ {
AbstractIU iu = buf.getIU(iuUid); if (conditionMet(type, category))
handleFunctor.handle(iu, type, local); {
} AbstractIU iu = buf.getIU(iuUid);
} handleFunctor.handle(iu, type, local);
}
}
} }
package ipaaca; package ipaaca;
public enum IUEventType { public enum IUEventType
ADDED, COMMITTED, DELETED, RETRACTED, UPDATED,LINKSUPDATED; {
ADDED, COMMITTED, DELETED, RETRACTED, UPDATED, LINKSUPDATED;
} }
...@@ -3,7 +3,7 @@ package ipaaca; ...@@ -3,7 +3,7 @@ package ipaaca;
/** /**
* IUPublishedException exceptions occur when publishing (=putting it in an output buffer) an already published IU. * IUPublishedException exceptions occur when publishing (=putting it in an output buffer) an already published IU.
* @author hvanwelbergen * @author hvanwelbergen
* *
*/ */
public class IUPublishedException extends RuntimeException public class IUPublishedException extends RuntimeException
{ {
......
...@@ -3,7 +3,7 @@ package ipaaca; ...@@ -3,7 +3,7 @@ package ipaaca;
/** /**
* IUReadOnlyException's occur when writing to a read-only IU * IUReadOnlyException's occur when writing to a read-only IU
* @author hvanwelbergen * @author hvanwelbergen
* *
*/ */
public class IUReadOnlyException extends RuntimeException public class IUReadOnlyException extends RuntimeException
{ {
......
...@@ -3,7 +3,7 @@ package ipaaca; ...@@ -3,7 +3,7 @@ package ipaaca;
/** /**
* Indicates that a remote update failed * Indicates that a remote update failed
* @author hvanwelbergen * @author hvanwelbergen
* *
*/ */
public class IUUpdateFailedException extends RuntimeException public class IUUpdateFailedException extends RuntimeException
{ {
......
...@@ -8,11 +8,14 @@ import rsb.converter.ProtocolBufferConverter; ...@@ -8,11 +8,14 @@ import rsb.converter.ProtocolBufferConverter;
/** /**
* Hooks up the ipaaca converters, call initializeIpaacaRsb() before using ipaaca. * Hooks up the ipaaca converters, call initializeIpaacaRsb() before using ipaaca.
* @author hvanwelbergen * @author hvanwelbergen
* *
*/ */
public final class Initializer public final class Initializer
{ {
private Initializer(){} private Initializer()
{
}
public static void initializeIpaacaRsb() public static void initializeIpaacaRsb()
{ {
DefaultConverterRepository.getDefaultConverterRepository().addConverter(new IntConverter()); DefaultConverterRepository.getDefaultConverterRepository().addConverter(new IntConverter());
......
...@@ -25,68 +25,69 @@ import rsb.patterns.RemoteServer; ...@@ -25,68 +25,69 @@ import rsb.patterns.RemoteServer;
* An InputBuffer that holds remote IUs. * An InputBuffer that holds remote IUs.
* @author hvanwelbergen * @author hvanwelbergen
*/ */
public class InputBuffer extends Buffer{ public class InputBuffer extends Buffer
private Map<String,RemoteServer> remoteServerStore = new HashMap<String,RemoteServer>(); {
private Map<String,Listener> listenerStore = new HashMap<String,Listener>(); private Map<String, RemoteServer> remoteServerStore = new HashMap<String, RemoteServer>();
private Set<String> categoryInterests = new HashSet<String>(); private Map<String, Listener> listenerStore = new HashMap<String, Listener>();
private final static Logger logger = LoggerFactory.getLogger(InputBuffer.class.getName()); private Set<String> categoryInterests = new HashSet<String>();
private IUStore<RemotePushIU> iuStore = new IUStore<RemotePushIU>(); private final static Logger logger = LoggerFactory.getLogger(InputBuffer.class.getName());
private IUStore<RemotePushIU> iuStore = new IUStore<RemotePushIU>();
public void close()
{ public void close()
for(Listener listener: listenerStore.values()) {
{ for (Listener listener : listenerStore.values())
listener.deactivate(); {
} listener.deactivate();
for(RemoteServer remServer: remoteServerStore.values()) }
{ for (RemoteServer remServer : remoteServerStore.values())
remServer.deactivate(); {
} remServer.deactivate();
} }
}
// def __init__(self, owning_component_name, category_interests=None, participant_config=None):
// '''Create an InputBuffer. // def __init__(self, owning_component_name, category_interests=None, participant_config=None):
// // '''Create an InputBuffer.
// Keyword arguments: //
// owning_compontent_name -- name of the entity that owns this InputBuffer // Keyword arguments:
// category_interests -- list of IU categories this Buffer is interested in // owning_compontent_name -- name of the entity that owns this InputBuffer
// participant_config = RSB configuration // category_interests -- list of IU categories this Buffer is interested in
// ''' // participant_config = RSB configuration
// super(InputBuffer, self).__init__(owning_component_name, participant_config) // '''
// self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB' // super(InputBuffer, self).__init__(owning_component_name, participant_config)
// self._listener_store = {} # one per IU category // self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB'
// self._remote_server_store = {} # one per remote-IU-owning Component // self._listener_store = {} # one per IU category
// self._category_interests = [] // self._remote_server_store = {} # one per remote-IU-owning Component
// if category_interests is not None: // self._category_interests = []
// for cat in category_interests: // if category_interests is not None:
// self._create_category_listener_if_needed(cat) // for cat in category_interests:
public InputBuffer(String owningComponentName, Set<String>categoryInterests) // self._create_category_listener_if_needed(cat)
{ public InputBuffer(String owningComponentName, Set<String> categoryInterests)
super(owningComponentName); {
uniqueName = "/ipaaca/component/"+ owningComponentName +"ID"+uuid+"/IB"; super(owningComponentName);
uniqueName = "/ipaaca/component/" + owningComponentName + "ID" + uuid + "/IB";
for (String cat:categoryInterests)
{ for (String cat : categoryInterests)
createCategoryListenerIfNeeded(cat); {
} createCategoryListenerIfNeeded(cat);
} }
}
// def _get_remote_server(self, iu):
// '''Return (or create, store and return) a remote server.''' // def _get_remote_server(self, iu):
// if iu.owner_name in self._remote_server_store: // '''Return (or create, store and return) a remote server.'''
// return self._remote_server_store[iu.owner_name] // if iu.owner_name in self._remote_server_store:
// remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name))) // return self._remote_server_store[iu.owner_name]
// self._remote_server_store[iu.owner_name] = remote_server // remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name)))
// return remote_server // self._remote_server_store[iu.owner_name] = remote_server
public RemoteServer getRemoteServer(AbstractIU iu) // return remote_server
{ public RemoteServer getRemoteServer(AbstractIU iu)
if(remoteServerStore.containsKey(iu.getOwnerName())) {
{ if (remoteServerStore.containsKey(iu.getOwnerName()))
return remoteServerStore.get(iu.getOwnerName()); {
} return remoteServerStore.get(iu.getOwnerName());
logger.debug("Getting remote server for {}",iu.getOwnerName()); }
RemoteServer remoteServer = Factory.getInstance().createRemoteServer(new Scope(iu.getOwnerName())); logger.debug("Getting remote server for {}", iu.getOwnerName());
try RemoteServer remoteServer = Factory.getInstance().createRemoteServer(new Scope(iu.getOwnerName()));
try
{ {
remoteServer.activate(); remoteServer.activate();
} }
...@@ -94,183 +95,186 @@ public class InputBuffer extends Buffer{ ...@@ -94,183 +95,186 @@ public class InputBuffer extends Buffer{
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
remoteServerStore.put(iu.getOwnerName(), remoteServer); remoteServerStore.put(iu.getOwnerName(), remoteServer);
return remoteServer; return remoteServer;
} }
// def _create_category_listener_if_needed(self, iu_category): // def _create_category_listener_if_needed(self, iu_category):
// '''Return (or create, store and return) a category listener.''' // '''Return (or create, store and return) a category listener.'''
// if iu_category in self._listener_store: return self._informer_store[iu_category] // if iu_category in self._listener_store: return self._informer_store[iu_category]
// cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config) // cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
// cat_listener.addHandler(self._handle_iu_events) // cat_listener.addHandler(self._handle_iu_events)
// self._listener_store[iu_category] = cat_listener // self._listener_store[iu_category] = cat_listener
// self._category_interests.append(iu_category) // self._category_interests.append(iu_category)
// logger.info("Added category listener for "+iu_category) // logger.info("Added category listener for "+iu_category)
// return cat_listener // return cat_listener
private Listener createCategoryListenerIfNeeded(String category) private Listener createCategoryListenerIfNeeded(String category)
{ {
if(listenerStore.containsKey(category)) if (listenerStore.containsKey(category))
{ {
return listenerStore.get(category); return listenerStore.get(category);
} }
Listener listener = Factory.getInstance().createListener(new Scope("/ipaaca/category/"+category)); Listener listener = Factory.getInstance().createListener(new Scope("/ipaaca/category/" + category));
listenerStore.put(category,listener); listenerStore.put(category, listener);
categoryInterests.add(category); categoryInterests.add(category);
listener.addHandler(new InputHandler(), true); listener.addHandler(new InputHandler(), true);
logger.info("Added category listener for {}",category); logger.info("Added category listener for {}", category);
try try
{ {
listener.activate(); listener.activate();
} }
catch (InitializeException e) catch (InitializeException e)
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return listener; return listener;
} }
class InputHandler implements Handler
{
@Override class InputHandler implements Handler
public void internalNotify(Event ev) { {
handleIUEvents(ev);
} @Override
public void internalNotify(Event ev)
} {
// def _handle_iu_events(self, event): handleIUEvents(ev);
// '''Dispatch incoming IU events. }
//
// Adds incoming IU's to the store, applies payload and commit updates to }
// IU, calls IU event handlers.'
// // def _handle_iu_events(self, event):
// Keyword arguments: // '''Dispatch incoming IU events.
// event -- a converted RSB event //
// ''' // Adds incoming IU's to the store, applies payload and commit updates to
// if type(event.data) is RemotePushIU: // IU, calls IU event handlers.'
// # a new IU //
// if event.data.uid in self._iu_store: // Keyword arguments:
// # already in our store // event -- a converted RSB event
// pass // '''
// else: // if type(event.data) is RemotePushIU:
// self._iu_store[ event.data.uid ] = event.data // # a new IU
// event.data.buffer = self // if event.data.uid in self._iu_store:
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category) // # already in our store
// else: // pass
// # an update to an existing IU // else:
// if event.data.writer_name == self.unique_name: // self._iu_store[ event.data.uid ] = event.data
// # Discard updates that originate from this buffer // event.data.buffer = self
// return // self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category)
// if event.data.uid not in self._iu_store: // else:
// # TODO: we should request the IU's owner to send us the IU // # an update to an existing IU
// logger.warning("Update message for IU which we did not fully receive before.") // if event.data.writer_name == self.unique_name:
// return // # Discard updates that originate from this buffer
// if type(event.data) is iuProtoBuf_pb2.IUCommission: // return
// # IU commit // if event.data.uid not in self._iu_store:
// iu = self._iu_store[event.data.uid] // # TODO: we should request the IU's owner to send us the IU
// iu._apply_commission() // logger.warning("Update message for IU which we did not fully receive before.")
// iu._revision = event.data.revision // return
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category) // if type(event.data) is iuProtoBuf_pb2.IUCommission:
// elif type(event.data) is IUPayloadUpdate: // # IU commit
// # IU payload update // iu = self._iu_store[event.data.uid]
// iu = self._iu_store[event.data.uid] // iu._apply_commission()
// iu._apply_update(event.data) // iu._revision = event.data.revision
// self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category) // self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
/** // elif type(event.data) is IUPayloadUpdate:
* Dispatch incoming IU events. // # IU payload update
*/ // iu = self._iu_store[event.data.uid]
private void handleIUEvents(Event event) // iu._apply_update(event.data)
{ // self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
if(event.getData() instanceof RemotePushIU) /**
{ * Dispatch incoming IU events.
RemotePushIU rp = (RemotePushIU)event.getData(); */
//a new IU private void handleIUEvents(Event event)
if(iuStore.containsKey(rp.getUid())) {
{ if (event.getData() instanceof RemotePushIU)
// already in our store {
return; RemotePushIU rp = (RemotePushIU) event.getData();
} // a new IU
else if (iuStore.containsKey(rp.getUid()))
{ {
iuStore.put(rp.getUid(), rp); // already in our store
rp.setBuffer(this); return;
this.callIuEventHandlers(rp.getUid(), false, IUEventType.ADDED, rp.getCategory()); }
} else
} {
else iuStore.put(rp.getUid(), rp);
{ rp.setBuffer(this);
if (event.getData() instanceof IULinkUpdate) this.callIuEventHandlers(rp.getUid(), false, IUEventType.ADDED, rp.getCategory());
}
}
else
{
if (event.getData() instanceof IULinkUpdate)
{ {
IULinkUpdate iuLinkUpdate = (IULinkUpdate)event.getData(); IULinkUpdate iuLinkUpdate = (IULinkUpdate) event.getData();
if(iuLinkUpdate.getWriterName().equals(this.getUniqueName())) if (iuLinkUpdate.getWriterName().equals(this.getUniqueName()))
{ {
//Discard updates that originate from this buffer // Discard updates that originate from this buffer
return; return;
} }
if(!iuStore.containsKey(iuLinkUpdate.getUid())) if (!iuStore.containsKey(iuLinkUpdate.getUid()))
{ {
logger.warn("Link update message for IU which we did not fully receive before."); logger.warn("Link update message for IU which we did not fully receive before.");
return; return;
} }
RemotePushIU iu = this.iuStore.get(iuLinkUpdate.getUid()); RemotePushIU iu = this.iuStore.get(iuLinkUpdate.getUid());
iu.applyLinkUpdate(iuLinkUpdate); iu.applyLinkUpdate(iuLinkUpdate);
callIuEventHandlers(iu.getUid(), false, IUEventType.LINKSUPDATED, iu.category); callIuEventHandlers(iu.getUid(), false, IUEventType.LINKSUPDATED, iu.category);
} }
if (event.getData() instanceof IUPayloadUpdate) if (event.getData() instanceof IUPayloadUpdate)
{ {
IUPayloadUpdate iuUpdate = (IUPayloadUpdate)event.getData(); IUPayloadUpdate iuUpdate = (IUPayloadUpdate) event.getData();
logger.debug("handleIUEvents invoked with an IUPayloadUpdate: {}", iuUpdate); logger.debug("handleIUEvents invoked with an IUPayloadUpdate: {}", iuUpdate);
if(iuUpdate.getWriterName().equals(this.getUniqueName())) if (iuUpdate.getWriterName().equals(this.getUniqueName()))
{ {
//Discard updates that originate from this buffer // Discard updates that originate from this buffer
return; return;
} }
if(!iuStore.containsKey(iuUpdate.getUid())) if (!iuStore.containsKey(iuUpdate.getUid()))
{ {
logger.warn("Update message for IU which we did not fully receive before."); logger.warn("Update message for IU which we did not fully receive before.");
return; return;
} }
RemotePushIU iu = this.iuStore.get(iuUpdate.getUid()); RemotePushIU iu = this.iuStore.get(iuUpdate.getUid());
iu.applyUpdate(iuUpdate); iu.applyUpdate(iuUpdate);
callIuEventHandlers(iu.getUid(), false, IUEventType.UPDATED, iu.category); callIuEventHandlers(iu.getUid(), false, IUEventType.UPDATED, iu.category);
} }
if (event.getData() instanceof IUCommission) if (event.getData() instanceof IUCommission)
{ {
IUCommission iuc = (IUCommission)event.getData(); IUCommission iuc = (IUCommission) event.getData();
logger.debug("handleIUEvents invoked with an IUCommission: {}", iuc); logger.debug("handleIUEvents invoked with an IUCommission: {}", iuc);
logger.debug("{}, {}",iuc.getWriterName(), this.getUniqueName()); logger.debug("{}, {}", iuc.getWriterName(), this.getUniqueName());
if(iuc.getWriterName().equals(this.getUniqueName())) if (iuc.getWriterName().equals(this.getUniqueName()))
{ {
//Discard updates that originate from this buffer // Discard updates that originate from this buffer
return; return;
} }
if(!iuStore.containsKey(iuc.getUid())) if (!iuStore.containsKey(iuc.getUid()))
{ {
logger.warn("Update message for IU which we did not fully receive before."); logger.warn("Update message for IU which we did not fully receive before.");
return; return;
} }
RemotePushIU iu = this.iuStore.get(iuc.getUid()); RemotePushIU iu = this.iuStore.get(iuc.getUid());
iu.applyCommmision(); iu.applyCommmision();
iu.setRevision(iuc.getRevision()); iu.setRevision(iuc.getRevision());
callIuEventHandlers(iuc.getUid(), false, IUEventType.COMMITTED, iu.getCategory()); callIuEventHandlers(iuc.getUid(), false, IUEventType.COMMITTED, iu.getCategory());
} }
} }
} }
public InputBuffer(String owningComponentName) { public InputBuffer(String owningComponentName)
super(owningComponentName); {
} super(owningComponentName);
}
@Override @Override
public AbstractIU getIU(String iuid) public AbstractIU getIU(String iuid)
{ {
return iuStore.get(iuid); return iuStore.get(iuid);
} }
public Collection<RemotePushIU> getIUs() public Collection<RemotePushIU> getIUs()
{ {
return iuStore.values(); return iuStore.values();
} }
} }
...@@ -14,27 +14,24 @@ import com.google.protobuf.InvalidProtocolBufferException; ...@@ -14,27 +14,24 @@ import com.google.protobuf.InvalidProtocolBufferException;
/** /**
* Serializer/deserializer for ints * Serializer/deserializer for ints
* @author hvanwelbergen * @author hvanwelbergen
* *
*/ */
public class IntConverter implements Converter<ByteBuffer> public class IntConverter implements Converter<ByteBuffer>
{ {
@Override @Override
public ConverterSignature getSignature() public ConverterSignature getSignature()
{ {
return new ConverterSignature("int32",Integer.class); return new ConverterSignature("int32", Integer.class);
} }
@Override @Override
public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException
{ {
Integer intVal = (Integer)obj; Integer intVal = (Integer) obj;
IntMessage message = IntMessage.newBuilder() IntMessage message = IntMessage.newBuilder().setValue(intVal).build();
.setValue(intVal)
.build(); return new WireContents<ByteBuffer>(ByteBuffer.wrap(message.toByteArray()), "int32");
return new WireContents<ByteBuffer>(ByteBuffer.wrap(message.toByteArray()),"int32");
} }
@Override @Override
......
...@@ -15,7 +15,7 @@ import com.google.protobuf.InvalidProtocolBufferException; ...@@ -15,7 +15,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
/** /**
* Serializer/deserializer for IULinkUpdate * Serializer/deserializer for IULinkUpdate
* @author hvanwelbergen * @author hvanwelbergen
* *
*/ */
public class LinkUpdateConverter implements Converter<ByteBuffer> public class LinkUpdateConverter implements Converter<ByteBuffer>
{ {
...@@ -33,20 +33,20 @@ public class LinkUpdateConverter implements Converter<ByteBuffer> ...@@ -33,20 +33,20 @@ public class LinkUpdateConverter implements Converter<ByteBuffer>
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return new UserData<IULinkUpdate>(pl, IULinkUpdate.class); return new UserData<IULinkUpdate>(pl, IULinkUpdate.class);
} }
@Override @Override
public ConverterSignature getSignature() public ConverterSignature getSignature()
{ {
return new ConverterSignature(LINKUPDATE_WIRESCHEMA,IULinkUpdate.class); return new ConverterSignature(LINKUPDATE_WIRESCHEMA, IULinkUpdate.class);
} }
@Override @Override
public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException
{ {
IULinkUpdate pl = (IULinkUpdate)obj; IULinkUpdate pl = (IULinkUpdate) obj;
return new WireContents<ByteBuffer>(ByteBuffer.wrap(pl.toByteArray()),LINKUPDATE_WIRESCHEMA); return new WireContents<ByteBuffer>(ByteBuffer.wrap(pl.toByteArray()), LINKUPDATE_WIRESCHEMA);
} }
} }
...@@ -70,9 +70,9 @@ public class LocalIU extends AbstractIU ...@@ -70,9 +70,9 @@ public class LocalIU extends AbstractIU
{ {
increaseRevisionNumber(); increaseRevisionNumber();
committed = true; committed = true;
if(outputBuffer!=null) if (outputBuffer != null)
{ {
outputBuffer.sendIUCommission(this, writerName); outputBuffer.sendIUCommission(this, writerName);
} }
} }
} }
...@@ -112,7 +112,7 @@ public class LocalIU extends AbstractIU ...@@ -112,7 +112,7 @@ public class LocalIU extends AbstractIU
synchronized (revisionLock) synchronized (revisionLock)
{ {
increaseRevisionNumber(); increaseRevisionNumber();
if(isPublished()) if (isPublished())
{ {
String wName = null; String wName = null;
if (getBuffer() != null) if (getBuffer() != null)
...@@ -127,24 +127,19 @@ public class LocalIU extends AbstractIU ...@@ -127,24 +127,19 @@ public class LocalIU extends AbstractIU
{ {
wName = null; wName = null;
} }
Set<LinkSet> addSet = new HashSet<LinkSet>(); Set<LinkSet> addSet = new HashSet<LinkSet>();
for(Entry<String, Collection<String>> entry :linksToAdd.asMap().entrySet()) for (Entry<String, Collection<String>> entry : linksToAdd.asMap().entrySet())
{ {
addSet.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build()); addSet.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
} }
Set<LinkSet> removeSet = new HashSet<LinkSet>(); Set<LinkSet> removeSet = new HashSet<LinkSet>();
for(Entry<String, Collection<String>> entry :linksToRemove.asMap().entrySet()) for (Entry<String, Collection<String>> entry : linksToRemove.asMap().entrySet())
{ {
removeSet.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build()); removeSet.add(LinkSet.newBuilder().setType(entry.getKey()).addAllTargets(entry.getValue()).build());
} }
outputBuffer.sendIULinkUpdate(this,IULinkUpdate.newBuilder() outputBuffer.sendIULinkUpdate(this,
.setUid(getUid()) IULinkUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setWriterName(wName).setIsDelta(isDelta)
.setRevision(getRevision()) .addAllNewLinks(addSet).addAllLinksToRemove(removeSet).build());
.setWriterName(wName)
.setIsDelta(isDelta)
.addAllNewLinks(addSet)
.addAllLinksToRemove(removeSet)
.build());
} }
} }
} }
......
...@@ -22,7 +22,7 @@ public class Payload implements Map<String, String> ...@@ -22,7 +22,7 @@ public class Payload implements Map<String, String>
{ {
this.iu = iu; this.iu = iu;
} }
// def __init__(self, remote_push_iu, new_payload): // def __init__(self, remote_push_iu, new_payload):
// """Create remote payload object. // """Create remote payload object.
// //
...@@ -34,46 +34,46 @@ public class Payload implements Map<String, String> ...@@ -34,46 +34,46 @@ public class Payload implements Map<String, String>
// self._remote_push_iu = remote_push_iu // self._remote_push_iu = remote_push_iu
// if new_payload is not None: // if new_payload is not None:
// for k,v in new_payload.items(): // for k,v in new_payload.items():
// dict.__setitem__(self, k, v) // dict.__setitem__(self, k, v)
public Payload(AbstractIU iu, List<PayloadItem> payloadItems) public Payload(AbstractIU iu, List<PayloadItem> payloadItems)
{ {
this(iu,payloadItems,null); this(iu, payloadItems, null);
} }
public Payload(AbstractIU iu, Map<String,String> newPayload) public Payload(AbstractIU iu, Map<String, String> newPayload)
{ {
this(iu,newPayload,null); this(iu, newPayload, null);
} }
public Payload(AbstractIU iu, Map<String,String> newPayload, String writerName) public Payload(AbstractIU iu, Map<String, String> newPayload, String writerName)
{ {
this.iu = iu; this.iu = iu;
set(newPayload, writerName); set(newPayload, writerName);
} }
public Payload(AbstractIU iu, List<PayloadItem>newPayload, String writerName) public Payload(AbstractIU iu, List<PayloadItem> newPayload, String writerName)
{ {
this.iu = iu; this.iu = iu;
set(newPayload,writerName); set(newPayload, writerName);
} }
public void set(Map<String,String> newPayload, String writerName) public void set(Map<String, String> newPayload, String writerName)
{ {
iu.setPayload(newPayload, writerName); iu.setPayload(newPayload, writerName);
map.clear(); map.clear();
map.putAll(newPayload); map.putAll(newPayload);
} }
public void set(List<PayloadItem>newPayload, String writerName) public void set(List<PayloadItem> newPayload, String writerName)
{ {
iu.handlePayloadSetting(newPayload,writerName); iu.handlePayloadSetting(newPayload, writerName);
map.clear(); map.clear();
for (PayloadItem item : newPayload) for (PayloadItem item : newPayload)
{ {
map.put(item.getKey(), item.getValue()); map.put(item.getKey(), item.getValue());
} }
} }
// def _remotely_enforced_setitem(self, k, v): // def _remotely_enforced_setitem(self, k, v):
// """Sets an item when requested remotely.""" // """Sets an item when requested remotely."""
// return dict.__setitem__(self, k, v) // return dict.__setitem__(self, k, v)
...@@ -160,7 +160,7 @@ public class Payload implements Map<String, String> ...@@ -160,7 +160,7 @@ public class Payload implements Map<String, String>
// raise IUUpdateFailedError(self._remote_push_iu) // raise IUUpdateFailedError(self._remote_push_iu)
// else: // else:
// self._remote_push_iu._revision = new_revision // self._remote_push_iu._revision = new_revision
// dict.__setitem__(self, k, v) // dict.__setitem__(self, k, v)
/** /**
* Set item in this payload. * Set item in this payload.
* Requests item setting from the OutputBuffer holding the local version * Requests item setting from the OutputBuffer holding the local version
...@@ -204,7 +204,7 @@ public class Payload implements Map<String, String> ...@@ -204,7 +204,7 @@ public class Payload implements Map<String, String>
* Requests item deletion from the OutputBuffer holding the local version * Requests item deletion from the OutputBuffer holding the local version
* of this IU. Returns when permission is granted and item is deleted; * of this IU. Returns when permission is granted and item is deleted;
* otherwise raises an IUUpdateFailedError. * otherwise raises an IUUpdateFailedError.
*/ */
public String remove(Object key, String writer) public String remove(Object key, String writer)
{ {
iu.removeFromPayload(key, writer); iu.removeFromPayload(key, writer);
......
...@@ -15,7 +15,7 @@ import com.google.protobuf.InvalidProtocolBufferException; ...@@ -15,7 +15,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
/** /**
* Serializer/deserializer for IUPayloadUpdate * Serializer/deserializer for IUPayloadUpdate
* @author hvanwelbergen * @author hvanwelbergen
* *
*/ */
public class PayloadConverter implements Converter<ByteBuffer> public class PayloadConverter implements Converter<ByteBuffer>
{ {
...@@ -33,20 +33,20 @@ public class PayloadConverter implements Converter<ByteBuffer> ...@@ -33,20 +33,20 @@ public class PayloadConverter implements Converter<ByteBuffer>
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return new UserData<IUPayloadUpdate>(pl, IUPayloadUpdate.class); return new UserData<IUPayloadUpdate>(pl, IUPayloadUpdate.class);
} }
@Override @Override
public ConverterSignature getSignature() public ConverterSignature getSignature()
{ {
return new ConverterSignature(PAYLOAD_WIRESCHEMA,IUPayloadUpdate.class); return new ConverterSignature(PAYLOAD_WIRESCHEMA, IUPayloadUpdate.class);
} }
@Override @Override
public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException public WireContents<ByteBuffer> serialize(Class<?> typeInfo, Object obj) throws ConversionException
{ {
IUPayloadUpdate pl = (IUPayloadUpdate)obj; IUPayloadUpdate pl = (IUPayloadUpdate) obj;
return new WireContents<ByteBuffer>(ByteBuffer.wrap(pl.toByteArray()),PAYLOAD_WIRESCHEMA); return new WireContents<ByteBuffer>(ByteBuffer.wrap(pl.toByteArray()), PAYLOAD_WIRESCHEMA);
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment