/* * This file is part of IPAACA, the * "Incremental Processing Architecture * for Artificial Conversational Agents". * * Copyright (c) 2009-2013 Sociable Agents Group * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ * http://purl.org/net/ipaaca * * This file may be licensed under the terms of of the * GNU Lesser General Public License Version 3 (the ``LGPL''), * or (at your option) any later version. * * Software distributed under the License is distributed * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either * express or implied. See the LGPL for the specific language * governing rights and limitations. * * You should have received a copy of the LGPL along with this * program. If not, go to http://www.gnu.org/licenses/lgpl.html * or write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * The development of this software was supported by the * Excellence Cluster EXC 277 Cognitive Interaction Technology. * The Excellence Cluster EXC 277 is a grant of the Deutsche * Forschungsgemeinschaft (DFG) in the context of the German * Excellence Initiative. */ #include <ipaaca/ipaaca.h> #include <cstdlib> #ifdef WIN32 #include <rpc.h> #else #include <uuid/uuid.h> #include <glob.h> #endif namespace ipaaca { using namespace rsb; using namespace rsb::filter; using namespace rsb::converter; using namespace rsb::patterns; #define VERBOSE_HANDLERS 0 Lock& logger_lock() { static Lock lock; return lock; } // util and init//{{{ IPAACA_EXPORT bool Initializer::_initialized = false; //const LinkSet EMPTY_LINK_SET = LinkSet(); //const std::set<std::string> EMPTY_LINK_SET(); IPAACA_EXPORT bool Initializer::initialized() { return _initialized; } IPAACA_EXPORT void Initializer::initialize_ipaaca_rsb_if_needed() { if (_initialized) return; //IPAACA_INFO("Calling initialize_updated_default_config()") initialize_updated_default_config(); // RYT FIXME This configuration stuff has been simply removed in rsb! //ParticipantConfig config = ParticipantConfig::fromConfiguration(); //getFactory().setDefaultParticipantConfig(config); //IPAACA_INFO("Creating and registering Converters") boost::shared_ptr<IUConverter> iu_converter(new IUConverter()); converterRepository<std::string>()->registerConverter(iu_converter); boost::shared_ptr<MessageConverter> message_converter(new MessageConverter()); converterRepository<std::string>()->registerConverter(message_converter); boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter()); converterRepository<std::string>()->registerConverter(payload_update_converter); boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter()); converterRepository<std::string>()->registerConverter(link_update_converter); boost::shared_ptr<ProtocolBufferConverter<protobuf::IUCommission> > iu_commission_converter(new ProtocolBufferConverter<protobuf::IUCommission> ()); converterRepository<std::string>()->registerConverter(iu_commission_converter); // dlw boost::shared_ptr<ProtocolBufferConverter<protobuf::IUResendRequest> > iu_resendrequest_converter(new ProtocolBufferConverter<protobuf::IUResendRequest> ()); converterRepository<std::string>()->registerConverter(iu_resendrequest_converter); boost::shared_ptr<ProtocolBufferConverter<protobuf::IURetraction> > iu_retraction_converter(new ProtocolBufferConverter<protobuf::IURetraction> ()); converterRepository<std::string>()->registerConverter(iu_retraction_converter); boost::shared_ptr<IntConverter> int_converter(new IntConverter()); converterRepository<std::string>()->registerConverter(int_converter); //IPAACA_INFO("Initialization complete.") _initialized = true; //IPAACA_TODO("initialize all converters") } IPAACA_EXPORT void Initializer::dump_current_default_config() { IPAACA_INFO("--- Dumping current default participant configuration ---") rsb::ParticipantConfig config = getFactory().getDefaultParticipantConfig(); std::set<rsb::ParticipantConfig::Transport> transports = config.getTransports(); for (std::set<rsb::ParticipantConfig::Transport>::const_iterator it=transports.begin(); it!=transports.end(); ++it) { IPAACA_INFO( "Active transport: " << it->getName() ) } IPAACA_INFO("--- End of configuration dump ---") //ParticipantConfig::Transport inprocess = config.getTransport("inprocess"); //inprocess.setEnabled(true); //config.addTransport(inprocess); } IPAACA_EXPORT void Initializer::initialize_updated_default_config() { // quick hack to iterate through the pwd parents // and find the closest rsb plugin dir // // but only if not yet defined const char* plugin_path = getenv("RSB_PLUGINS_CPP_PATH"); if (!plugin_path) { #ifdef WIN32 LOG_IPAACA_CONSOLE("WARNING: RSB_PLUGINS_CPP_PATH not set - in Windows it has to be specified.") //throw NotImplementedError(); #else LOG_IPAACA_CONSOLE("RSB_PLUGINS_CPP_PATH not set; looking here and up to 7 dirs up.") std::string pathstr = "./"; for (int i=0; i< 8 /* depth EIGHT (totally arbitrary..) */ ; i++) { std::string where_str = pathstr+"deps/lib/rsb*/plugins"; const char* where = where_str.c_str(); glob_t g; glob(where, 0, NULL, &g); if (g.gl_pathc>0) { const char* found_path = g.gl_pathv[0]; LOG_IPAACA_CONSOLE("Found an RSB plugin dir which will be used automatically: " << found_path) setenv("RSB_PLUGINS_CPP_PATH", found_path, 1); break; } // else keep going globfree(&g); pathstr += "../"; } #endif } else { LOG_IPAACA_CONSOLE("RSB_PLUGINS_CPP_PATH already defined: " << plugin_path) } } IPAACA_EXPORT std::string generate_uuid_string() { #ifdef WIN32 // Windows UUID uuid; RPC_STATUS stat; stat = UuidCreate(&uuid); if (stat == RPC_S_OK) { unsigned char* uuid_str = NULL; stat = UuidToString(&uuid, &uuid_str); if (stat == RPC_S_OK) { std::string result((const char*) uuid_str, 16); RpcStringFree(&uuid_str); return result; } } else { throw UUIDGenerationError(); } #else // POSIX uuid_t uuidt; uuid_generate(uuidt); #ifdef __MACOSX__ // (Mac) uuid_string_t uuidstr; uuid_unparse_lower(uuidt, uuidstr); return uuidstr; #else // (Linux) char result_c[37]; uuid_unparse_lower(uuidt, result_c); return result_c; #endif #endif } /* void init_inprocess_too() { //ParticipantConfig config = getFactory().getDefaultParticipantConfig(); ParticipantConfig config = ParticipantConfig::fromFile("rsb.cfg"); //ParticipantConfig::Transport inprocess = config.getTransport("inprocess"); //inprocess.setEnabled(true); //config.addTransport(inprocess); getFactory().setDefaultParticipantConfig(config); } */ //}}} IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const SmartLinkMap& obj)//{{{ { os << "{"; bool first = true; for (LinkMap::const_iterator it=obj._links.begin(); it!=obj._links.end(); ++it) { if (first) { first=false; } else { os << ", "; } os << "'" << it->first << "': ["; bool firstinner = true; for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { if (firstinner) { firstinner=false; } else { os << ", "; } os << "'" << *it2 << "'"; } os << "]"; } os << "}"; return os; } //}}} IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const Payload& obj)//{{{ { os << "{"; bool first = true; for (std::map<std::string, std::string>::const_iterator it=obj._store.begin(); it!=obj._store.end(); ++it) { if (first) { first=false; } else { os << ", "; } os << "'" << it->first << "':'" << it->second << "'"; } os << "}"; return os; } //}}} IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const IUInterface& obj)//{{{ { os << "IUInterface(uid='" << obj.uid() << "'"; os << ", category='" << obj.category() << "'"; os << ", revision=" << obj.revision(); os << ", committed=" << (obj.committed()?"True":"False"); os << ", owner_name='" << obj.owner_name() << "'"; os << ", payload="; os << obj.const_payload(); os << ", links="; os << obj._links; os << ")"; return os; } //}}} IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)//{{{ { os << "PayloadUpdate(uid=" << obj.uid << ", revision=" << obj.revision; os << ", writer_name=" << obj.writer_name << ", is_delta=" << (obj.is_delta?"True":"False"); os << ", new_items = {"; bool first = true; for (std::map<std::string, std::string>::const_iterator it=obj.new_items.begin(); it!=obj.new_items.end(); ++it) { if (first) { first=false; } else { os << ", "; } os << "'" << it->first << "':'" << it->second << "'"; } os << "}, keys_to_remove = ["; first = true; for (std::vector<std::string>::const_iterator it=obj.keys_to_remove.begin(); it!=obj.keys_to_remove.end(); ++it) { if (first) { first=false; } else { os << ", "; } os << "'" << *it << "'"; } os << "])"; return os; } //}}} IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj)//{{{ { os << "LinkUpdate(uid=" << obj.uid << ", revision=" << obj.revision; os << ", writer_name=" << obj.writer_name << ", is_delta=" << (obj.is_delta?"True":"False"); os << ", new_links = {"; bool first = true; for (std::map<std::string, std::set<std::string> >::const_iterator it=obj.new_links.begin(); it!=obj.new_links.end(); ++it) { if (first) { first=false; } else { os << ", "; } os << "'" << it->first << "': ["; bool ffirst = true; for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { if (ffirst) { ffirst=false; } else { os << ", "; } os << "'" << *it2 << "'"; } os << "]"; } os << "}, links_to_remove = {"; first = true; for (std::map<std::string, std::set<std::string> >::const_iterator it=obj.links_to_remove.begin(); it!=obj.links_to_remove.end(); ++it) { if (first) { first=false; } else { os << ", "; } os << "'" << it->first << "': ["; bool ffirst = true; for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { if (ffirst) { ffirst=false; } else { os << ", "; } os << "'" << *it2 << "'"; } os << "]"; } os << "})"; return os; } //}}} // SmartLinkMap//{{{ IPAACA_EXPORT LinkSet SmartLinkMap::empty_link_set; IPAACA_EXPORT void SmartLinkMap::_add_and_remove_links(const LinkMap& add, const LinkMap& remove) { // remove specified links for (LinkMap::const_iterator it = remove.begin(); it != remove.end(); ++it ) { // if link type exists if (_links.count(it->first) > 0) { // remove one by one for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { _links[it->first].erase(*it2); } // wipe the type key if no more links are left if (_links[it->first].size() == 0) { _links.erase(it->first); } } } // add specified links for (LinkMap::const_iterator it = add.begin(); it != add.end(); ++it ) { for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { _links[it->first].insert(*it2); } } } IPAACA_EXPORT void SmartLinkMap::_replace_links(const LinkMap& links) { //_links.clear(); _links=links; } IPAACA_EXPORT const LinkSet& SmartLinkMap::get_links(const std::string& key) { LinkMap::const_iterator it = _links.find(key); if (it==_links.end()) return empty_link_set; return it->second; } IPAACA_EXPORT const LinkMap& SmartLinkMap::get_all_links() { return _links; } //}}} // IUEventHandler//{{{ IPAACA_EXPORT IUEventHandler::IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category) : _function(function), _event_mask(event_mask), _for_all_categories(false) { if (category=="") { _for_all_categories = true; } else { _categories.insert(category); } } IPAACA_EXPORT IUEventHandler::IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories) : _function(function), _event_mask(event_mask), _for_all_categories(false) { if (categories.size()==0) { _for_all_categories = true; } else { _categories = categories; } } IPAACA_EXPORT void IUEventHandler::call(Buffer* buffer, boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category) { if (_condition_met(event_type, category)) { //IUInterface::ptr iu = buffer->get(uid); //if (iu) { #if VERBOSE_HANDLERS == 1 std::cout << "[" << pthread_self() << " handler ENTER]" << std::endl; #endif _function(iu, event_type, local); #if VERBOSE_HANDLERS == 1 std::cout << "[" << pthread_self() << " handler EXIT]" << std::endl; #endif //} } } //}}} // Buffer//{{{ IPAACA_EXPORT void Buffer::_allocate_unique_name(const std::string& basename, const std::string& function) { std::string uuid = ipaaca::generate_uuid_string(); _basename = basename; _uuid = uuid.substr(0,8); _unique_name = "/ipaaca/component/" + _basename + "ID" + _uuid + "/" + function; } IPAACA_EXPORT void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories) { std::cout << "register_handler " << function << " " << event_mask << " " << categories << std::endl; IUEventHandler::ptr handler = IUEventHandler::ptr(new IUEventHandler(function, event_mask, categories)); _event_handlers.push_back(handler); } IPAACA_EXPORT void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category) { std::cout << "register_handler " << function << " " << event_mask << " " << category << std::endl; IUEventHandler::ptr handler = IUEventHandler::ptr(new IUEventHandler(function, event_mask, category)); _event_handlers.push_back(handler); } IPAACA_EXPORT void Buffer::call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category) { IPAACA_INFO("handling an event " << ipaaca::iu_event_type_to_str(event_type) << " for IU " << iu->uid()) std::cout << "handling an event " << ipaaca::iu_event_type_to_str(event_type) << " for IU " << iu->uid() << std::endl; for (std::vector<IUEventHandler::ptr>::iterator it = _event_handlers.begin(); it != _event_handlers.end(); ++it) { (*it)->call(this, iu, local, event_type, category); } } //}}} // Callbacks for OutputBuffer//{{{ IPAACA_EXPORT CallbackIUPayloadUpdate::CallbackIUPayloadUpdate(Buffer* buffer): _buffer(buffer) { } IPAACA_EXPORT CallbackIULinkUpdate::CallbackIULinkUpdate(Buffer* buffer): _buffer(buffer) { } IPAACA_EXPORT CallbackIUCommission::CallbackIUCommission(Buffer* buffer): _buffer(buffer) { } // dlw IPAACA_EXPORT CallbackIUResendRequest::CallbackIUResendRequest(Buffer* buffer): _buffer(buffer) { } IPAACA_EXPORT boost::shared_ptr<int> CallbackIUPayloadUpdate::call(const std::string& methodName, boost::shared_ptr<IUPayloadUpdate> update) { //std::cout << "-- Received a modify_payload with " << update->new_items.size() << " keys to merge." << std::endl; IUInterface::ptr iui = _buffer->get(update->uid); if (! iui) { IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid) return boost::shared_ptr<int>(new int(0)); } IU::ptr iu = boost::static_pointer_cast<IU>(iui); iu->_revision_lock.lock(); if ((update->revision != 0) && (update->revision != iu->_revision)) { IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid) iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } if (update->is_delta) { // FIXME FIXME this is an unsolved problem atm: deletes in a delta update are // sent individually. We should have something like _internal_merge_and_remove for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) { iu->payload()._internal_remove(*it, update->writer_name); //_buffer->unique_name()); } // but it is solved for pure merges: iu->payload()._internal_merge(update->new_items, update->writer_name); } else { iu->payload()._internal_replace_all(update->new_items, update->writer_name); //_buffer->unique_name()); } //std::cout << "-- Calling update handler due to remote write." << std::endl; _buffer->call_iu_event_handlers(iu, true, IU_UPDATED, iu->category()); revision_t revision = iu->revision(); iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(revision)); } IPAACA_EXPORT boost::shared_ptr<int> CallbackIULinkUpdate::call(const std::string& methodName, boost::shared_ptr<IULinkUpdate> update) { IUInterface::ptr iui = _buffer->get(update->uid); if (! iui) { IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid) return boost::shared_ptr<int>(new int(0)); } IU::ptr iu = boost::static_pointer_cast<IU>(iui); iu->_revision_lock.lock(); if ((update->revision != 0) && (update->revision != iu->_revision)) { IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid) iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } if (update->is_delta) { iu->modify_links(update->new_links, update->links_to_remove, update->writer_name); } else { iu->set_links(update->new_links, update->writer_name); } _buffer->call_iu_event_handlers(iu, true, IU_LINKSUPDATED, iu->category()); revision_t revision = iu->revision(); iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(revision)); } IPAACA_EXPORT boost::shared_ptr<int> CallbackIUCommission::call(const std::string& methodName, boost::shared_ptr<protobuf::IUCommission> update) { IUInterface::ptr iui = _buffer->get(update->uid()); if (! iui) { IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid()) return boost::shared_ptr<int>(new int(0)); } IU::ptr iu = boost::static_pointer_cast<IU>(iui); iu->_revision_lock.lock(); if ((update->revision() != 0) && (update->revision() != iu->_revision)) { IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid()) iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } if (iu->committed()) { return boost::shared_ptr<int>(new int(0)); } else { } iu->_internal_commit(update->writer_name()); _buffer->call_iu_event_handlers(iu, true, IU_LINKSUPDATED, iu->category()); revision_t revision = iu->revision(); iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(revision)); } /** dlw */ IPAACA_EXPORT boost::shared_ptr<int> CallbackIUResendRequest::call(const std::string& methodName, boost::shared_ptr<protobuf::IUResendRequest> update) { IUInterface::ptr iui = _buffer->get(update->uid()); if (! iui) { IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid()) return boost::shared_ptr<int>(new int(0)); } IU::ptr iu = boost::static_pointer_cast<IU>(iui); if ((update->has_hidden_scope_name() == true)&&(update->hidden_scope_name().compare("") != 0)){ //_buffer->call_iu_event_handlers(iu, true, IU_UPDATED, update->hidden_scope_name()); revision_t revision = iu->revision(); iu->_publish_resend(iu, update->hidden_scope_name()); return boost::shared_ptr<int>(new int(revision)); } else { revision_t revision = 0; return boost::shared_ptr<int>(new int(revision)); } } //}}} // OutputBuffer//{{{ IPAACA_EXPORT OutputBuffer::OutputBuffer(const std::string& basename, const std::string& channel) :Buffer(basename, "OB") { //IPAACA_INFO("Entering ...") _id_prefix = _basename + "-" + _uuid + "-IU-"; _channel = channel; _initialize_server(); //IPAACA_INFO("... exiting.") } IPAACA_EXPORT void OutputBuffer::_initialize_server() { //IPAACA_INFO("Entering ...") //IPAACA_INFO("Calling createServer(\"" << _unique_name << "\")") _server = getFactory().createServer( Scope( _unique_name ) ); //IPAACA_INFO("Registering methods") _server->registerMethod("updatePayload", Server::CallbackPtr(new CallbackIUPayloadUpdate(this))); _server->registerMethod("updateLinks", Server::CallbackPtr(new CallbackIULinkUpdate(this))); _server->registerMethod("commit", Server::CallbackPtr(new CallbackIUCommission(this))); // dlw _server->registerMethod("resendRequest", Server::CallbackPtr(new CallbackIUResendRequest(this))); //IPAACA_INFO("... exiting.") } IPAACA_EXPORT OutputBuffer::ptr OutputBuffer::create(const std::string& basename) { Initializer::initialize_ipaaca_rsb_if_needed(); return OutputBuffer::ptr(new OutputBuffer(basename)); } IPAACA_EXPORT IUInterface::ptr OutputBuffer::get(const std::string& iu_uid) { IUStore::iterator it = _iu_store.find(iu_uid); if (it==_iu_store.end()) return IUInterface::ptr(); return it->second; } IPAACA_EXPORT std::set<IUInterface::ptr> OutputBuffer::get_ius() { std::set<IUInterface::ptr> set; for (IUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) set.insert(it->second); return set; } IPAACA_EXPORT void OutputBuffer::_send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) { IULinkUpdate* lup = new ipaaca::IULinkUpdate(); Informer<ipaaca::IULinkUpdate>::DataPtr ldata(lup); lup->uid = iu->uid(); lup->is_delta = is_delta; lup->revision = revision; lup->is_delta = true; lup->new_links = new_links; if (is_delta) lup->links_to_remove = links_to_remove; if (writer_name=="") lup->writer_name = _unique_name; else lup->writer_name = writer_name; Informer<AnyType>::Ptr informer = _get_informer(iu->category()); informer->publish(ldata); } IPAACA_EXPORT void OutputBuffer::_send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) { IUPayloadUpdate* pup = new ipaaca::IUPayloadUpdate(); Informer<ipaaca::IUPayloadUpdate>::DataPtr pdata(pup); pup->uid = iu->uid(); pup->is_delta = is_delta; pup->revision = revision; pup->new_items = new_items; if (is_delta) pup->keys_to_remove = keys_to_remove; if (writer_name=="") pup->writer_name = _unique_name; else pup->writer_name = writer_name; Informer<AnyType>::Ptr informer = _get_informer(iu->category()); informer->publish(pdata); } IPAACA_EXPORT void OutputBuffer::_send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name) { Informer<protobuf::IUCommission>::DataPtr data(new protobuf::IUCommission()); data->set_uid(iu->uid()); data->set_revision(revision); if (writer_name=="") data->set_writer_name(_unique_name); else data->set_writer_name(writer_name); Informer<AnyType>::Ptr informer = _get_informer(iu->category()); informer->publish(data); } IPAACA_EXPORT void OutputBuffer::add(IU::ptr iu) { if (_iu_store.count(iu->uid()) > 0) { throw IUPublishedError(); } if (iu->is_published()) { throw IUPublishedError(); } if (iu->access_mode() != IU_ACCESS_MESSAGE) { // (for Message-type IUs: do not actually store them) _iu_store[iu->uid()] = iu; } iu->_associate_with_buffer(this); //shared_from_this()); _publish_iu(iu); } IPAACA_EXPORT void OutputBuffer::_publish_iu(IU::ptr iu) { Informer<AnyType>::Ptr informer = _get_informer(iu->_category); Informer<ipaaca::IU>::DataPtr iu_data(iu); informer->publish(iu_data); } IPAACA_EXPORT void OutputBuffer::_publish_iu_resend(IU::ptr iu, const std::string& hidden_scope_name) { Informer<AnyType>::Ptr informer = _get_informer(hidden_scope_name); Informer<ipaaca::IU>::DataPtr iu_data(iu); informer->publish(iu_data); } IPAACA_EXPORT Informer<AnyType>::Ptr OutputBuffer::_get_informer(const std::string& category) { if (_informer_store.count(category) > 0) { return _informer_store[category]; } else { //IPAACA_INFO("Making new informer for category " << category) std::string scope_string = "/ipaaca/channel/" + _channel + "/category/" + category; Informer<AnyType>::Ptr informer = getFactory().createInformer<AnyType> ( Scope(scope_string)); _informer_store[category] = informer; return informer; } } IPAACA_EXPORT boost::shared_ptr<IU> OutputBuffer::remove(const std::string& iu_uid) { IUStore::iterator it = _iu_store.find(iu_uid); if (it == _iu_store.end()) { IPAACA_WARNING("Removal of IU " << iu_uid << " requested, but not present in our OutputBuffer") //throw IUNotFoundError(); } IU::ptr iu = it->second; _retract_iu(iu); _iu_store.erase(iu_uid); return iu; } IPAACA_EXPORT boost::shared_ptr<IU> OutputBuffer::remove(IU::ptr iu) { return remove(iu->uid()); // to make sure it is in the store } IPAACA_EXPORT void OutputBuffer::_retract_iu(IU::ptr iu) { Informer<protobuf::IURetraction>::DataPtr data(new protobuf::IURetraction()); data->set_uid(iu->uid()); data->set_revision(iu->revision()); Informer<AnyType>::Ptr informer = _get_informer(iu->category()); informer->publish(data); } //}}} // InputBuffer//{{{ IPAACA_EXPORT InputBuffer::InputBuffer(const BufferConfiguration& bufferconfiguration) :Buffer(bufferconfiguration.get_basename(), "IB") { _channel = bufferconfiguration.get_channel(); for (std::vector<std::string>::const_iterator it=bufferconfiguration.get_category_interests().begin(); it!=bufferconfiguration.get_category_interests().end(); ++it) { _create_category_listener_if_needed(*it); } _create_category_listener_if_needed(_uuid); triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::set<std::string>& category_interests) :Buffer(basename, "IB") { _channel = "default"; for (std::set<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) { _create_category_listener_if_needed(*it); } _create_category_listener_if_needed(_uuid); triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests) :Buffer(basename, "IB") { _channel = "default"; for (std::vector<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) { _create_category_listener_if_needed(*it); } _create_category_listener_if_needed(_uuid); triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1) :Buffer(basename, "IB") { _channel = "default"; _create_category_listener_if_needed(category_interest1); } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) :Buffer(basename, "IB") { _channel = "default"; _create_category_listener_if_needed(category_interest1); _create_category_listener_if_needed(category_interest2); } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) :Buffer(basename, "IB") { _channel = "default"; _create_category_listener_if_needed(category_interest1); _create_category_listener_if_needed(category_interest2); _create_category_listener_if_needed(category_interest3); } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) :Buffer(basename, "IB") { _channel = "default"; _create_category_listener_if_needed(category_interest1); _create_category_listener_if_needed(category_interest2); _create_category_listener_if_needed(category_interest3); _create_category_listener_if_needed(category_interest4); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const BufferConfiguration& bufferconfiguration) { Initializer::initialize_ipaaca_rsb_if_needed(); return InputBuffer::ptr(new InputBuffer(bufferconfiguration)); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::set<std::string>& category_interests) { Initializer::initialize_ipaaca_rsb_if_needed(); return InputBuffer::ptr(new InputBuffer(basename, category_interests)); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::vector<std::string>& category_interests) { Initializer::initialize_ipaaca_rsb_if_needed(); return InputBuffer::ptr(new InputBuffer(basename, category_interests)); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1) { Initializer::initialize_ipaaca_rsb_if_needed(); return InputBuffer::ptr(new InputBuffer(basename, category_interest1)); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) { Initializer::initialize_ipaaca_rsb_if_needed(); return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2)); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) { Initializer::initialize_ipaaca_rsb_if_needed(); return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3)); } IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) { Initializer::initialize_ipaaca_rsb_if_needed(); return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3, category_interest4)); } IPAACA_EXPORT void InputBuffer::set_resend(bool resendActive) { triggerResend = resendActive; } IPAACA_EXPORT bool InputBuffer::get_resend() { return triggerResend; } IPAACA_EXPORT IUInterface::ptr InputBuffer::get(const std::string& iu_uid) { RemotePushIUStore::iterator it = _iu_store.find(iu_uid); // TODO genericize if (it==_iu_store.end()) return IUInterface::ptr(); return it->second; } IPAACA_EXPORT std::set<IUInterface::ptr> InputBuffer::get_ius() { std::set<IUInterface::ptr> set; for (RemotePushIUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) set.insert(it->second); // TODO genericize return set; } IPAACA_EXPORT RemoteServerPtr InputBuffer::_get_remote_server(const std::string& unique_server_name) { std::map<std::string, RemoteServerPtr>::iterator it = _remote_server_store.find(unique_server_name); if (it!=_remote_server_store.end()) return it->second; RemoteServerPtr remote_server = getFactory().createRemoteServer(Scope(unique_server_name)); _remote_server_store[unique_server_name] = remote_server; return remote_server; } IPAACA_EXPORT ListenerPtr InputBuffer::_create_category_listener_if_needed(const std::string& category) { IPAACA_INFO("Entering ...") std::map<std::string, ListenerPtr>::iterator it = _listener_store.find(category); if (it!=_listener_store.end()) { IPAACA_INFO("... exiting.") return it->second; } //IPAACA_INFO("Creating a new listener for category " << category) std::string scope_string = "/ipaaca/channel/" + _channel + "/category/" + category; ListenerPtr listener = getFactory().createListener( Scope(scope_string) ); IPAACA_INFO("Adding handler") HandlerPtr event_handler = HandlerPtr( new EventFunctionHandler( boost::bind(&InputBuffer::_handle_iu_events, this, _1) ) ); listener->addHandler(event_handler); _listener_store[category] = listener; IPAACA_INFO("... exiting.") return listener; } IPAACA_EXPORT void InputBuffer::_trigger_resend_request(EventPtr event) { if (!triggerResend) return; std::string type = event->getType(); std::string uid = ""; std::string writerName = ""; if (type == "ipaaca::IUPayloadUpdate") { boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData()); uid = update->uid; writerName = update->writer_name; } else if (type == "ipaaca::IULinkUpdate") { boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData()); uid = update->uid; writerName = update->writer_name; } else if (type == "ipaaca::protobuf::IUCommission") { boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData()); uid = update->uid(); writerName = update->writer_name(); } else { std::cout << "trigger ??? else" << std::endl; } if (!writerName.empty()) { RemoteServerPtr server = _get_remote_server(writerName); if (!uid.empty()) { boost::shared_ptr<protobuf::IUResendRequest> update = boost::shared_ptr<protobuf::IUResendRequest>(new protobuf::IUResendRequest()); update->set_uid(uid); update->set_hidden_scope_name(_uuid); boost::shared_ptr<int> result = server->call<int>("resendRequest", update, IPAACA_REMOTE_SERVER_TIMEOUT); if (*result == 0) { throw IUResendRequestFailedError(); } else { std::cout << "revision " << *result << std::endl; } } } } IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) { std::cout << "handle iu events" << std::endl; std::string type = event->getType(); if (type == "ipaaca::RemotePushIU") { boost::shared_ptr<RemotePushIU> iu = boost::static_pointer_cast<RemotePushIU>(event->getData()); if (_iu_store.count(iu->category()) > 0) { // already got the IU... ignore } else { _iu_store[iu->uid()] = iu; iu->_set_buffer(this); call_iu_event_handlers(iu, false, IU_ADDED, iu->category() ); } //IPAACA_INFO( "New RemotePushIU state: " << (*iu) ) } else if (type == "ipaaca::RemoteMessage") { boost::shared_ptr<RemoteMessage> iu = boost::static_pointer_cast<RemoteMessage>(event->getData()); //_iu_store[iu->uid()] = iu; //iu->_set_buffer(this); //std::cout << "REFCNT after cast, before calling handlers: " << iu.use_count() << std::endl; call_iu_event_handlers(iu, false, IU_MESSAGE, iu->category() ); //_iu_store.erase(iu->uid()); } else { RemotePushIUStore::iterator it; if (type == "ipaaca::IUPayloadUpdate") { boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData()); //IPAACA_INFO("** writer name: " << update->writer_name) std::cout << "writer name " << update->writer_name << std::endl; if (update->writer_name == _unique_name) { return; } it = _iu_store.find(update->uid); if (it == _iu_store.end()) { _trigger_resend_request(event); IPAACA_INFO("Using UPDATED message for an IU that we did not fully receive before") return; } it->second->_apply_update(update); call_iu_event_handlers(it->second, false, IU_UPDATED, it->second->category() ); } else if (type == "ipaaca::IULinkUpdate") { boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData()); if (update->writer_name == _unique_name) { return; } it = _iu_store.find(update->uid); if (it == _iu_store.end()) { _trigger_resend_request(event); IPAACA_INFO("Ignoring LINKSUPDATED message for an IU that we did not fully receive before") return; } it->second->_apply_link_update(update); call_iu_event_handlers(it->second, false, IU_LINKSUPDATED, it->second->category() ); } else if (type == "ipaaca::protobuf::IUCommission") { boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData()); if (update->writer_name() == _unique_name) { return; } it = _iu_store.find(update->uid()); if (it == _iu_store.end()) { _trigger_resend_request(event); IPAACA_INFO("Ignoring COMMITTED message for an IU that we did not fully receive before") return; } // it->second->_apply_commission(); it->second->_revision = update->revision(); call_iu_event_handlers(it->second, false, IU_COMMITTED, it->second->category() ); // // } else if (type == "ipaaca::protobuf::IURetraction") { boost::shared_ptr<protobuf::IURetraction> update = boost::static_pointer_cast<protobuf::IURetraction>(event->getData()); it = _iu_store.find(update->uid()); if (it == _iu_store.end()) { _trigger_resend_request(event); IPAACA_INFO("Ignoring RETRACTED message for an IU that we did not fully receive before") return; } // it->second->_revision = update->revision(); it->second->_apply_retraction(); // remove from InputBuffer FIXME: this is a crossover between retracted and deleted behavior _iu_store.erase(it->first); // and call the handler. IU reference is still valid for this call, although removed from buffer. call_iu_event_handlers(it->second, false, IU_COMMITTED, it->second->category() ); // } else { std::cout << "(Unhandled Event type " << type << " !)" << std::endl; return; } //IPAACA_INFO( "New RemotePushIU state: " << *(it->second) ) } } //}}} // IUInterface//{{{ IPAACA_EXPORT IUInterface::IUInterface() : _buffer(NULL), _committed(false), _retracted(false) { } IPAACA_EXPORT void IUInterface::_set_uid(const std::string& uid) { if (_uid != "") { throw IUAlreadyHasAnUIDError(); } _uid = uid; } IPAACA_EXPORT void IUInterface::_set_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buffer) { if (_buffer) { throw IUAlreadyInABufferError(); } _buffer = buffer; } IPAACA_EXPORT void IUInterface::_set_owner_name(const std::string& owner_name) { if (_owner_name != "") { throw IUAlreadyHasAnOwnerNameError(); } _owner_name = owner_name; } /// set the buffer pointer and the owner names of IU and Payload IPAACA_EXPORT void IUInterface::_associate_with_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buffer) { _set_buffer(buffer); // will throw if already set _set_owner_name(buffer->unique_name()); payload()._set_owner_name(buffer->unique_name()); } /// C++-specific convenience function to add one single link IPAACA_EXPORT void IUInterface::add_link(const std::string& type, const std::string& target, const std::string& writer_name) { LinkMap none; LinkMap add; add[type].insert(target); _modify_links(true, add, none, writer_name); _add_and_remove_links(add, none); } /// C++-specific convenience function to remove one single link IPAACA_EXPORT void IUInterface::remove_link(const std::string& type, const std::string& target, const std::string& writer_name) { LinkMap none; LinkMap remove; remove[type].insert(target); _modify_links(true, none, remove, writer_name); _add_and_remove_links(none, remove); } IPAACA_EXPORT void IUInterface::add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name) { LinkMap none; LinkMap add; add[type] = targets; _modify_links(true, add, none, writer_name); _add_and_remove_links(add, none); } IPAACA_EXPORT void IUInterface::remove_links(const std::string& type, const LinkSet& targets, const std::string& writer_name) { LinkMap none; LinkMap remove; remove[type] = targets; _modify_links(true, none, remove, writer_name); _add_and_remove_links(none, remove); } IPAACA_EXPORT void IUInterface::modify_links(const LinkMap& add, const LinkMap& remove, const std::string& writer_name) { _modify_links(true, add, remove, writer_name); _add_and_remove_links(add, remove); } IPAACA_EXPORT void IUInterface::set_links(const LinkMap& links, const std::string& writer_name) { LinkMap none; _modify_links(false, links, none, writer_name); _replace_links(links); } IPAACA_HEADER_EXPORT const std::string& IUInterface::channel() { if (_buffer == NULL) throw IUUnpublishedError(); else return _buffer->channel(); } //}}} // IU//{{{ IPAACA_EXPORT IU::ptr IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type) { IU::ptr iu = IU::ptr(new IU(category, access_mode, read_only, payload_type)); /* params */ //)); iu->_payload.initialize(iu); return iu; } IPAACA_EXPORT IU::IU(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type) { _revision = 1; _uid = ipaaca::generate_uuid_string(); _category = category; _payload_type = payload_type; // payload initialization deferred to IU::create(), above _read_only = read_only; _access_mode = access_mode; _committed = false; } IPAACA_EXPORT void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) { _revision_lock.lock(); if (_committed) { _revision_lock.unlock(); throw IUCommittedError(); } _increase_revision_number(); if (is_published()) { _buffer->_send_iu_link_update(this, is_delta, _revision, new_links, links_to_remove, writer_name); } _revision_lock.unlock(); } IPAACA_EXPORT void IU::_publish_resend(IU::ptr iu, const std::string& hidden_scope_name) { //_revision_lock.lock(); //if (_committed) { // _revision_lock.unlock(); // throw IUCommittedError(); //} //_increase_revision_number(); //if (is_published()) { //IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name _buffer->_publish_iu_resend(iu, hidden_scope_name); //} //_revision_lock.unlock(); } IPAACA_EXPORT void IU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) { _revision_lock.lock(); if (_committed) { _revision_lock.unlock(); throw IUCommittedError(); } _increase_revision_number(); if (is_published()) { //std::cout << "Sending a payload update with " << new_items.size() << " entries to merge." << std::endl; _buffer->_send_iu_payload_update(this, is_delta, _revision, new_items, keys_to_remove, writer_name); } _revision_lock.unlock(); } IPAACA_EXPORT void IU::commit() { _internal_commit(); } IPAACA_EXPORT void IU::_internal_commit(const std::string& writer_name) { _revision_lock.lock(); if (_committed) { _revision_lock.unlock(); throw IUCommittedError(); } _increase_revision_number(); _committed = true; if (is_published()) { _buffer->_send_iu_commission(this, _revision, writer_name); } _revision_lock.unlock(); } //}}} // Message//{{{ Message::ptr Message::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type) { Message::ptr iu = Message::ptr(new Message(category, access_mode, read_only, payload_type)); /* params */ //)); iu->_payload.initialize(iu); return iu; } Message::Message(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type) : IU(category, access_mode, read_only, payload_type) { } void Message::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) { if (is_published()) { IPAACA_INFO("Info: modifying a Message after sending has no global effects") } } void Message::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) { if (is_published()) { IPAACA_INFO("Info: modifying a Message after sending has no global effects") } } void Message::_internal_commit(const std::string& writer_name) { if (is_published()) { IPAACA_INFO("Info: committing to a Message after sending has no global effects") } } //}}} // RemotePushIU//{{{ IPAACA_EXPORT RemotePushIU::ptr RemotePushIU::create() { RemotePushIU::ptr iu = RemotePushIU::ptr(new RemotePushIU(/* params */)); iu->_payload.initialize(iu); return iu; } IPAACA_EXPORT RemotePushIU::RemotePushIU() { // nothing } IPAACA_EXPORT void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) { if (_committed) { throw IUCommittedError(); } if (_read_only) { throw IUReadOnlyError(); } RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); IULinkUpdate::ptr update = IULinkUpdate::ptr(new IULinkUpdate()); update->uid = _uid; update->revision = _revision; update->is_delta = is_delta; update->writer_name = _buffer->unique_name(); update->new_links = new_links; update->links_to_remove = links_to_remove; boost::shared_ptr<int> result = server->call<int>("updateLinks", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO if (*result == 0) { throw IUUpdateFailedError(); } else { _revision = *result; } } IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) { //std::cout << "-- Sending a modify_payload with " << new_items.size() << " keys to merge." << std::endl; if (_committed) { throw IUCommittedError(); } if (_read_only) { throw IUReadOnlyError(); } RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); IUPayloadUpdate::ptr update = IUPayloadUpdate::ptr(new IUPayloadUpdate()); update->uid = _uid; update->revision = _revision; update->is_delta = is_delta; update->writer_name = _buffer->unique_name(); update->new_items = new_items; update->keys_to_remove = keys_to_remove; boost::shared_ptr<int> result = server->call<int>("updatePayload", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO if (*result == 0) { throw IUUpdateFailedError(); } else { _revision = *result; } } IPAACA_EXPORT void RemotePushIU::commit() { if (_read_only) { throw IUReadOnlyError(); } if (_committed) { // Following python version: ignoring multiple commit return; } RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); boost::shared_ptr<protobuf::IUCommission> update = boost::shared_ptr<protobuf::IUCommission>(new protobuf::IUCommission()); update->set_uid(_uid); update->set_revision(_revision); update->set_writer_name(_buffer->unique_name()); boost::shared_ptr<int> result = server->call<int>("commit", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO if (*result == 0) { throw IUUpdateFailedError(); } else { _revision = *result; } } IPAACA_EXPORT void RemotePushIU::_apply_link_update(IULinkUpdate::ptr update) { _revision = update->revision; if (update->is_delta) { _add_and_remove_links(update->new_links, update->links_to_remove); } else { _replace_links(update->new_links); } } IPAACA_EXPORT void RemotePushIU::_apply_update(IUPayloadUpdate::ptr update) { _revision = update->revision; if (update->is_delta) { for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) { _payload._remotely_enforced_delitem(*it); } for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) { _payload._remotely_enforced_setitem(it->first, it->second); } } else { _payload._remotely_enforced_wipe(); for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) { _payload._remotely_enforced_setitem(it->first, it->second); } } } IPAACA_EXPORT void RemotePushIU::_apply_commission() { _committed = true; } IPAACA_EXPORT void RemotePushIU::_apply_retraction() { _retracted = true; } //}}} // RemoteMessage//{{{ IPAACA_EXPORT RemoteMessage::ptr RemoteMessage::create() { RemoteMessage::ptr iu = RemoteMessage::ptr(new RemoteMessage(/* params */)); iu->_payload.initialize(iu); return iu; } IPAACA_EXPORT RemoteMessage::RemoteMessage() { // nothing } IPAACA_EXPORT void RemoteMessage::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) { IPAACA_INFO("Info: modifying a RemoteMessage only has local effects") } IPAACA_EXPORT void RemoteMessage::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) { IPAACA_INFO("Info: modifying a RemoteMessage only has local effects") } IPAACA_EXPORT void RemoteMessage::commit() { IPAACA_INFO("Info: committing to a RemoteMessage only has local effects") } IPAACA_EXPORT void RemoteMessage::_apply_link_update(IULinkUpdate::ptr update) { IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_link_update") _revision = update->revision; if (update->is_delta) { _add_and_remove_links(update->new_links, update->links_to_remove); } else { _replace_links(update->new_links); } } IPAACA_EXPORT void RemoteMessage::_apply_update(IUPayloadUpdate::ptr update) { IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_update") _revision = update->revision; if (update->is_delta) { for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) { _payload._remotely_enforced_delitem(*it); } for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) { _payload._remotely_enforced_setitem(it->first, it->second); } } else { _payload._remotely_enforced_wipe(); for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) { _payload._remotely_enforced_setitem(it->first, it->second); } } } IPAACA_EXPORT void RemoteMessage::_apply_commission() { IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_commission") _committed = true; } IPAACA_EXPORT void RemoteMessage::_apply_retraction() { IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_retraction") _retracted = true; } //}}} // PayloadEntryProxy//{{{ IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(Payload* payload, const std::string& key) : _payload(payload), _key(key) { } IPAACA_EXPORT PayloadEntryProxy& PayloadEntryProxy::operator=(const std::string& value) { //std::cout << "operator=(string)" << std::endl; _payload->set(_key, value); return *this; } IPAACA_EXPORT PayloadEntryProxy& PayloadEntryProxy::operator=(const char* value) { //std::cout << "operator=(const char*)" << std::endl; _payload->set(_key, value); return *this; } IPAACA_EXPORT PayloadEntryProxy& PayloadEntryProxy::operator=(double value) { //std::cout << "operator=(double)" << std::endl; _payload->set(_key, boost::lexical_cast<std::string>(value)); return *this; } IPAACA_EXPORT PayloadEntryProxy& PayloadEntryProxy::operator=(bool value) { //std::cout << "operator=(bool)" << std::endl; _payload->set(_key, boost::lexical_cast<std::string>(value)); return *this; } IPAACA_EXPORT inline PayloadEntryProxy::operator std::string() { return PayloadEntryProxy::get<std::string>(); } IPAACA_EXPORT inline PayloadEntryProxy::operator long() { return PayloadEntryProxy::get<long>(); } IPAACA_EXPORT inline PayloadEntryProxy::operator double() { return PayloadEntryProxy::get<double>(); } IPAACA_EXPORT inline PayloadEntryProxy::operator bool() { return PayloadEntryProxy::get<bool>(); } IPAACA_EXPORT inline std::string PayloadEntryProxy::to_str() { return PayloadEntryProxy::get<std::string>(); } IPAACA_EXPORT inline long PayloadEntryProxy::to_long() { return PayloadEntryProxy::get<long>(); } IPAACA_EXPORT inline double PayloadEntryProxy::to_float() { return PayloadEntryProxy::get<double>(); } IPAACA_EXPORT inline bool PayloadEntryProxy::to_bool() { return PayloadEntryProxy::get<bool>(); } // // new stuff for protocol v2 // IPAACA_HEADER_EXPORT template<> std::string PayloadEntryProxy::get<std::string>() { return _payload->get(_key); } IPAACA_HEADER_EXPORT template<> long PayloadEntryProxy::get<long>() { return atof(operator std::string().c_str()); } IPAACA_HEADER_EXPORT template<> double PayloadEntryProxy::get<double>() { return atol(operator std::string().c_str()); } IPAACA_HEADER_EXPORT template<> bool PayloadEntryProxy::get<bool>() { std::string s = operator std::string(); return ((s=="1")||(s=="true")||(s=="True")); } // complex types IPAACA_HEADER_EXPORT template<> std::list<std::string> PayloadEntryProxy::get<std::list<std::string> >() { std::list<std::string> l; l.push_back(PayloadEntryProxy::get<std::string>()); return l; } IPAACA_HEADER_EXPORT template<> std::vector<std::string> PayloadEntryProxy::get<std::vector<std::string> >() { std::vector<std::string> v; v.push_back(PayloadEntryProxy::get<std::string>()); return v; } IPAACA_HEADER_EXPORT template<> std::map<std::string, std::string> PayloadEntryProxy::get<std::map<std::string, std::string> >() { std::map<std::string, std::string> m; m["__automatic__"] = PayloadEntryProxy::get<std::string>(); return m; } //}}} // Payload//{{{ IPAACA_EXPORT void Payload::initialize(boost::shared_ptr<IUInterface> iu) { _iu = boost::weak_ptr<IUInterface>(iu); } IPAACA_EXPORT PayloadEntryProxy Payload::operator[](const std::string& key) { //boost::shared_ptr<PayloadEntryProxy> p(new PayloadEntryProxy(this, key)); return PayloadEntryProxy(this, key); } IPAACA_EXPORT Payload::operator std::map<std::string, std::string>() { return _store; } IPAACA_EXPORT inline void Payload::_internal_set(const std::string& k, const std::string& v, const std::string& writer_name) { std::map<std::string, std::string> _new; std::vector<std::string> _remove; _new[k]=v; _iu.lock()->_modify_payload(true, _new, _remove, writer_name ); _store[k] = v; } IPAACA_EXPORT inline void Payload::_internal_remove(const std::string& k, const std::string& writer_name) { std::map<std::string, std::string> _new; std::vector<std::string> _remove; _remove.push_back(k); _iu.lock()->_modify_payload(true, _new, _remove, writer_name ); _store.erase(k); } IPAACA_EXPORT void Payload::_internal_replace_all(const std::map<std::string, std::string>& new_contents, const std::string& writer_name) { std::vector<std::string> _remove; _iu.lock()->_modify_payload(false, new_contents, _remove, writer_name ); _store = new_contents; } IPAACA_EXPORT void Payload::_internal_merge(const std::map<std::string, std::string>& contents_to_merge, const std::string& writer_name) { std::vector<std::string> _remove; _iu.lock()->_modify_payload(true, contents_to_merge, _remove, writer_name ); _store.insert(contents_to_merge.begin(), contents_to_merge.end()); //for (std::map<std::string, std::string>::iterator it = contents_to_merge.begin(); it!=contents_to_merge.end(); i++) { // _store[it->first] = it->second; //} } IPAACA_EXPORT inline std::string Payload::get(const std::string& k) { if (_store.count(k)>0) return _store[k]; else return IPAACA_PAYLOAD_DEFAULT_STRING_VALUE; } IPAACA_EXPORT void Payload::_remotely_enforced_wipe() { _store.clear(); } IPAACA_EXPORT void Payload::_remotely_enforced_delitem(const std::string& k) { _store.erase(k); } IPAACA_EXPORT void Payload::_remotely_enforced_setitem(const std::string& k, const std::string& v) { _store[k] = v; } //}}} // IUConverter//{{{ IPAACA_EXPORT IUConverter::IUConverter() : Converter<std::string> (IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IU"), "ipaaca-iu", true) { } IPAACA_EXPORT std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire) { //std::cout << "serialize" << std::endl; // Ensure that DATA actually holds a datum of the data-type we expect. assert(data.first == getDataType()); // "ipaaca::IU" // NOTE: a dynamic_pointer_cast cannot be used from void* boost::shared_ptr<const IU> obj = boost::static_pointer_cast<const IU> (data.second); boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU()); // transfer obj data to pbo pbo->set_uid(obj->uid()); pbo->set_revision(obj->revision()); pbo->set_category(obj->category()); pbo->set_payload_type(obj->payload_type()); pbo->set_owner_name(obj->owner_name()); pbo->set_committed(obj->committed()); ipaaca::protobuf::IU_AccessMode a_m; switch(obj->access_mode()) { case IU_ACCESS_PUSH: a_m = ipaaca::protobuf::IU_AccessMode_PUSH; break; case IU_ACCESS_REMOTE: a_m = ipaaca::protobuf::IU_AccessMode_REMOTE; break; case IU_ACCESS_MESSAGE: a_m = ipaaca::protobuf::IU_AccessMode_MESSAGE; break; } pbo->set_access_mode(a_m); pbo->set_read_only(obj->read_only()); for (std::map<std::string, std::string>::const_iterator it=obj->_payload._store.begin(); it!=obj->_payload._store.end(); ++it) { protobuf::PayloadItem* item = pbo->add_payload(); item->set_key(it->first); item->set_value(it->second); item->set_type("str"); // FIXME other types than str (later) } for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) { protobuf::LinkSet* links = pbo->add_links(); links->set_type(it->first); for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { links->add_targets(*it2); } } pbo->SerializeToString(&wire); switch(obj->access_mode()) { case IU_ACCESS_PUSH: //std::cout << "Requesting to send as ipaaca-iu" << std::endl; return "ipaaca-iu"; case IU_ACCESS_MESSAGE: //std::cout << "Requesting to send as ipaaca-messageiu" << std::endl; return "ipaaca-messageiu"; default: //std::cout << "Requesting to send as default" << std::endl; return getWireSchema(); } } IPAACA_EXPORT AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) { //std::cout << "deserialize" << std::endl; assert(wireSchema == getWireSchema()); // "ipaaca-iu" boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU()); pbo->ParseFromString(wire); IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode()); switch(mode) { case IU_ACCESS_PUSH: { // Create a "remote push IU" boost::shared_ptr<RemotePushIU> obj = RemotePushIU::create(); // transfer pbo data to obj obj->_uid = pbo->uid(); obj->_revision = pbo->revision(); obj->_category = pbo->category(); obj->_payload_type = pbo->payload_type(); obj->_owner_name = pbo->owner_name(); obj->_committed = pbo->committed(); obj->_read_only = pbo->read_only(); obj->_access_mode = IU_ACCESS_PUSH; for (int i=0; i<pbo->payload_size(); i++) { const protobuf::PayloadItem& it = pbo->payload(i); obj->_payload._store[it.key()] = it.value(); } for (int i=0; i<pbo->links_size(); i++) { const protobuf::LinkSet& pls = pbo->links(i); LinkSet& ls = obj->_links._links[pls.type()]; for (int j=0; j<pls.targets_size(); j++) { ls.insert(pls.targets(j)); } } //return std::make_pair(getDataType(), obj); return std::make_pair("ipaaca::RemotePushIU", obj); break; } case IU_ACCESS_MESSAGE: { // Create a "Message-type IU" boost::shared_ptr<RemoteMessage> obj = RemoteMessage::create(); //std::cout << "REFCNT after create: " << obj.use_count() << std::endl; // transfer pbo data to obj obj->_uid = pbo->uid(); obj->_revision = pbo->revision(); obj->_category = pbo->category(); obj->_payload_type = pbo->payload_type(); obj->_owner_name = pbo->owner_name(); obj->_committed = pbo->committed(); obj->_read_only = pbo->read_only(); obj->_access_mode = IU_ACCESS_MESSAGE; for (int i=0; i<pbo->payload_size(); i++) { const protobuf::PayloadItem& it = pbo->payload(i); obj->_payload._store[it.key()] = it.value(); } for (int i=0; i<pbo->links_size(); i++) { const protobuf::LinkSet& pls = pbo->links(i); LinkSet& ls = obj->_links._links[pls.type()]; for (int j=0; j<pls.targets_size(); j++) { ls.insert(pls.targets(j)); } } //return std::make_pair(getDataType(), obj); return std::make_pair("ipaaca::RemoteMessage", obj); break; } default: // other cases not handled yet! ( TODO ) throw NotImplementedError(); } } //}}} // MessageConverter//{{{ IPAACA_EXPORT MessageConverter::MessageConverter() : Converter<std::string> (IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::Message"), "ipaaca-messageiu", true) { } IPAACA_EXPORT std::string MessageConverter::serialize(const AnnotatedData& data, std::string& wire) { // Ensure that DATA actually holds a datum of the data-type we expect. assert(data.first == getDataType()); // "ipaaca::Message" // NOTE: a dynamic_pointer_cast cannot be used from void* boost::shared_ptr<const Message> obj = boost::static_pointer_cast<const Message> (data.second); boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU()); // transfer obj data to pbo pbo->set_uid(obj->uid()); pbo->set_revision(obj->revision()); pbo->set_category(obj->category()); pbo->set_payload_type(obj->payload_type()); pbo->set_owner_name(obj->owner_name()); pbo->set_committed(obj->committed()); ipaaca::protobuf::IU_AccessMode a_m; switch(obj->access_mode()) { case IU_ACCESS_PUSH: a_m = ipaaca::protobuf::IU_AccessMode_PUSH; break; case IU_ACCESS_REMOTE: a_m = ipaaca::protobuf::IU_AccessMode_REMOTE; break; case IU_ACCESS_MESSAGE: a_m = ipaaca::protobuf::IU_AccessMode_MESSAGE; break; } pbo->set_access_mode(a_m); pbo->set_read_only(obj->read_only()); for (std::map<std::string, std::string>::const_iterator it=obj->_payload._store.begin(); it!=obj->_payload._store.end(); ++it) { protobuf::PayloadItem* item = pbo->add_payload(); item->set_key(it->first); item->set_value(it->second); item->set_type("str"); // FIXME other types than str (later) } for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) { protobuf::LinkSet* links = pbo->add_links(); links->set_type(it->first); for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { links->add_targets(*it2); } } pbo->SerializeToString(&wire); switch(obj->access_mode()) { case IU_ACCESS_PUSH: return "ipaaca-iu"; case IU_ACCESS_MESSAGE: return "ipaaca-messageiu"; default: //std::cout << "Requesting to send as default" << std::endl; return getWireSchema(); } } IPAACA_EXPORT AnnotatedData MessageConverter::deserialize(const std::string& wireSchema, const std::string& wire) { assert(wireSchema == getWireSchema()); // "ipaaca-iu" boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU()); pbo->ParseFromString(wire); IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode()); switch(mode) { case IU_ACCESS_PUSH: { // Create a "remote push IU" boost::shared_ptr<RemotePushIU> obj = RemotePushIU::create(); // transfer pbo data to obj obj->_uid = pbo->uid(); obj->_revision = pbo->revision(); obj->_category = pbo->category(); obj->_payload_type = pbo->payload_type(); obj->_owner_name = pbo->owner_name(); obj->_committed = pbo->committed(); obj->_read_only = pbo->read_only(); obj->_access_mode = IU_ACCESS_PUSH; for (int i=0; i<pbo->payload_size(); i++) { const protobuf::PayloadItem& it = pbo->payload(i); obj->_payload._store[it.key()] = it.value(); } for (int i=0; i<pbo->links_size(); i++) { const protobuf::LinkSet& pls = pbo->links(i); LinkSet& ls = obj->_links._links[pls.type()]; for (int j=0; j<pls.targets_size(); j++) { ls.insert(pls.targets(j)); } } //return std::make_pair(getDataType(), obj); return std::make_pair("ipaaca::RemotePushIU", obj); break; } case IU_ACCESS_MESSAGE: { // Create a "Message-type IU" boost::shared_ptr<RemoteMessage> obj = RemoteMessage::create(); // transfer pbo data to obj obj->_uid = pbo->uid(); obj->_revision = pbo->revision(); obj->_category = pbo->category(); obj->_payload_type = pbo->payload_type(); obj->_owner_name = pbo->owner_name(); obj->_committed = pbo->committed(); obj->_read_only = pbo->read_only(); obj->_access_mode = IU_ACCESS_MESSAGE; for (int i=0; i<pbo->payload_size(); i++) { const protobuf::PayloadItem& it = pbo->payload(i); obj->_payload._store[it.key()] = it.value(); } for (int i=0; i<pbo->links_size(); i++) { const protobuf::LinkSet& pls = pbo->links(i); LinkSet& ls = obj->_links._links[pls.type()]; for (int j=0; j<pls.targets_size(); j++) { ls.insert(pls.targets(j)); } } //return std::make_pair(getDataType(), obj); return std::make_pair("ipaaca::RemoteMessage", obj); break; } default: // other cases not handled yet! ( TODO ) throw NotImplementedError(); } } //}}} // IUPayloadUpdateConverter//{{{ IPAACA_EXPORT IUPayloadUpdateConverter::IUPayloadUpdateConverter() : Converter<std::string> (IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IUPayloadUpdate"), "ipaaca-iu-payload-update", true) { } IPAACA_EXPORT std::string IUPayloadUpdateConverter::serialize(const AnnotatedData& data, std::string& wire) { assert(data.first == getDataType()); // "ipaaca::IUPayloadUpdate" boost::shared_ptr<const IUPayloadUpdate> obj = boost::static_pointer_cast<const IUPayloadUpdate> (data.second); boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate()); // transfer obj data to pbo pbo->set_uid(obj->uid); pbo->set_revision(obj->revision); pbo->set_writer_name(obj->writer_name); pbo->set_is_delta(obj->is_delta); for (std::map<std::string, std::string>::const_iterator it=obj->new_items.begin(); it!=obj->new_items.end(); ++it) { protobuf::PayloadItem* item = pbo->add_new_items(); item->set_key(it->first); item->set_value(it->second); item->set_type("str"); // FIXME other types than str (later) } for (std::vector<std::string>::const_iterator it=obj->keys_to_remove.begin(); it!=obj->keys_to_remove.end(); ++it) { pbo->add_keys_to_remove(*it); } pbo->SerializeToString(&wire); return getWireSchema(); } AnnotatedData IUPayloadUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) { assert(wireSchema == getWireSchema()); // "ipaaca-iu-payload-update" boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate()); pbo->ParseFromString(wire); boost::shared_ptr<IUPayloadUpdate> obj(new IUPayloadUpdate()); // transfer pbo data to obj obj->uid = pbo->uid(); obj->revision = pbo->revision(); obj->writer_name = pbo->writer_name(); obj->is_delta = pbo->is_delta(); for (int i=0; i<pbo->new_items_size(); i++) { const protobuf::PayloadItem& it = pbo->new_items(i); obj->new_items[it.key()] = it.value(); } for (int i=0; i<pbo->keys_to_remove_size(); i++) { obj->keys_to_remove.push_back(pbo->keys_to_remove(i)); } return std::make_pair(getDataType(), obj); } //}}} // IULinkUpdateConverter//{{{ IPAACA_EXPORT IULinkUpdateConverter::IULinkUpdateConverter() : Converter<std::string> (IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IULinkUpdate"), "ipaaca-iu-link-update", true) { } IPAACA_EXPORT std::string IULinkUpdateConverter::serialize(const AnnotatedData& data, std::string& wire) { assert(data.first == getDataType()); // "ipaaca::IULinkUpdate" boost::shared_ptr<const IULinkUpdate> obj = boost::static_pointer_cast<const IULinkUpdate> (data.second); boost::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate()); // transfer obj data to pbo pbo->set_uid(obj->uid); pbo->set_revision(obj->revision); pbo->set_writer_name(obj->writer_name); pbo->set_is_delta(obj->is_delta); for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->new_links.begin(); it!=obj->new_links.end(); ++it) { protobuf::LinkSet* links = pbo->add_new_links(); links->set_type(it->first); for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { links->add_targets(*it2); } } for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->links_to_remove.begin(); it!=obj->links_to_remove.end(); ++it) { protobuf::LinkSet* links = pbo->add_links_to_remove(); links->set_type(it->first); for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { links->add_targets(*it2); } } pbo->SerializeToString(&wire); return getWireSchema(); } AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) { assert(wireSchema == getWireSchema()); // "ipaaca-iu-link-update" boost::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate()); pbo->ParseFromString(wire); boost::shared_ptr<IULinkUpdate> obj(new IULinkUpdate()); // transfer pbo data to obj obj->uid = pbo->uid(); obj->revision = pbo->revision(); obj->writer_name = pbo->writer_name(); obj->is_delta = pbo->is_delta(); for (int i=0; i<pbo->new_links_size(); ++i) { const protobuf::LinkSet& it = pbo->new_links(i); for (int j=0; j<it.targets_size(); ++j) { obj->new_links[it.type()].insert(it.targets(j)); // = vec; } } for (int i=0; i<pbo->links_to_remove_size(); ++i) { const protobuf::LinkSet& it = pbo->links_to_remove(i); for (int j=0; j<it.targets_size(); ++j) { obj->links_to_remove[it.type()].insert(it.targets(j)); } } return std::make_pair(getDataType(), obj); } //}}} // IntConverter//{{{ IPAACA_EXPORT IntConverter::IntConverter() : Converter<std::string> ("int", "int32", true) { } IPAACA_EXPORT std::string IntConverter::serialize(const AnnotatedData& data, std::string& wire) { // Ensure that DATA actually holds a datum of the data-type we expect. assert(data.first == getDataType()); // "int" // NOTE: a dynamic_pointer_cast cannot be used from void* boost::shared_ptr<const int> obj = boost::static_pointer_cast<const int> (data.second); boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage()); // transfer obj data to pbo pbo->set_value(*obj); pbo->SerializeToString(&wire); return getWireSchema(); } IPAACA_EXPORT AnnotatedData IntConverter::deserialize(const std::string& wireSchema, const std::string& wire) { assert(wireSchema == getWireSchema()); // "int" boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage()); pbo->ParseFromString(wire); boost::shared_ptr<int> obj = boost::shared_ptr<int>(new int(pbo->value())); return std::make_pair("int", obj); } //}}} } // of namespace ipaaca