/* * This file is part of IPAACA, the * "Incremental Processing Architecture * for Artificial Conversational Agents". * * Copyright (c) 2009-2015 Social Cognitive Systems Group * (formerly the Sociable Agents Group) * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ * http://purl.org/net/ipaaca * * This file may be licensed under the terms of of the * GNU Lesser General Public License Version 3 (the ``LGPL''), * or (at your option) any later version. * * Software distributed under the License is distributed * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either * express or implied. See the LGPL for the specific language * governing rights and limitations. * * You should have received a copy of the LGPL along with this * program. If not, go to http://www.gnu.org/licenses/lgpl.html * or write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * The development of this software was supported by the * Excellence Cluster EXC 277 Cognitive Interaction Technology. * The Excellence Cluster EXC 277 is a grant of the Deutsche * Forschungsgemeinschaft (DFG) in the context of the German * Excellence Initiative. */ #include <ipaaca/ipaaca.h> #define VERBOSE_HANDLERS 0 // remove later namespace ipaaca { using namespace rsb; using namespace rsb::filter; using namespace rsb::converter; using namespace rsb::patterns; IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)//{{{ { os << "PayloadUpdate(uid=" << obj.uid << ", revision=" << obj.revision; os << ", writer_name=" << obj.writer_name << ", is_delta=" << (obj.is_delta?"True":"False"); os << ", new_items = {"; bool first = true; for (auto& newit: obj.new_items) { if (first) { first=false; } else { os << ", "; } //os << "'" << it->first << "':'" << it->second << "'"; os << "'" << newit.first << "': " << newit.second; } os << "}, keys_to_remove = ["; first = true; for (std::vector<std::string>::const_iterator it=obj.keys_to_remove.begin(); it!=obj.keys_to_remove.end(); ++it) { if (first) { first=false; } else { os << ", "; } os << "'" << *it << "'"; } os << "])"; return os; } //}}} IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj)//{{{ { os << "LinkUpdate(uid=" << obj.uid << ", revision=" << obj.revision; os << ", writer_name=" << obj.writer_name << ", is_delta=" << (obj.is_delta?"True":"False"); os << ", new_links = {"; bool first = true; for (std::map<std::string, std::set<std::string> >::const_iterator it=obj.new_links.begin(); it!=obj.new_links.end(); ++it) { if (first) { first=false; } else { os << ", "; } os << "'" << it->first << "': ["; bool ffirst = true; for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { if (ffirst) { ffirst=false; } else { os << ", "; } os << "'" << *it2 << "'"; } os << "]"; } os << "}, links_to_remove = {"; first = true; for (std::map<std::string, std::set<std::string> >::const_iterator it=obj.links_to_remove.begin(); it!=obj.links_to_remove.end(); ++it) { if (first) { first=false; } else { os << ", "; } os << "'" << it->first << "': ["; bool ffirst = true; for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { if (ffirst) { ffirst=false; } else { os << ", "; } os << "'" << *it2 << "'"; } os << "]"; } os << "})"; return os; } //}}} // IUEventHandler//{{{ IPAACA_EXPORT IUEventHandler::IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category) : _function(function), _event_mask(event_mask), _for_all_categories(false) { if (category=="") { _for_all_categories = true; } else { _categories.insert(category); } } IPAACA_EXPORT IUEventHandler::IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories) : _function(function), _event_mask(event_mask), _for_all_categories(false) { if (categories.size()==0) { _for_all_categories = true; } else { _categories = categories; } } IPAACA_EXPORT void IUEventHandler::call(Buffer* buffer, boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category) { if (_condition_met(event_type, category)) { #if VERBOSE_HANDLERS == 1 std::cout << "[" << pthread_self() << " handler ENTER]" << std::endl; #endif _function(iu, event_type, local); #if VERBOSE_HANDLERS == 1 std::cout << "[" << pthread_self() << " handler EXIT]" << std::endl; #endif } } //}}} // Buffer//{{{ IPAACA_EXPORT void Buffer::_allocate_unique_name(const std::string& basename, const std::string& function) { std::string uuid = ipaaca::generate_uuid_string(); _basename = basename; _uuid = uuid.substr(0,8); _unique_name = "/ipaaca/component/" + _basename + "ID" + _uuid + "/" + function; } IPAACA_EXPORT void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories) { IPAACA_DEBUG("register_handler " << function << " " << event_mask << " " << categories) IUEventHandler::ptr handler = IUEventHandler::ptr(new IUEventHandler(function, event_mask, categories)); _event_handlers.push_back(handler); } IPAACA_EXPORT void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category) { IPAACA_DEBUG("register_handler " << function << " " << event_mask << " " << category) IUEventHandler::ptr handler = IUEventHandler::ptr(new IUEventHandler(function, event_mask, category)); _event_handlers.push_back(handler); } IPAACA_EXPORT void Buffer::call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category) { //IPAACA_DEBUG("handling an event " << ipaaca::iu_event_type_to_str(event_type) << " for IU " << iu->uid()) for (std::vector<IUEventHandler::ptr>::iterator it = _event_handlers.begin(); it != _event_handlers.end(); ++it) { (*it)->call(this, iu, local, event_type, category); } } //}}} // Callbacks for OutputBuffer//{{{ IPAACA_EXPORT CallbackIUPayloadUpdate::CallbackIUPayloadUpdate(Buffer* buffer): _buffer(buffer) { } IPAACA_EXPORT CallbackIULinkUpdate::CallbackIULinkUpdate(Buffer* buffer): _buffer(buffer) { } IPAACA_EXPORT CallbackIUCommission::CallbackIUCommission(Buffer* buffer): _buffer(buffer) { } IPAACA_EXPORT CallbackIUResendRequest::CallbackIUResendRequest(Buffer* buffer): _buffer(buffer) { } IPAACA_EXPORT boost::shared_ptr<int> CallbackIUPayloadUpdate::call(const std::string& methodName, boost::shared_ptr<IUPayloadUpdate> update) { IUInterface::ptr iui = _buffer->get(update->uid); if (! iui) { IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid) return boost::shared_ptr<int>(new int(0)); } IU::ptr iu = boost::static_pointer_cast<IU>(iui); iu->_revision_lock.lock(); if ((update->revision != 0) && (update->revision != iu->_revision)) { IPAACA_WARNING("Remote write operation failed because request was out of date; IU " << update->uid) IPAACA_WARNING(" Referred-to revision was " << update->revision << " while local one is " << iu->_revision) iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } else if (iu->committed()) { iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } else if (iu->retracted()) { iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } else if (iu->committed()) { iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } else if (iu->retracted()) { iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } if (update->is_delta) { // FIXME TODO this is an unsolved problem atm: deletions in a delta update are // sent individually. We should have something like _internal_merge_and_remove for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) { iu->payload()._internal_remove(*it, update->writer_name); //_buffer->unique_name()); } // but it is solved for pure merges: iu->payload()._internal_merge(update->new_items, update->writer_name); } else { iu->payload()._internal_replace_all(update->new_items, update->writer_name); //_buffer->unique_name()); } _buffer->call_iu_event_handlers(iu, true, IU_UPDATED, iu->category()); revision_t revision = iu->revision(); iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(revision)); } IPAACA_EXPORT boost::shared_ptr<int> CallbackIULinkUpdate::call(const std::string& methodName, boost::shared_ptr<IULinkUpdate> update) { IUInterface::ptr iui = _buffer->get(update->uid); if (! iui) { IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid) return boost::shared_ptr<int>(new int(0)); } IU::ptr iu = boost::static_pointer_cast<IU>(iui); iu->_revision_lock.lock(); if ((update->revision != 0) && (update->revision != iu->_revision)) { IPAACA_WARNING("Remote write operation failed because request was out of date; IU " << update->uid) iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } else if (iu->committed()) { iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } else if (iu->retracted()) { iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } else if (iu->committed()) { iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } else if (iu->retracted()) { iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } if (update->is_delta) { iu->modify_links(update->new_links, update->links_to_remove, update->writer_name); } else { iu->set_links(update->new_links, update->writer_name); } _buffer->call_iu_event_handlers(iu, true, IU_LINKSUPDATED, iu->category()); revision_t revision = iu->revision(); iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(revision)); } IPAACA_EXPORT boost::shared_ptr<int> CallbackIUCommission::call(const std::string& methodName, boost::shared_ptr<protobuf::IUCommission> update) { IUInterface::ptr iui = _buffer->get(update->uid()); if (! iui) { IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid()) return boost::shared_ptr<int>(new int(0)); } IU::ptr iu = boost::static_pointer_cast<IU>(iui); iu->_revision_lock.lock(); if ((update->revision() != 0) && (update->revision() != iu->_revision)) { IPAACA_WARNING("Remote write operation failed because request was out of date; IU " << update->uid()) iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } else if (iu->committed()) { iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } else if (iu->retracted()) { iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } else { } iu->_internal_commit(update->writer_name()); _buffer->call_iu_event_handlers(iu, true, IU_LINKSUPDATED, iu->category()); revision_t revision = iu->revision(); iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(revision)); } IPAACA_EXPORT boost::shared_ptr<int> CallbackIUResendRequest::call(const std::string& methodName, boost::shared_ptr<protobuf::IUResendRequest> update) { IUInterface::ptr iui = _buffer->get(update->uid()); if (! iui) { IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid()) return boost::shared_ptr<int>(new int(0)); } IU::ptr iu = boost::static_pointer_cast<IU>(iui); if ((update->has_hidden_scope_name() == true)&&(update->hidden_scope_name().compare("") != 0)){ revision_t revision = iu->revision(); _buffer->_publish_iu_resend(iu, update->hidden_scope_name()); return boost::shared_ptr<int>(new int(revision)); } else { revision_t revision = 0; return boost::shared_ptr<int>(new int(revision)); } } //}}} // OutputBuffer//{{{ IPAACA_EXPORT OutputBuffer::OutputBuffer(const std::string& basename, const std::string& channel) :Buffer(basename, "OB") { _id_prefix = _basename + "-" + _uuid + "-IU-"; _channel = (channel=="") ? __ipaaca_static_option_default_channel: channel; _initialize_server(); } IPAACA_EXPORT void OutputBuffer::_initialize_server() { _server = getFactory().createLocalServer( Scope( _unique_name ) ); _server->registerMethod("updatePayload", Server::CallbackPtr(new CallbackIUPayloadUpdate(this))); _server->registerMethod("updateLinks", Server::CallbackPtr(new CallbackIULinkUpdate(this))); _server->registerMethod("commit", Server::CallbackPtr(new CallbackIUCommission(this))); _server->registerMethod("resendRequest", Server::CallbackPtr(new CallbackIUResendRequest(this))); } IPAACA_EXPORT OutputBuffer::ptr OutputBuffer::create(const std::string& basename) { Initializer::initialize_backend(); return OutputBuffer::ptr(new OutputBuffer(basename)); } IPAACA_EXPORT IUInterface::ptr OutputBuffer::get(const std::string& iu_uid) { IUStore::iterator it = _iu_store.find(iu_uid); if (it==_iu_store.end()) return IUInterface::ptr(); return it->second; } IPAACA_EXPORT std::set<IUInterface::ptr> OutputBuffer::get_ius() { std::set<IUInterface::ptr> set; for (IUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) set.insert(it->second); return set; } IPAACA_EXPORT void OutputBuffer::_send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) { IULinkUpdate* lup = new ipaaca::IULinkUpdate(); Informer<ipaaca::IULinkUpdate>::DataPtr ldata(lup); lup->uid = iu->uid(); lup->is_delta = is_delta; lup->revision = revision; lup->is_delta = true; lup->new_links = new_links; if (is_delta) lup->links_to_remove = links_to_remove; if (writer_name=="") lup->writer_name = _unique_name; else lup->writer_name = writer_name; Informer<AnyType>::Ptr informer = _get_informer(iu->category()); informer->publish(ldata); } IPAACA_EXPORT void OutputBuffer::_send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) { IUPayloadUpdate* pup = new ipaaca::IUPayloadUpdate(); Informer<ipaaca::IUPayloadUpdate>::DataPtr pdata(pup); pup->payload_type = iu->payload_type(); pup->uid = iu->uid(); pup->is_delta = is_delta; pup->revision = revision; pup->new_items = new_items; if (is_delta) pup->keys_to_remove = keys_to_remove; if (writer_name=="") pup->writer_name = _unique_name; else pup->writer_name = writer_name; Informer<AnyType>::Ptr informer = _get_informer(iu->category()); informer->publish(pdata); } IPAACA_EXPORT void OutputBuffer::_send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name) { Informer<protobuf::IUCommission>::DataPtr data(new protobuf::IUCommission()); data->set_uid(iu->uid()); data->set_revision(revision); if (writer_name=="") data->set_writer_name(_unique_name); else data->set_writer_name(writer_name); Informer<AnyType>::Ptr informer = _get_informer(iu->category()); informer->publish(data); } IPAACA_EXPORT void OutputBuffer::add(IU::ptr iu) { if (_iu_store.count(iu->uid()) > 0) { throw IUPublishedError(); } if (iu->is_published()) { throw IUPublishedError(); } else if (iu->retracted()) { throw IURetractedError(); } if (iu->access_mode() != IU_ACCESS_MESSAGE) { // (for Message-type IUs: do not actually store them) _iu_store[iu->uid()] = iu; } iu->_associate_with_buffer(this); _publish_iu(iu); } IPAACA_EXPORT void OutputBuffer::_publish_iu(IU::ptr iu) { Informer<AnyType>::Ptr informer = _get_informer(iu->_category); Informer<ipaaca::IU>::DataPtr iu_data(iu); informer->publish(iu_data); } IPAACA_EXPORT void OutputBuffer::_publish_iu_resend(IU::ptr iu, const std::string& hidden_scope_name) { Informer<AnyType>::Ptr informer = _get_informer(hidden_scope_name); Informer<ipaaca::IU>::DataPtr iu_data(iu); informer->publish(iu_data); } IPAACA_EXPORT Informer<AnyType>::Ptr OutputBuffer::_get_informer(const std::string& category) { if (_informer_store.count(category) > 0) { return _informer_store[category]; } else { //IPAACA_INFO("Creating new informer for category " << category << " on channel " << _channel) std::string scope_string = "/ipaaca/channel/" + _channel + "/category/" + category; IPAACA_INFO("Creating new informer for " << scope_string) Informer<AnyType>::Ptr informer = getFactory().createInformer<AnyType> ( Scope(scope_string)); _informer_store[category] = informer; return informer; } } IPAACA_EXPORT boost::shared_ptr<IU> OutputBuffer::remove(const std::string& iu_uid) { IUStore::iterator it = _iu_store.find(iu_uid); if (it == _iu_store.end()) { IPAACA_WARNING("Removal of IU " << iu_uid << " requested, but not present in our OutputBuffer") //throw IUNotFoundError(); } IU::ptr iu = it->second; _retract_iu(iu); _iu_store.erase(iu_uid); return iu; } IPAACA_EXPORT boost::shared_ptr<IU> OutputBuffer::remove(IU::ptr iu) { return remove(iu->uid()); // to make sure it is in the store } IPAACA_EXPORT void OutputBuffer::_retract_iu(IU::ptr iu) { if (iu->_retracted) return; // ignore subsequent retractions iu->_retracted = true; Informer<protobuf::IURetraction>::DataPtr data(new protobuf::IURetraction()); data->set_uid(iu->uid()); data->set_revision(iu->revision()); Informer<AnyType>::Ptr informer = _get_informer(iu->category()); informer->publish(data); } IPAACA_EXPORT void OutputBuffer::_retract_all_internal() { for (IUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) { if (!(it->second->_retracted)) { _retract_iu(it->second); } } } IPAACA_EXPORT OutputBuffer::~OutputBuffer() { _retract_all_internal(); } //}}} // InputBuffer//{{{ IPAACA_EXPORT InputBuffer::InputBuffer(const BufferConfiguration& bufferconfiguration) :Buffer(bufferconfiguration.get_basename(), "IB") { _channel = bufferconfiguration.get_channel(); for (std::vector<std::string>::const_iterator it=bufferconfiguration.get_category_interests().begin(); it!=bufferconfiguration.get_category_interests().end(); ++it) { _create_category_listener_if_needed(*it); } _create_category_listener_if_needed(_uuid); triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::set<std::string>& category_interests) :Buffer(basename, "IB") { _channel = __ipaaca_static_option_default_channel; for (std::set<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) { _create_category_listener_if_needed(*it); } _create_category_listener_if_needed(_uuid); triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests) :Buffer(basename, "IB") { _channel = __ipaaca_static_option_default_channel; for (std::vector<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) { _create_category_listener_if_needed(*it); } _create_category_listener_if_needed(_uuid); triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1) :Buffer(basename, "IB") { _channel = __ipaaca_static_option_default_channel; _create_category_listener_if_needed(category_interest1); _create_category_listener_if_needed(_uuid); triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) :Buffer(basename, "IB") { _channel = __ipaaca_static_option_default_channel; _create_category_listener_if_needed(category_interest1); _create_category_listener_if_needed(category_interest2); _create_category_listener_if_needed(_uuid); triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) :Buffer(basename, "IB") { _channel = __ipaaca_static_option_default_channel; _create_category_listener_if_needed(category_interest1); _create_category_listener_if_needed(category_interest2); _create_category_listener_if_needed(category_interest3); _create_category_listener_if_needed(_uuid); triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) :Buffer(basename, "IB") { _channel = __ipaaca_static_option_default_channel; _create_category_listener_if_needed(category_interest1); _create_category_listener_if_needed(category_interest2); _create_category_listener_if_needed(category_interest3); _create_category_listener_if_needed(category_interest4); _create_category_listener_if_needed(_uuid); triggerResend = false; } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const BufferConfiguration& bufferconfiguration) { Initializer::initialize_backend(); return InputBuffer::ptr(new InputBuffer(bufferconfiguration)); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::set<std::string>& category_interests) { Initializer::initialize_backend(); return InputBuffer::ptr(new InputBuffer(basename, category_interests)); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::vector<std::string>& category_interests) { Initializer::initialize_backend(); return InputBuffer::ptr(new InputBuffer(basename, category_interests)); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1) { Initializer::initialize_backend(); return InputBuffer::ptr(new InputBuffer(basename, category_interest1)); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) { Initializer::initialize_backend(); return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2)); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) { Initializer::initialize_backend(); return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3)); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) { Initializer::initialize_backend(); return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3, category_interest4)); } IPAACA_EXPORT void InputBuffer::set_resend(bool resendActive) { triggerResend = resendActive; } IPAACA_EXPORT bool InputBuffer::get_resend() { return triggerResend; } IPAACA_EXPORT IUInterface::ptr InputBuffer::get(const std::string& iu_uid) { RemotePushIUStore::iterator it = _iu_store.find(iu_uid); if (it==_iu_store.end()) return IUInterface::ptr(); return it->second; } IPAACA_EXPORT std::set<IUInterface::ptr> InputBuffer::get_ius() { std::set<IUInterface::ptr> set; for (RemotePushIUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) set.insert(it->second); return set; } IPAACA_EXPORT RemoteServerPtr InputBuffer::_get_remote_server(const std::string& unique_server_name) { std::map<std::string, RemoteServerPtr>::iterator it = _remote_server_store.find(unique_server_name); if (it!=_remote_server_store.end()) return it->second; RemoteServerPtr remote_server = getFactory().createRemoteServer(Scope(unique_server_name)); _remote_server_store[unique_server_name] = remote_server; return remote_server; } IPAACA_EXPORT ListenerPtr InputBuffer::_create_category_listener_if_needed(const std::string& category) { std::map<std::string, ListenerPtr>::iterator it = _listener_store.find(category); if (it!=_listener_store.end()) { return it->second; } std::string scope_string = "/ipaaca/channel/" + _channel + "/category/" + category; IPAACA_INFO("Creating new listener for " << scope_string) ListenerPtr listener = getFactory().createListener( Scope(scope_string) ); HandlerPtr event_handler = HandlerPtr( new EventFunctionHandler( boost::bind(&InputBuffer::_handle_iu_events, this, _1) ) ); listener->addHandler(event_handler); _listener_store[category] = listener; return listener; } IPAACA_EXPORT void InputBuffer::_trigger_resend_request(EventPtr event) { if (!triggerResend) return; std::string type = event->getType(); std::string uid = ""; std::string writerName = ""; if (type == "ipaaca::IUPayloadUpdate") { boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData()); uid = update->uid; writerName = update->writer_name; } else if (type == "ipaaca::IULinkUpdate") { boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData()); uid = update->uid; writerName = update->writer_name; } else if (type == "ipaaca::protobuf::IUCommission") { boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData()); uid = update->uid(); writerName = update->writer_name(); } else { IPAACA_ERROR("_trigger_resend_request: called for unhandled event type " << type) return; } if (!writerName.empty()) { RemoteServerPtr server = _get_remote_server(writerName); if (!uid.empty()) { boost::shared_ptr<protobuf::IUResendRequest> update = boost::shared_ptr<protobuf::IUResendRequest>(new protobuf::IUResendRequest()); update->set_uid(uid); update->set_hidden_scope_name(_uuid); boost::shared_ptr<int> result = server->call<int>("resendRequest", update, IPAACA_REMOTE_SERVER_TIMEOUT); if (*result == 0) { throw IUResendRequestFailedError(); } } } } IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) { std::string type = event->getType(); if (type == "ipaaca::RemotePushIU") { boost::shared_ptr<RemotePushIU> iu = boost::static_pointer_cast<RemotePushIU>(event->getData()); if (_iu_store.count(iu->category()) > 0) { // already got the IU... ignore } else { _iu_store[iu->uid()] = iu; iu->_set_buffer(this); call_iu_event_handlers(iu, false, IU_ADDED, iu->category() ); } } else if (type == "ipaaca::RemoteMessage") { boost::shared_ptr<RemoteMessage> iu = boost::static_pointer_cast<RemoteMessage>(event->getData()); call_iu_event_handlers(iu, false, IU_MESSAGE, iu->category() ); } else { RemotePushIUStore::iterator it; if (type == "ipaaca::IUPayloadUpdate") { boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData()); if (update->writer_name == _unique_name) { return; } it = _iu_store.find(update->uid); if (it == _iu_store.end()) { _trigger_resend_request(event); IPAACA_INFO("UPDATED message for an IU that we did not fully receive before") return; } it->second->_apply_update(update); call_iu_event_handlers(it->second, false, IU_UPDATED, it->second->category() ); } else if (type == "ipaaca::IULinkUpdate") { boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData()); if (update->writer_name == _unique_name) { return; } it = _iu_store.find(update->uid); if (it == _iu_store.end()) { _trigger_resend_request(event); IPAACA_INFO("LINKSUPDATED message for an IU that we did not fully receive before") return; } it->second->_apply_link_update(update); call_iu_event_handlers(it->second, false, IU_LINKSUPDATED, it->second->category() ); } else if (type == "ipaaca::protobuf::IUCommission") { boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData()); if (update->writer_name() == _unique_name) { return; } it = _iu_store.find(update->uid()); if (it == _iu_store.end()) { _trigger_resend_request(event); IPAACA_INFO("COMMITTED message for an IU that we did not fully receive before") return; } it->second->_apply_commission(); it->second->_revision = update->revision(); call_iu_event_handlers(it->second, false, IU_COMMITTED, it->second->category() ); } else if (type == "ipaaca::protobuf::IURetraction") { boost::shared_ptr<protobuf::IURetraction> update = boost::static_pointer_cast<protobuf::IURetraction>(event->getData()); it = _iu_store.find(update->uid()); if (it == _iu_store.end()) { IPAACA_INFO("Ignoring RETRACTED message for an IU that we did not fully receive before") return; } it->second->_revision = update->revision(); it->second->_apply_retraction(); auto final_iu_ref = it->second; ////// remove from InputBuffer? FIXME: unclear issue - resolve in ipaaca3 ////_iu_store.erase(it->first); // and call the handler. IU reference is still valid for this call, even if removed from buffer. call_iu_event_handlers(final_iu_ref, false, IU_RETRACTED, it->second->category() ); // } else { IPAACA_WARNING("(Unhandled Event type " << type << " !)"); return; } } } //}}} } // of namespace ipaaca