From 7755ce2b05747220cbacc56bae11e0a96bbb98fe Mon Sep 17 00:00:00 2001 From: Sebastian Kahl <seb@glialfire.net> Date: Wed, 3 Dec 2014 15:12:22 +0100 Subject: [PATCH] Added ipaaca channel possibilities based on rsb scope. Also, deprecated incompatible InputBuffer constrcutors for non-container parameters for multiple categories, from cpp version --- ipaacalib/cpp/include/ipaaca/ipaaca.h | 36 +-- ipaacalib/cpp/src/ipaaca.cc | 147 ++++++----- ipaacalib/java/src/ipaaca/InputBuffer.java | 19 +- ipaacalib/java/src/ipaaca/OutputBuffer.java | 24 +- ipaacalib/python/src/ipaaca/__init__.py | 268 ++++++++++---------- 5 files changed, 266 insertions(+), 228 deletions(-) diff --git a/ipaacalib/cpp/include/ipaaca/ipaaca.h b/ipaacalib/cpp/include/ipaaca/ipaaca.h index 6bcdd8d..b9bd56b 100644 --- a/ipaacalib/cpp/include/ipaaca/ipaaca.h +++ b/ipaacalib/cpp/include/ipaaca/ipaaca.h @@ -1,10 +1,10 @@ /* * This file is part of IPAACA, the * "Incremental Processing Architecture - * for Artificial Conversational Agents". + * for Artificial Conversational Agents". * * Copyright (c) 2009-2013 Sociable Agents Group - * CITEC, Bielefeld University + * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ * http://purl.org/net/ipaaca @@ -21,7 +21,7 @@ * You should have received a copy of the LGPL along with this * program. If not, go to http://www.gnu.org/licenses/lgpl.html * or write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * The development of this software was supported by the * Excellence Cluster EXC 277 Cognitive Interaction Technology. @@ -330,7 +330,7 @@ IPAACA_HEADER_EXPORT class SmartLinkMap { public: IPAACA_HEADER_EXPORT const LinkSet& get_links(const std::string& key); IPAACA_HEADER_EXPORT const LinkMap& get_all_links(); - + protected: IPAACA_MEMBER_VAR_EXPORT LinkMap _links; IPAACA_MEMBER_VAR_EXPORT static LinkSet empty_link_set; @@ -409,6 +409,7 @@ IPAACA_HEADER_EXPORT class OutputBuffer: public Buffer { //, public boost::enabl protected: IPAACA_MEMBER_VAR_EXPORT std::map<std::string, rsb::Informer<rsb::AnyType>::Ptr> _informer_store; IPAACA_MEMBER_VAR_EXPORT rsb::patterns::ServerPtr _server; + IPAACA_MEMBER_VAR_EXPORT std::string _channel; IPAACA_HEADER_EXPORT rsb::Informer<rsb::AnyType>::Ptr _get_informer(const std::string& category); #endif protected: @@ -423,7 +424,7 @@ IPAACA_HEADER_EXPORT class OutputBuffer: public Buffer { //, public boost::enabl IPAACA_HEADER_EXPORT void _publish_iu(boost::shared_ptr<IU> iu); IPAACA_HEADER_EXPORT void _retract_iu(boost::shared_ptr<IU> iu); protected: - IPAACA_HEADER_EXPORT OutputBuffer(const std::string& basename); + IPAACA_HEADER_EXPORT OutputBuffer(const std::string& basename, const std::string& channel="default"); IPAACA_HEADER_EXPORT void _initialize_server(); public: IPAACA_HEADER_EXPORT static boost::shared_ptr<OutputBuffer> create(const std::string& basename); @@ -449,6 +450,7 @@ IPAACA_HEADER_EXPORT class InputBuffer: public Buffer { //, public boost::enable IPAACA_MEMBER_VAR_EXPORT std::map<std::string, rsb::ListenerPtr> _listener_store; IPAACA_MEMBER_VAR_EXPORT std::map<std::string, rsb::patterns::RemoteServerPtr> _remote_server_store; IPAACA_MEMBER_VAR_EXPORT RemotePushIUStore _iu_store; // TODO genericize + IPAACA_MEMBER_VAR_EXPORT std::string _channel; IPAACA_HEADER_EXPORT rsb::patterns::RemoteServerPtr _get_remote_server(const std::string& unique_server_name); IPAACA_HEADER_EXPORT rsb::ListenerPtr _create_category_listener_if_needed(const std::string& category); IPAACA_HEADER_EXPORT void _handle_iu_events(rsb::EventPtr event); @@ -467,19 +469,19 @@ IPAACA_HEADER_EXPORT class InputBuffer: public Buffer { //, public boost::enable IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_commission() should never be invoked") } protected: - IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::set<std::string>& category_interests); - IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests); - IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::string& category_interest1); - IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2); - IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3); - IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4); + IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::set<std::string>& category_interests, const std::string& channel="default"); + IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests, const std::string& channel="default"); + IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& channel="default"); + // IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2); + // IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3); + // IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4); public: - IPAACA_HEADER_EXPORT static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::set<std::string>& category_interests); - IPAACA_HEADER_EXPORT static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::vector<std::string>& category_interests); - IPAACA_HEADER_EXPORT static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1); - IPAACA_HEADER_EXPORT static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2); - IPAACA_HEADER_EXPORT static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3); - IPAACA_HEADER_EXPORT static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4); + IPAACA_HEADER_EXPORT static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::set<std::string>& category_interests, const std::string& channel="default"); + IPAACA_HEADER_EXPORT static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::vector<std::string>& category_interests, const std::string& channel="default"); + IPAACA_HEADER_EXPORT static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& channel="default"); + // IPAACA_HEADER_EXPORT static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2); + // IPAACA_HEADER_EXPORT static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3); + // IPAACA_HEADER_EXPORT static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4); IPAACA_HEADER_EXPORT ~InputBuffer() { IPAACA_IMPLEMENT_ME } diff --git a/ipaacalib/cpp/src/ipaaca.cc b/ipaacalib/cpp/src/ipaaca.cc index 879e76c..c25e925 100644 --- a/ipaacalib/cpp/src/ipaaca.cc +++ b/ipaacalib/cpp/src/ipaaca.cc @@ -1,10 +1,10 @@ /* * This file is part of IPAACA, the * "Incremental Processing Architecture - * for Artificial Conversational Agents". + * for Artificial Conversational Agents". * * Copyright (c) 2009-2013 Sociable Agents Group - * CITEC, Bielefeld University + * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ * http://purl.org/net/ipaaca @@ -21,7 +21,7 @@ * You should have received a copy of the LGPL along with this * program. If not, go to http://www.gnu.org/licenses/lgpl.html * or write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * The development of this software was supported by the * Excellence Cluster EXC 277 Cognitive Interaction Technology. @@ -64,36 +64,36 @@ IPAACA_EXPORT bool Initializer::initialized() { return _initialized; } IPAACA_EXPORT void Initializer::initialize_ipaaca_rsb_if_needed() { if (_initialized) return; - + //IPAACA_INFO("Calling initialize_updated_default_config()") initialize_updated_default_config(); // RYT FIXME This configuration stuff has been simply removed in rsb! //ParticipantConfig config = ParticipantConfig::fromConfiguration(); //getFactory().setDefaultParticipantConfig(config); - + //IPAACA_INFO("Creating and registering Converters") boost::shared_ptr<IUConverter> iu_converter(new IUConverter()); converterRepository<std::string>()->registerConverter(iu_converter); - + boost::shared_ptr<MessageConverter> message_converter(new MessageConverter()); converterRepository<std::string>()->registerConverter(message_converter); - + boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter()); converterRepository<std::string>()->registerConverter(payload_update_converter); - + boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter()); converterRepository<std::string>()->registerConverter(link_update_converter); - + boost::shared_ptr<ProtocolBufferConverter<protobuf::IUCommission> > iu_commission_converter(new ProtocolBufferConverter<protobuf::IUCommission> ()); converterRepository<std::string>()->registerConverter(iu_commission_converter); - + boost::shared_ptr<ProtocolBufferConverter<protobuf::IURetraction> > iu_retraction_converter(new ProtocolBufferConverter<protobuf::IURetraction> ()); converterRepository<std::string>()->registerConverter(iu_retraction_converter); - + boost::shared_ptr<IntConverter> int_converter(new IntConverter()); converterRepository<std::string>()->registerConverter(int_converter); - + //IPAACA_INFO("Initialization complete.") _initialized = true; //IPAACA_TODO("initialize all converters") @@ -489,11 +489,12 @@ IPAACA_EXPORT boost::shared_ptr<int> CallbackIUCommission::call(const std::strin // OutputBuffer//{{{ -IPAACA_EXPORT OutputBuffer::OutputBuffer(const std::string& basename) +IPAACA_EXPORT OutputBuffer::OutputBuffer(const std::string& basename, const std::string& channel) :Buffer(basename, "OB") { //IPAACA_INFO("Entering ...") _id_prefix = _basename + "-" + _uuid + "-IU-"; + _channel = channel; _initialize_server(); //IPAACA_INFO("... exiting.") } @@ -564,7 +565,7 @@ IPAACA_EXPORT void OutputBuffer::_send_iu_commission(IUInterface* iu, revision_t data->set_revision(revision); if (writer_name=="") data->set_writer_name(_unique_name); else data->set_writer_name(writer_name); - + Informer<AnyType>::Ptr informer = _get_informer(iu->category()); informer->publish(data); } @@ -598,7 +599,7 @@ IPAACA_EXPORT Informer<AnyType>::Ptr OutputBuffer::_get_informer(const std::stri return _informer_store[category]; } else { //IPAACA_INFO("Making new informer for category " << category) - std::string scope_string = "/ipaaca/category/" + category; + std::string scope_string = "/ipaaca/channel/" + _channel + "/category/" + category; Informer<AnyType>::Ptr informer = getFactory().createInformer<AnyType> ( Scope(scope_string)); _informer_store[category] = informer; return informer; @@ -634,78 +635,90 @@ IPAACA_EXPORT void OutputBuffer::_retract_iu(IU::ptr iu) //}}} // InputBuffer//{{{ -IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::set<std::string>& category_interests) +IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::set<std::string>& category_interests, const std::string& channel) :Buffer(basename, "IB") { + _channel = channel; + for (std::set<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) { _create_category_listener_if_needed(*it); } } -IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests) +IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests, const std::string& channel) :Buffer(basename, "IB") { + _channel = channel; + for (std::vector<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) { _create_category_listener_if_needed(*it); } } -IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1) -:Buffer(basename, "IB") -{ - _create_category_listener_if_needed(category_interest1); -} -IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) -:Buffer(basename, "IB") -{ - _create_category_listener_if_needed(category_interest1); - _create_category_listener_if_needed(category_interest2); -} -IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) +IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& channel) :Buffer(basename, "IB") { - _create_category_listener_if_needed(category_interest1); - _create_category_listener_if_needed(category_interest2); - _create_category_listener_if_needed(category_interest3); -} -IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) -:Buffer(basename, "IB") -{ - _create_category_listener_if_needed(category_interest1); - _create_category_listener_if_needed(category_interest2); - _create_category_listener_if_needed(category_interest3); - _create_category_listener_if_needed(category_interest4); -} - + _channel = channel; -IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::set<std::string>& category_interests) -{ - Initializer::initialize_ipaaca_rsb_if_needed(); - return InputBuffer::ptr(new InputBuffer(basename, category_interests)); -} -IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::vector<std::string>& category_interests) -{ - Initializer::initialize_ipaaca_rsb_if_needed(); - return InputBuffer::ptr(new InputBuffer(basename, category_interests)); -} -IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1) -{ - Initializer::initialize_ipaaca_rsb_if_needed(); - return InputBuffer::ptr(new InputBuffer(basename, category_interest1)); + _create_category_listener_if_needed(category_interest1); } -IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) +// IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) +// :Buffer(basename, "IB") +// { +// _channel = "default"; + +// _create_category_listener_if_needed(category_interest1); +// _create_category_listener_if_needed(category_interest2); +// } +// IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) +// :Buffer(basename, "IB") +// { +// _channel = "default"; + +// _create_category_listener_if_needed(category_interest1); +// _create_category_listener_if_needed(category_interest2); +// _create_category_listener_if_needed(category_interest3); +// } +// IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) +// :Buffer(basename, "IB") +// { +// _channel = "default"; + +// _create_category_listener_if_needed(category_interest1); +// _create_category_listener_if_needed(category_interest2); +// _create_category_listener_if_needed(category_interest3); +// _create_category_listener_if_needed(category_interest4); +// } + + +IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::set<std::string>& category_interests, const std::string& channel) { Initializer::initialize_ipaaca_rsb_if_needed(); - return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2)); + return InputBuffer::ptr(new InputBuffer(basename, category_interests, channel)); } -IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) +IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::vector<std::string>& category_interests, const std::string& channel) { Initializer::initialize_ipaaca_rsb_if_needed(); - return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3)); + return InputBuffer::ptr(new InputBuffer(basename, category_interests, channel)); } -IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) +IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& channel) { Initializer::initialize_ipaaca_rsb_if_needed(); - return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3, category_interest4)); -} + return InputBuffer::ptr(new InputBuffer(basename, category_interest1, channel)); +} +// IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& channel) +// { +// Initializer::initialize_ipaaca_rsb_if_needed(); +// return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, channel)); +// } +// IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& channel) +// { +// Initializer::initialize_ipaaca_rsb_if_needed(); +// return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3, channel)); +// } +// IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4, const std::string& channel) +// { +// Initializer::initialize_ipaaca_rsb_if_needed(); +// return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3, category_interest4, channel)); +// } IPAACA_EXPORT IUInterface::ptr InputBuffer::get(const std::string& iu_uid) { @@ -739,7 +752,7 @@ IPAACA_EXPORT ListenerPtr InputBuffer::_create_category_listener_if_needed(const return it->second; } //IPAACA_INFO("Creating a new listener for category " << category) - std::string scope_string = "/ipaaca/category/" + category; + std::string scope_string = "/ipaaca/channel/" + _channel + "/category/" + category; ListenerPtr listener = getFactory().createListener( Scope(scope_string) ); //IPAACA_INFO("Adding handler") HandlerPtr event_handler = HandlerPtr( @@ -867,7 +880,7 @@ IPAACA_EXPORT void IUInterface::_set_buffer(Buffer* buffer) { //boost::shared_pt throw IUAlreadyInABufferError(); } _buffer = buffer; - + } IPAACA_EXPORT void IUInterface::_set_owner_name(const std::string& owner_name) { @@ -1035,7 +1048,7 @@ void Message::_internal_commit(const std::string& writer_name) if (is_published()) { IPAACA_INFO("Info: committing to a Message after sending has no global effects") } - + } //}}} @@ -1468,7 +1481,7 @@ IPAACA_EXPORT AnnotatedData IUConverter::deserialize(const std::string& wireSche //return std::make_pair(getDataType(), obj); return std::make_pair("ipaaca::RemoteMessage", obj); break; - } + } default: // other cases not handled yet! ( TODO ) throw NotImplementedError(); diff --git a/ipaacalib/java/src/ipaaca/InputBuffer.java b/ipaacalib/java/src/ipaaca/InputBuffer.java index 1a3bdb4..6017749 100644 --- a/ipaacalib/java/src/ipaaca/InputBuffer.java +++ b/ipaacalib/java/src/ipaaca/InputBuffer.java @@ -1,10 +1,10 @@ /* * This file is part of IPAACA, the * "Incremental Processing Architecture - * for Artificial Conversational Agents". + * for Artificial Conversational Agents". * * Copyright (c) 2009-2013 Sociable Agents Group - * CITEC, Bielefeld University + * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ * http://purl.org/net/ipaaca @@ -21,7 +21,7 @@ * You should have received a copy of the LGPL along with this * program. If not, go to http://www.gnu.org/licenses/lgpl.html * or write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * The development of this software was supported by the * Excellence Cluster EXC 277 Cognitive Interaction Technology. @@ -69,6 +69,7 @@ public class InputBuffer extends Buffer private final static Logger logger = LoggerFactory.getLogger(InputBuffer.class.getName()); private IUStore<RemotePushIU> iuStore = new IUStore<RemotePushIU>(); private IUStore<RemoteMessageIU> messageStore = new IUStore<RemoteMessageIU>(); + private String channel; public void close() { @@ -121,6 +122,12 @@ public class InputBuffer extends Buffer // for cat in category_interests: // self._create_category_listener_if_needed(cat) public InputBuffer(String owningComponentName, Set<String> categoryInterests) + { + this(owningComponentName, categoryInterests, "default"); + } + + + public InputBuffer(String owningComponentName, Set<String> categoryInterests, String ipaaca_channel) { super(owningComponentName); uniqueName = "/ipaaca/component/" + getUniqueShortName() + "/IB"; @@ -129,6 +136,8 @@ public class InputBuffer extends Buffer { createCategoryListenerIfNeeded(cat); } + + this.channel = ipaaca_channel; } // def _get_remote_server(self, iu): @@ -180,7 +189,7 @@ public class InputBuffer extends Buffer Listener listener; try { - listener = Factory.getInstance().createListener(new Scope("/ipaaca/category/" + category)); + listener = Factory.getInstance().createListener(new Scope("/ipaaca/channel/" + this.channel + "/category/" + category)); } catch (InitializeException e1) { @@ -273,7 +282,7 @@ public class InputBuffer extends Buffer logger.warn("Spurious RemoteMessage event: already got this UID: "+rm.getUid()); return; } - + //logger.info("Adding Message "+rm.getUid()); messageStore.put(rm.getUid(), rm); //logger.info("Calling handlers for Message "+rm.getUid()); diff --git a/ipaacalib/java/src/ipaaca/OutputBuffer.java b/ipaacalib/java/src/ipaaca/OutputBuffer.java index 35bdae6..000b613 100644 --- a/ipaacalib/java/src/ipaaca/OutputBuffer.java +++ b/ipaacalib/java/src/ipaaca/OutputBuffer.java @@ -1,10 +1,10 @@ /* * This file is part of IPAACA, the * "Incremental Processing Architecture - * for Artificial Conversational Agents". + * for Artificial Conversational Agents". * * Copyright (c) 2009-2013 Sociable Agents Group - * CITEC, Bielefeld University + * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ * http://purl.org/net/ipaaca @@ -21,7 +21,7 @@ * You should have received a copy of the LGPL along with this * program. If not, go to http://www.gnu.org/licenses/lgpl.html * or write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * The development of this software was supported by the * Excellence Cluster EXC 277 Cognitive Interaction Technology. @@ -69,6 +69,7 @@ public class OutputBuffer extends Buffer private Map<String, Informer<Object>> informerStore = new HashMap<String, Informer<Object>>(); // category -> informer map private final static Logger logger = LoggerFactory.getLogger(OutputBuffer.class.getName()); private IUStore<LocalIU> iuStore = new IUStore<LocalIU>(); + private String channel; // def __init__(self, owning_component_name, participant_config=None): // '''Create an Output Buffer. @@ -90,6 +91,16 @@ public class OutputBuffer extends Buffer * @param owningComponentName name of the entity that own this buffer */ public OutputBuffer(String owningComponentName) + { + this(owningComponentName, "default"); + + } + + /** + * @param owningComponentName name of the entity that own this buffer + * @param channel name of the ipaaca channel this buffer is using + */ + public OutputBuffer(String owningComponentName, String ipaaca_channel) { super(owningComponentName); @@ -112,6 +123,7 @@ public class OutputBuffer extends Buffer throw new RuntimeException(e); } + this.channel = ipaaca_channel; } private final class RemoteUpdatePayload extends DataCallback<Integer, IUPayloadUpdate> @@ -192,7 +204,7 @@ public class OutputBuffer extends Buffer { iu.getPayload().remove(k, update.getWriterName()); } - if (update.getNewItemsList().size() > 0) + if (update.getNewItemsList().size() > 0) { HashMap<String, String> payloadUpdate = new HashMap<String, String>(); @@ -329,7 +341,7 @@ public class OutputBuffer extends Buffer Informer<Object> informer; try { - informer = Factory.getInstance().createInformer("/ipaaca/category/" + category); + informer = Factory.getInstance().createInformer("/ipaaca/channel/" + this.channel + "/category/" + category); } catch (InitializeException e1) { @@ -337,7 +349,7 @@ public class OutputBuffer extends Buffer } informerStore.put(category, informer); - logger.info("Added informer on " + category); + logger.info("Added informer on channel " + this.channel + " and category " + category); try { diff --git a/ipaacalib/python/src/ipaaca/__init__.py b/ipaacalib/python/src/ipaaca/__init__.py index 0bd8bbb..e155211 100755 --- a/ipaacalib/python/src/ipaaca/__init__.py +++ b/ipaacalib/python/src/ipaaca/__init__.py @@ -2,10 +2,10 @@ # This file is part of IPAACA, the # "Incremental Processing Architecture -# for Artificial Conversational Agents". +# for Artificial Conversational Agents". # # Copyright (c) 2009-2013 Sociable Agents Group -# CITEC, Bielefeld University +# CITEC, Bielefeld University # # http://opensource.cit-ec.de/projects/ipaaca/ # http://purl.org/net/ipaaca @@ -22,7 +22,7 @@ # You should have received a copy of the LGPL along with this # program. If not, go to http://www.gnu.org/licenses/lgpl.html # or write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # # The development of this software was supported by the # Excellence Cluster EXC 277 Cognitive Interaction Technology. @@ -75,7 +75,7 @@ __all__ = [ def enum(*sequential, **named): """Create an enum type. - + Based on suggestion of Alec Thomas on stackoverflow.com: http://stackoverflow.com/questions/36932/ whats-the-best-way-to-implement-an-enum-in-python/1695250#1695250 @@ -96,12 +96,12 @@ def unpack_typed_payload_item(protobuf_object): class IpaacaLoggingHandler(logging.Handler): - + def __init__(self, level=logging.DEBUG): logging.Handler.__init__(self, level) - + def emit(self, record): - meta = '[ipaaca] (' + str(record.levelname) + ') ' + meta = '[ipaaca] (' + str(record.levelname) + ') ' msg = str(record.msg.format(record.args)) print(meta + msg) @@ -253,13 +253,13 @@ class Payload(dict): #print("Payload.__delitem__() OUT") self._batch_update_lock.release() return r - - # Context-manager based batch updates, not yet thread-safe (on remote updates) + + # Context-manager based batch updates, not yet thread-safe (on remote updates) def __enter__(self): #print('running Payload.__enter__()') self._wait_batch_update_lock(self._update_timeout) self._update_on_every_change = False - + def __exit__(self, type, value, traceback): #print('running Payload.__exit__()') self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=self._batch_update_writer_name) @@ -291,12 +291,12 @@ class Payload(dict): class IUInterface(object): #{{{ - + """Base class of all specialised IU classes.""" - + def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False): """Creates an IU. - + Keyword arguments: uid -- unique ID of this IU access_mode -- access mode of this IU @@ -314,7 +314,7 @@ class IUInterface(object): #{{{ self._buffer = None # payload is not present here self._links = collections.defaultdict(set) - + def __str__(self): s = unicode(self.__class__)+"{ " s += "category="+("<None>" if self._category is None else self._category)+" " @@ -331,8 +331,8 @@ class IUInterface(object): #{{{ s += "} " s += "}" return s - - + + def _add_and_remove_links(self, add, remove): '''Just add and remove the new links in our links set, do not send an update here''' '''Note: Also used for remotely enforced links updates.''' @@ -343,7 +343,7 @@ class IUInterface(object): #{{{ '''Note: Also used for remotely enforced links updates.''' self._links = collections.defaultdict(set) for type in links.keys(): self._links[type] |= set(links[type]) - + def add_links(self, type, targets, writer_name=None): '''Attempt to add links if the conditions are met and send an update message. Then call the local setter.''' @@ -370,45 +370,45 @@ class IUInterface(object): #{{{ return set(self._links[type]) def get_all_links(self): return copy.deepcopy(self._links) - + def _get_revision(self): return self._revision revision = property(fget=_get_revision, doc='Revision number of the IU.') - + def _get_category(self): return self._category category = property(fget=_get_category, doc='Category of the IU.') - + def _get_payload_type(self): return self._payload_type payload_type = property(fget=_get_payload_type, doc='Type of the IU payload') - + def _get_committed(self): return self._committed committed = property( fget=_get_committed, doc='Flag indicating whether this IU has been committed to.') - + def _get_retracted(self): return self._retracted retracted = property( fget=_get_retracted, doc='Flag indicating whether this IU has been retracted.') - + def _get_uid(self): return self._uid uid = property(fget=_get_uid, doc='Unique ID of the IU.') - + def _get_access_mode(self): return self._access_mode access_mode = property(fget=_get_access_mode, doc='Access mode of the IU.') - + def _get_read_only(self): return self._read_only read_only = property( - fget=_get_read_only, + fget=_get_read_only, doc='Flag indicating whether this IU is read only.') - + def _get_buffer(self): return self._buffer def _set_buffer(self, buffer): @@ -417,9 +417,9 @@ class IUInterface(object): #{{{ self._buffer = buffer buffer = property( fget=_get_buffer, - fset=_set_buffer, + fset=_set_buffer, doc='Buffer this IU is held in.') - + def _get_owner_name(self): return self._owner_name def _set_owner_name(self, owner_name): @@ -440,11 +440,11 @@ class IU(IUInterface):#{{{ super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only) self._revision = 1 self.uid = str(uuid.uuid4()) - self._category = category + self._category = str(category) self._payload_type = _payload_type self.revision_lock = threading.RLock() self._payload = Payload(iu=self) - + def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): if self.committed: raise IUCommittedError(self) @@ -460,7 +460,7 @@ class IU(IUInterface):#{{{ new_links=new_links, links_to_remove=links_to_remove, writer_name=self.owner_name if writer_name is None else writer_name) - + def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): """Modify the payload: add or remove items from this payload locally and send update.""" if self.committed: @@ -479,10 +479,10 @@ class IU(IUInterface):#{{{ new_items=new_items, keys_to_remove=keys_to_remove, writer_name=self.owner_name if writer_name is None else writer_name) - + def _increase_revision_number(self): self._revision += 1 - + def _internal_commit(self, writer_name=None): if self.committed: raise IUCommittedError(self) @@ -492,11 +492,11 @@ class IU(IUInterface):#{{{ self._committed = True if self.buffer is not None: self.buffer._send_iu_commission(self, writer_name=writer_name) - + def commit(self): """Commit to this IU.""" return self._internal_commit() - + def _get_payload(self): return self._payload def _set_payload(self, new_pl, writer_name=None): @@ -512,13 +512,13 @@ class IU(IUInterface):#{{{ fget=_get_payload, fset=_set_payload, doc='Payload dictionary of this IU.') - + def _get_is_published(self): return self.buffer is not None is_published = property( - fget=_get_is_published, + fget=_get_is_published, doc='Flag indicating whether this IU has been published or not.') - + def _set_buffer(self, buffer): if self._buffer is not None: raise Exception('The IU is already in a buffer, cannot move it.') @@ -529,7 +529,7 @@ class IU(IUInterface):#{{{ fget=IUInterface._get_buffer, fset=_set_buffer, doc='Buffer this IU is held in.') - + def _set_uid(self, uid): if self._uid is not None: raise AttributeError('The uid of IU ' + self.uid + ' has already been set, cannot change it.') @@ -544,26 +544,26 @@ class IU(IUInterface):#{{{ class Message(IU):#{{{ """Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls.""" def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, _payload_type='MAP'): - super(Message, self).__init__(category=category, access_mode=access_mode, read_only=read_only, _payload_type=_payload_type) - + super(Message, self).__init__(category=str(category), access_mode=access_mode, read_only=read_only, _payload_type=_payload_type) + def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): if self.is_published: logger.info('Info: modifying a Message after sending has no global effects') - + def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): if self.is_published: logger.info('Info: modifying a Message after sending has no global effects') - + def _increase_revision_number(self): self._revision += 1 - + def _internal_commit(self, writer_name=None): if self.is_published: logger.info('Info: committing to a Message after sending has no global effects') - + def commit(self): return self._internal_commit() - + def _get_payload(self): return self._payload def _set_payload(self, new_pl, writer_name=None): @@ -582,13 +582,13 @@ class Message(IU):#{{{ fget=_get_payload, fset=_set_payload, doc='Payload dictionary of this IU.') - + def _get_is_published(self): return self.buffer is not None is_published = property( - fget=_get_is_published, + fget=_get_is_published, doc='Flag indicating whether this IU has been published or not.') - + def _set_buffer(self, buffer): if self._buffer is not None: raise Exception('The IU is already in a buffer, cannot move it.') @@ -599,7 +599,7 @@ class Message(IU):#{{{ fget=IUInterface._get_buffer, fset=_set_buffer, doc='Buffer this IU is held in.') - + def _set_uid(self, uid): if self._uid is not None: raise AttributeError('The uid of IU ' + self.uid + ' has already been set, cannot change it.') @@ -611,9 +611,9 @@ class Message(IU):#{{{ #}}} class RemoteMessage(IUInterface):#{{{ - + """A remote IU with access mode 'MESSAGE'.""" - + def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links): super(RemoteMessage, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only) self._revision = revision @@ -627,10 +627,10 @@ class RemoteMessage(IUInterface):#{{{ # We are just receiving it here and applying the new data. self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True) self._links = links - + def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): logger.info('Info: modifying a RemoteMessage only has local effects') - + def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): logger.info('Info: modifying a RemoteMessage only has local effects') @@ -655,7 +655,7 @@ class RemoteMessage(IUInterface):#{{{ self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove) else: self._replace_links(links=update.new_links) - + def _apply_update(self, update): """Apply a IUPayloadUpdate to the IU.""" logger.warning('Warning: should never be called: RemoteMessage._apply_update') @@ -666,12 +666,12 @@ class RemoteMessage(IUInterface):#{{{ else: # NOTE Please read the comment in the constructor self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True) - + def _apply_commission(self): """Apply commission to the IU""" logger.warning('Warning: should never be called: RemoteMessage._apply_commission') self._committed = True - + def _apply_retraction(self): """Apply retraction to the IU""" logger.warning('Warning: should never be called: RemoteMessage._apply_retraction') @@ -679,9 +679,9 @@ class RemoteMessage(IUInterface):#{{{ #}}} class RemotePushIU(IUInterface):#{{{ - + """A remote IU with access mode 'PUSH'.""" - + def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links): super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only) self._revision = revision @@ -695,7 +695,7 @@ class RemotePushIU(IUInterface):#{{{ # We are just receiving it here and applying the new data. self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True) self._links = links - + def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): """Modify the links: add or remove item from this payload remotely and send update.""" if self.committed: @@ -715,7 +715,7 @@ class RemotePushIU(IUInterface):#{{{ raise IUUpdateFailedError(self) else: self._revision = new_revision - + def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): """Modify the payload: add or remove item from this payload remotely and send update.""" if self.committed: @@ -790,7 +790,7 @@ class RemotePushIU(IUInterface):#{{{ self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove) else: self._replace_links(links=update.new_links) - + def _apply_update(self, update): """Apply a IUPayloadUpdate to the IU.""" self._revision = update.revision @@ -800,11 +800,11 @@ class RemotePushIU(IUInterface):#{{{ else: # NOTE Please read the comment in the constructor self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True) - + def _apply_commission(self): """Apply commission to the IU""" self._committed = True - + def _apply_retraction(self): """Apply retraction to the IU""" self._retracted = True @@ -815,12 +815,12 @@ class IntConverter(rsb.converter.Converter):#{{{ """Convert Python int objects to Protobuf ints and vice versa.""" def __init__(self, wireSchema="int", dataType=int): super(IntConverter, self).__init__(bytearray, dataType, wireSchema) - + def serialize(self, value): pbo = ipaaca_pb2.IntMessage() pbo.value = value return bytearray(pbo.SerializeToString()), self.wireSchema - + def deserialize(self, byte_stream, ws): pbo = ipaaca_pb2.IntMessage() pbo.ParseFromString( str(byte_stream) ) @@ -835,7 +835,7 @@ class IUConverter(rsb.converter.Converter):#{{{ ''' def __init__(self, wireSchema="ipaaca-iu", dataType=IU): super(IUConverter, self).__init__(bytearray, dataType, wireSchema) - + def serialize(self, iu): pbo = ipaaca_pb2.IU() pbo.uid = iu._uid @@ -859,7 +859,7 @@ class IUConverter(rsb.converter.Converter):#{{{ linkset.targets.extend(iu._links[type_]) ws = "ipaaca-messageiu" if iu._access_mode == IUAccessMode.MESSAGE else self.wireSchema return bytearray(pbo.SerializeToString()), ws - + def deserialize(self, byte_stream, ws): type = self.getDataType() #print('IUConverter.deserialize got a '+str(type)+' over wireSchema '+ws) @@ -921,7 +921,7 @@ class MessageConverter(rsb.converter.Converter):#{{{ ''' def __init__(self, wireSchema="ipaaca-messageiu", dataType=Message): super(IUConverter, self).__init__(bytearray, dataType, wireSchema) - + def serialize(self, iu): pbo = ipaaca_pb2.IU() pbo.uid = iu._uid @@ -945,7 +945,7 @@ class MessageConverter(rsb.converter.Converter):#{{{ linkset.targets.extend(iu._links[type_]) ws = "ipaaca-messageiu" if iu._access_mode == IUAccessMode.MESSAGE else self.wireSchema return bytearray(pbo.SerializeToString()), ws - + def deserialize(self, byte_stream, ws): type = self.getDataType() #print('MessageConverter.deserialize got a '+str(type)+' over wireSchema '+ws) @@ -1002,7 +1002,7 @@ class MessageConverter(rsb.converter.Converter):#{{{ class IULinkUpdate(object):#{{{ - + def __init__(self, uid, revision, is_delta, writer_name="undef", new_links=None, links_to_remove=None): super(IULinkUpdate, self).__init__() self.uid = uid @@ -1011,7 +1011,7 @@ class IULinkUpdate(object):#{{{ self.is_delta = is_delta self.new_links = collections.defaultdict(set) if new_links is None else collections.defaultdict(set, new_links) self.links_to_remove = collections.defaultdict(set) if links_to_remove is None else collections.defaultdict(set, links_to_remove) - + def __str__(self): s = 'LinkUpdate(' + 'uid=' + self.uid + ', ' s += 'revision='+str(self.revision)+', ' @@ -1023,7 +1023,7 @@ class IULinkUpdate(object):#{{{ #}}} class IUPayloadUpdate(object):#{{{ - + def __init__(self, uid, revision, is_delta, writer_name="undef", new_items=None, keys_to_remove=None): super(IUPayloadUpdate, self).__init__() self.uid = uid @@ -1032,7 +1032,7 @@ class IUPayloadUpdate(object):#{{{ self.is_delta = is_delta self.new_items = {} if new_items is None else new_items self.keys_to_remove = [] if keys_to_remove is None else keys_to_remove - + def __str__(self): s = 'PayloadUpdate(' + 'uid=' + self.uid + ', ' s += 'revision='+str(self.revision)+', ' @@ -1046,7 +1046,7 @@ class IUPayloadUpdate(object):#{{{ class IULinkUpdateConverter(rsb.converter.Converter):#{{{ def __init__(self, wireSchema="ipaaca-iu-link-update", dataType=IULinkUpdate): super(IULinkUpdateConverter, self).__init__(bytearray, dataType, wireSchema) - + def serialize(self, iu_link_update): pbo = ipaaca_pb2.IULinkUpdate() pbo.uid = iu_link_update.uid @@ -1062,7 +1062,7 @@ class IULinkUpdateConverter(rsb.converter.Converter):#{{{ linkset.targets.extend(iu_link_update.links_to_remove[type_]) pbo.is_delta = iu_link_update.is_delta return bytearray(pbo.SerializeToString()), self.wireSchema - + def deserialize(self, byte_stream, ws): type = self.getDataType() if type == IULinkUpdate: @@ -1082,7 +1082,7 @@ class IULinkUpdateConverter(rsb.converter.Converter):#{{{ class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{ def __init__(self, wireSchema="ipaaca-iu-payload-update", dataType=IUPayloadUpdate): super(IUPayloadUpdateConverter, self).__init__(bytearray, dataType, wireSchema) - + def serialize(self, iu_payload_update): pbo = ipaaca_pb2.IUPayloadUpdate() pbo.uid = iu_payload_update.uid @@ -1094,7 +1094,7 @@ class IUPayloadUpdateConverter(rsb.converter.Converter):#{{{ pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove) pbo.is_delta = iu_payload_update.is_delta return bytearray(pbo.SerializeToString()), self.wireSchema - + def deserialize(self, byte_stream, ws): type = self.getDataType() if type == IUPayloadUpdate: @@ -1128,12 +1128,12 @@ class FrozenIUStore(IUStore): raise AttributeError() class IUEventHandler(object): - + """Wrapper for IU event handling functions.""" - + def __init__(self, handler_function, for_event_types=None, for_categories=None): """Create an IUEventHandler. - + Keyword arguments: handler_function -- the handler function with the signature (IU, event_type, local) @@ -1150,10 +1150,10 @@ class IUEventHandler(object): self._for_categories = ( None if for_categories is None else (for_categories[:] if hasattr(for_categories, '__iter__') else [for_categories])) - + def condition_met(self, event_type, category): """Check whether this IUEventHandler should be called. - + Keyword arguments: event_type -- type of the IU event category -- category of the IU which triggered the event @@ -1161,10 +1161,10 @@ class IUEventHandler(object): type_condition_met = (self._for_event_types is None or event_type in self._for_event_types) cat_condition_met = (self._for_categories is None or category in self._for_categories) return type_condition_met and cat_condition_met - + def call(self, buffer, iu_uid, local, event_type, category): """Call this IUEventHandler's function, if it applies. - + Keyword arguments: buffer -- the buffer in which the IU is stored iu_uid -- the uid of the IU @@ -1178,12 +1178,12 @@ class IUEventHandler(object): class Buffer(object): - + """Base class for InputBuffer and OutputBuffer.""" - + def __init__(self, owning_component_name, participant_config=None): '''Create a Buffer. - + Keyword arguments: owning_compontent_name -- name of the entity that owns this Buffer participant_config -- RSB configuration @@ -1196,49 +1196,49 @@ class Buffer(object): self._unique_name = "undef-"+self._uuid self._iu_store = IUStore() self._iu_event_handlers = [] - + def _get_frozen_iu_store(self): return FrozenIUStore(original_iu_store = self._iu_store) iu_store = property(fget=_get_frozen_iu_store, doc='Copy-on-read version of the internal IU store') - + def register_handler(self, handler_function, for_event_types=None, for_categories=None): """Register a new IU event handler function. - + Keyword arguments: handler_function -- a function with the signature (IU, event_type, local) for_event_types -- a list of event types or None if handler should be called for all event types for_categories -- a list of category names or None if handler should be called for all categories - + """ handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories) self._iu_event_handlers.append(handler) - + def call_iu_event_handlers(self, uid, local, event_type, category): """Call registered IU event handler functions registered for this event_type and category.""" for h in self._iu_event_handlers: h.call(self, uid, local=local, event_type=event_type, category=category) - + def _get_owning_component_name(self): """Return the name of this Buffer's owning component""" return self._owning_component_name owning_component_name = property(_get_owning_component_name) - + def _get_unique_name(self): """Return the Buffer's unique name.""" return self._unique_name unique_name = property(_get_unique_name) - + class InputBuffer(Buffer): - + """An InputBuffer that holds remote IUs.""" - - def __init__(self, owning_component_name, category_interests=None, participant_config=None): + + def __init__(self, owning_component_name, category_interests=None, channel="default", participant_config=None): '''Create an InputBuffer. - + Keyword arguments: owning_compontent_name -- name of the entity that owns this InputBuffer category_interests -- list of IU categories this Buffer is interested in @@ -1248,11 +1248,12 @@ class InputBuffer(Buffer): self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB' self._listener_store = {} # one per IU category self._remote_server_store = {} # one per remote-IU-owning Component + self._channel = channel self._category_interests = [] if category_interests is not None: for cat in category_interests: self._add_category_listener(cat) - + def _get_remote_server(self, iu): '''Return (or create, store and return) a remote server.''' if iu.owner_name in self._remote_server_store: @@ -1261,22 +1262,22 @@ class InputBuffer(Buffer): remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name))) self._remote_server_store[iu.owner_name] = remote_server return remote_server - + def _add_category_listener(self, iu_category): - '''Return (or create, store and return) a category listener.''' - if iu_category not in self._listener_store: - cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config) + '''Return (or create, store and return) a category listener on a specific channel.''' + if iu_category not in self._listener_store: + cat_listener = rsb.createListener(rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), config=self._participant_config) cat_listener.addHandler(self._handle_iu_events) self._listener_store[iu_category] = cat_listener self._category_interests.append(iu_category) - logger.info("Added listener in scope "+"/ipaaca/category/"+iu_category) - + logger.info("Added listener in scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+iu_category) + def _handle_iu_events(self, event): '''Dispatch incoming IU events. - + Adds incoming IU's to the store, applies payload and commit updates to IU, calls IU event handlers.' - + Keyword arguments: event -- a converted RSB event ''' @@ -1319,7 +1320,7 @@ class InputBuffer(Buffer): return #else: # print('Got update written by buffer '+str(event.data.writer_name)) - + if type_ is ipaaca_pb2.IUCommission: # IU commit iu = self._iu_store[event.data.uid] @@ -1345,12 +1346,12 @@ class InputBuffer(Buffer): class OutputBuffer(Buffer): - + """An OutputBuffer that holds local IUs.""" - - def __init__(self, owning_component_name, participant_config=None): + + def __init__(self, owning_component_name, channel='default', participant_config=None): '''Create an Output Buffer. - + Keyword arguments: owning_component_name -- name of the entity that own this buffer participant_config -- RSB configuration @@ -1365,7 +1366,8 @@ class OutputBuffer(Buffer): self._id_prefix = str(owning_component_name)+'-'+str(self._uuid)+'-IU-' self.__iu_id_counter_lock = threading.Lock() #self.__iu_id_counter = 0 # hbuschme: IUs now have their Ids assigned on creation - + self._channel = channel + def _create_own_name_listener(self, iu_category): # FIXME replace this '''Create an own name listener.''' @@ -1377,7 +1379,7 @@ class OutputBuffer(Buffer): #logger.info("Added category listener for "+iu_category) #return cat_listener pass - + # hbuschme: IUs now have their Ids assigned on creation #def _generate_iu_uid(self): # '''Generate a unique IU id of the form ????''' @@ -1403,7 +1405,7 @@ class OutputBuffer(Buffer): iu.set_links(links=update.new_links, writer_name=update.writer_name) self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.LINKSUPDATED, category=iu.category) return iu.revision - + def _remote_update_payload(self, update): '''Apply a remotely requested update to one of the stored IU's payload.''' if update.uid not in self._iu_store: @@ -1431,7 +1433,7 @@ class OutputBuffer(Buffer): # _set_payload etc. have also incremented the revision number self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category) return iu.revision - + def _remote_commit(self, iu_commission): '''Apply a remotely requested commit to one of the stored IUs.''' if iu_commission.uid not in self._iu_store: @@ -1449,20 +1451,20 @@ class OutputBuffer(Buffer): iu._internal_commit(writer_name=iu_commission.writer_name) self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category) return iu.revision - + def _get_informer(self, iu_category): '''Return (or create, store and return) an informer object for IUs of the specified category.''' if iu_category in self._informer_store: - logger.info("Returning informer on scope "+"/ipaaca/category/"+str(iu_category)) + logger.info("Returning informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)) return self._informer_store[iu_category] informer_iu = rsb.createInformer( - rsb.Scope("/ipaaca/category/"+str(iu_category)), + rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), config=self._participant_config, dataType=object) self._informer_store[iu_category] = informer_iu #new_tuple - logger.info("Returning NEW informer on scope "+"/ipaaca/category/"+str(iu_category)) + logger.info("Returning NEW informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)) return informer_iu #return new_tuple - + def add(self, iu): '''Add an IU to the IU store, assign an ID and publish it.''' # hbuschme: IUs now have their Ids assigned on creation @@ -1478,7 +1480,7 @@ class OutputBuffer(Buffer): self._iu_store[iu.uid] = iu iu.buffer = self self._publish_iu(iu) - + def remove(self, iu=None, iu_uid=None): '''Remove the iu or an IU corresponding to iu_uid from the OutputBuffer, retracting it from the system.''' if iu is None: @@ -1492,12 +1494,12 @@ class OutputBuffer(Buffer): self._retract_iu(iu) del self._iu_store[iu.uid] return iu - + def _publish_iu(self, iu): '''Publish an IU.''' informer = self._get_informer(iu._category) informer.publishData(iu) - + def _retract_iu(self, iu): '''Retract (unpublish) an IU.''' iu_retraction = ipaaca_pb2.IURetraction() @@ -1505,10 +1507,10 @@ class OutputBuffer(Buffer): iu_retraction.revision = iu.revision informer = self._get_informer(iu._category) informer.publishData(iu_retraction) - + def _send_iu_commission(self, iu, writer_name): '''Send IU commission. - + Keyword arguments: iu -- the IU that has been committed to writer_name -- name of the Buffer that initiated this commit, necessary @@ -1523,10 +1525,10 @@ class OutputBuffer(Buffer): iu_commission.writer_name = iu.owner_name if writer_name is None else writer_name informer = self._get_informer(iu._category) informer.publishData(iu_commission) - + def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"): '''Send an IU link update. - + Keyword arguments: iu -- the IU being updated is_delta -- whether this is an incremental update or a replacement @@ -1550,10 +1552,10 @@ class OutputBuffer(Buffer): informer = self._get_informer(iu._category) informer.publishData(link_update) # FIXME send the notification to the target, if the target is not the writer_name - + def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"): '''Send an IU payload update. - + Keyword arguments: iu -- the IU being updated is_delta -- whether this is an incremental update or a replacement -- GitLab