diff --git a/cpp/src/.gitignore b/cpp/src/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..05c3e637fa52c0fb86ceb4bcc7022728da9210b5 --- /dev/null +++ b/cpp/src/.gitignore @@ -0,0 +1,3 @@ +ipaaca.pb.cc +ipaaca.pb.h + diff --git a/cpp/src/Makefile b/cpp/src/Makefile index cb0a88d55588a8af3e8ef2a773e4205107149679..45ba15946eff401b5ad8cbfae12a09f78153baa0 100644 --- a/cpp/src/Makefile +++ b/cpp/src/Makefile @@ -1,18 +1,40 @@ +ifeq ($(WBS_ARCH),mac) + LIB_SUFFIX=.dylib +else + LIB_SUFFIX=.so +endif + CONFIG = -DIPAACA_DEBUG_MESSAGES -SOURCES = ipaaca.cc ipaaca.pb.cc ipaaca-test-main.cc +IPAACASOURCES = ipaaca.cc ipaaca.pb.cc +SOURCES = ${IPAACASOURCES} ipaaca-test-main.cc +TEXTSOURCES = ${IPAACASOURCES} textsender.cc CCFLAGS=-I. -I/usr/local/include -I/opt/local/include ${CONFIG} +LIBFLAGS=-fPIC -shared BOOSTLIBS = -L/opt/local/lib -lboost_regex-mt -lboost_date_time-mt -lboost_program_options-mt -lboost_thread-mt -lboost_filesystem-mt -lboost_signals-mt -lboost_system-mt PROTOLIBS = -L/opt/local/lib -lprotobuf LIBS = ${BOOSTLIBS} ${PROTOLIBS} -L/usr/local/lib -lrsc -lrsbcore -all: protoc - #g++ ${CCFLAGS} -o ipaaca-test ${SOURCES} ${LIBS} - g++ ${CCFLAGS} -DMAKE_RECEIVER -o ipaaca-receiver ${SOURCES} ${LIBS} - g++ ${CCFLAGS} -DMAKE_SENDER -o ipaaca-sender ${SOURCES} ${LIBS} + +COMPILER = gfilt + +all: lib + + +lib: + ${COMPILER} ${CCFLAGS} ${IPAACASOURCES} ${LIBS} ${LIBFLAGS} -o libipaaca${LIB_SUFFIX} + +receiver: + ${COMPILER} ${CCFLAGS} -DMAKE_RECEIVER -o ipaaca-receiver ${SOURCES} ${LIBS} + +sender: + ${COMPILER} ${CCFLAGS} -DMAKE_SENDER -o ipaaca-sender ${SOURCES} ${LIBS} + +main: + ${COMPILER} ${CCFLAGS} -o ipaaca-main ${SOURCES} ${LIBS} protoc: protoc --proto_path=../../proto ../../proto/ipaaca.proto --cpp_out=. clean: - rm -f ipaaca-test ipaaca-sender ipaaca-receiver ipaaca.pb.h ipaaca.pb.cc + rm -f libipaaca${LIB_SUFFIX} ipaaca-main ipaaca-sender ipaaca-receiver ipaaca.pb.h ipaaca.pb.cc diff --git a/cpp/src/ipaaca-test-main.cc b/cpp/src/ipaaca-test-main.cc index 41ddd232ac3227f09ba1b687b47e2c9904bc4531..cefa166dfd5b3820915f3d6da1cd13860aadfb11 100644 --- a/cpp/src/ipaaca-test-main.cc +++ b/cpp/src/ipaaca-test-main.cc @@ -5,105 +5,76 @@ //#include <rsc/logging/LoggerFactory.h> // //rsc::logging::LoggerFactory::getInstance().reconfigure(rsc::logging::Logger::LEVEL_ALL); -#ifdef MAKE_RECEIVER -//boost::mutex mtx; -using namespace ipaaca; +// +// TESTS +// -class Testo { - protected: - std::string _name; - public: - inline Testo(const std::string& name="Testo"): _name(name) { } - inline void handleIUEvent(EventPtr event) - { - std::cout << _name << " received a "; - std::string type = event->getType(); - if (type == "ipaaca::IUPayloadUpdate") { - std::cout << *boost::static_pointer_cast<IUPayloadUpdate>(event->getData()) << std::endl; - } else if (type == "ipaaca::IULinkUpdate") { - std::cout << *boost::static_pointer_cast<IULinkUpdate>(event->getData()) << std::endl; - } else { - std::cout << type << " (Unhandled type!)" << std::endl; - } - } -}; +using namespace ipaaca; +#ifdef MAKE_RECEIVER +void my_first_iu_handler(IUInterface::ptr iu, IUEventType type, bool local) +{ + std::cout << "[32m" << iu_event_type_to_str(type) << " " << (local?"(of local IU)":"(of remote IU)") << "[m" << std::endl; + if (type == IU_LINKSUPDATED) { + std::cout << " setting something in the remote payload" << std::endl; + iu->payload()["new_field"] = "remotely_set"; + } +} int main() { - initialize_ipaaca_rsb(); - - ListenerPtr listener = Factory::getInstance().createListener( Scope("/tutorial/converter")); - - Testo t("TESTO"); - HandlerPtr event_handler = HandlerPtr(new EventFunctionHandler(boost::bind(&Testo::handleIUEvent, boost::ref(t), _1))); - listener->addHandler( event_handler ); - - while(true) { - boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + try{ + //initialize_ipaaca_rsb(); + + InputBuffer::ptr ib = InputBuffer::create("Tester", "testcategory"); + ib->register_handler(my_first_iu_handler); + + while (true) { + sleep(1); + } + + } catch (ipaaca::Exception& e) { + std::cout << "== IPAACA EXCEPTION == " << e.what() << std::endl; } - return EXIT_SUCCESS; } #else #ifdef MAKE_SENDER -using namespace ipaaca; -int main() { - initialize_ipaaca_rsb(); - - //Informer<ipaaca::IUPayloadUpdate>::Ptr pinformer = Factory::getInstance().createInformer<ipaaca::IUPayloadUpdate> ( Scope("/tutorial/converter")); - //Informer<ipaaca::IULinkUpdate>::Ptr linformer = Factory::getInstance().createInformer<ipaaca::IULinkUpdate> ( Scope("/tutorial/converter")); - - Informer<AnyType>::Ptr informer = Factory::getInstance().createInformer<AnyType> ( Scope("/tutorial/converter")); - - IUPayloadUpdate* pup = new ipaaca::IUPayloadUpdate(); - Informer<ipaaca::IUPayloadUpdate>::DataPtr pdata(pup); - pup->uid = "2000"; - pup->revision = 3; - pup->writer_name = "Comp1_OB"; - pup->is_delta = true; - pup->new_items["new_key"] = "new_value"; - pup->new_items["another_key"] = "some_info"; - pup->keys_to_remove.push_back("old_key"); - informer->publish(pdata); - - IULinkUpdate* lup = new ipaaca::IULinkUpdate(); - Informer<ipaaca::IULinkUpdate>::DataPtr ldata(lup); - lup->uid = "2001"; - lup->revision = 4; - lup->writer_name = "Comp2_IB"; - lup->is_delta = true; - lup->new_links["SLL"].insert("2000"); - lup->new_links["grin"].insert("1002"); - lup->links_to_remove["grin"].insert("1001"); - informer->publish(ldata); - - - std::cout << "Done." << std::endl; - return EXIT_SUCCESS; +void iu_handler_for_remote_changes(IUInterface::ptr iu, IUEventType type, bool local) +{ + std::cout << "[32m" << iu_event_type_to_str(type) << " " << (local?"(of local IU)":"(of remote IU)") << "[m" << std::endl; } -#else - -// -// TESTS -// - -using namespace ipaaca; - int main() { - //try{ - initialize_ipaaca_rsb(); - IU::ref iu = IU::create(); - std::cout << "payload.get(\"TEST\") = \"" << iu->payload.get("TEST") << "\"" << std::endl; - std::cout << "payload[\"TEST\"] = \"" << (std::string) iu->payload["TEST"] << "\"" << std::endl; - iu->payload["TEST"] = "123.5-WAS-SET"; - std::cout << "payload[\"TEST\"] = \"" << (std::string) iu->payload["TEST"] << "\"" << std::endl; + try{ + //initialize_ipaaca_rsb(); + - std::string s = "The string \"" + iu->payload["TEST"].to_str() + "\" is the new value."; + OutputBuffer::ptr ob = OutputBuffer::create("Tester"); + ob->register_handler(iu_handler_for_remote_changes); + //std::cout << "Buffer: " << ob->unique_name() << std::endl; + + IU::ptr iu = IU::create("testcategory"); + ob->add(iu); + + std::cout << "Updating in 1 sec" << std::endl; + sleep(1); + + std::cout << "_payload.get(\"TEST\") = \"" << iu->_payload.get("TEST") << "\"" << std::endl; + std::cout << "_payload[\"TEST\"] = \"" << (std::string) iu->_payload["TEST"] << "\"" << std::endl; + iu->_payload["TEST"] = "123.5-WAS-SET"; + std::cout << "_payload[\"TEST\"] = \"" << (std::string) iu->_payload["TEST"] << "\"" << std::endl; + + std::string s = "The string \"" + iu->_payload["TEST"].to_str() + "\" is the new value."; std::cout << "Concatenation test: " << s << std::endl; - - std::cout << "Interpreted as long value: " << iu->payload["TEST"].to_int() << std::endl; - std::cout << "Interpreted as double value: " << iu->payload["TEST"].to_float() << std::endl; - //} catch (std::exception& e) { - // std::cout << e.what() << std::endl; - //} + + iu->add_link("grin", "DUMMY_IU_UID_1234efef1234"); + + std::cout << "Interpreted as long value: " << iu->_payload["TEST"].to_int() << std::endl; + std::cout << "Interpreted as double value: " << iu->_payload["TEST"].to_float() << std::endl; + + std::cout << "Committing and quitting in 1 sec" << std::endl; + sleep(1); + iu->commit(); + } catch (ipaaca::Exception& e) { + std::cout << "== IPAACA EXCEPTION == " << e.what() << std::endl; + } } #endif diff --git a/cpp/src/ipaaca.cc b/cpp/src/ipaaca.cc index 243e0faf83fc94bbb67ef4c09c3e755b322f0c64..8bf68748863437d98325fc24e9fbe377256a3f52 100644 --- a/cpp/src/ipaaca.cc +++ b/cpp/src/ipaaca.cc @@ -2,23 +2,107 @@ #include <cstdlib> namespace ipaaca { -/* -*/ -void initialize_ipaaca_rsb() + +// util and init//{{{ + +bool Initializer::_initialized = false; + +//const LinkSet EMPTY_LINK_SET = LinkSet(); +//const std::set<std::string> EMPTY_LINK_SET(); +bool Initializer::initialized() { return _initialized; } +void Initializer::initialize_ipaaca_rsb_if_needed() { + if (_initialized) return; ParticipantConfig config = ParticipantConfig::fromConfiguration(); Factory::getInstance().setDefaultParticipantConfig(config); + boost::shared_ptr<IUConverter> iu_converter(new IUConverter()); + stringConverterRepository()->registerConverter(iu_converter); + boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter()); - boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter()); stringConverterRepository()->registerConverter(payload_update_converter); + + boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter()); stringConverterRepository()->registerConverter(link_update_converter); + boost::shared_ptr<ProtocolBufferConverter<protobuf::IUCommission> > iu_commission_converter(new ProtocolBufferConverter<protobuf::IUCommission> ()); + stringConverterRepository()->registerConverter(iu_commission_converter); + + boost::shared_ptr<IntConverter> int_converter(new IntConverter()); + stringConverterRepository()->registerConverter(int_converter); + + _initialized = true; //IPAACA_TODO("initialize all converters") } -std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj) +std::string generate_uuid_string() +{ + uuid_t uuidt; + uuid_string_t uuidstr; + uuid_generate(uuidt); + uuid_unparse_lower(uuidt, uuidstr); + return uuidstr; +} + +/* +void init_inprocess_too() { + //ParticipantConfig config = Factory::getInstance().getDefaultParticipantConfig(); + ParticipantConfig config = ParticipantConfig::fromFile("rsb.cfg"); + //ParticipantConfig::Transport inprocess = config.getTransport("inprocess"); + //inprocess.setEnabled(true); + //config.addTransport(inprocess); + Factory::getInstance().setDefaultParticipantConfig(config); +} +*/ +//}}} + +std::ostream& operator<<(std::ostream& os, const SmartLinkMap& obj)//{{{ +{ + os << "{"; + bool first = true; + for (LinkMap::const_iterator it=obj._links.begin(); it!=obj._links.end(); ++it) { + if (first) { first=false; } else { os << ", "; } + os << "'" << it->first << "': ["; + bool firstinner = true; + for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { + if (firstinner) { firstinner=false; } else { os << ", "; } + os << "'" << *it2 << "'"; + } + os << "]"; + } + os << "}"; + return os; +} +//}}} +std::ostream& operator<<(std::ostream& os, const Payload& obj)//{{{ +{ + os << "{"; + bool first = true; + for (std::map<std::string, std::string>::const_iterator it=obj._store.begin(); it!=obj._store.end(); ++it) { + if (first) { first=false; } else { os << ", "; } + os << "'" << it->first << "':'" << it->second << "'"; + } + os << "}"; + return os; +} +//}}} +std::ostream& operator<<(std::ostream& os, const IUInterface& obj)//{{{ +{ + os << "IUInterface(uid='" << obj.uid() << "'"; + os << ", category='" << obj.category() << "'"; + os << ", revision=" << obj.revision(); + os << ", committed=" << (obj.committed()?"True":"False"); + os << ", owner_name='" << obj.owner_name() << "'"; + os << ", payload="; + os << obj.const_payload(); + os << ", links="; + os << obj._links; + os << ")"; + return os; +} +//}}} +std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)//{{{ { os << "PayloadUpdate(uid=" << obj.uid << ", revision=" << obj.revision; os << ", writer_name=" << obj.writer_name << ", is_delta=" << (obj.is_delta?"True":"False"); @@ -37,8 +121,8 @@ std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj) os << "])"; return os; } - -std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj) +//}}} +std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj)//{{{ { os << "LinkUpdate(uid=" << obj.uid << ", revision=" << obj.revision; os << ", writer_name=" << obj.writer_name << ", is_delta=" << (obj.is_delta?"True":"False"); @@ -69,25 +153,793 @@ std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj) os << "})"; return os; } +//}}} +// SmartLinkMap//{{{ -/* -void init_inprocess_too() { - //ParticipantConfig config = Factory::getInstance().getDefaultParticipantConfig(); - ParticipantConfig config = ParticipantConfig::fromFile("rsb.cfg"); - //ParticipantConfig::Transport inprocess = config.getTransport("inprocess"); - //inprocess.setEnabled(true); - //config.addTransport(inprocess); - Factory::getInstance().setDefaultParticipantConfig(config); +LinkSet SmartLinkMap::empty_link_set; +void SmartLinkMap::_add_and_remove_links(const LinkMap& add, const LinkMap& remove) +{ + // remove specified links + for (LinkMap::const_iterator it = remove.begin(); it != remove.end(); ++it ) { + // if link type exists + if (_links.count(it->first) > 0) { + // remove one by one + for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { + _links[it->first].erase(*it2); + } + // wipe the type key if no more links are left + if (_links[it->first].size() == 0) { + _links.erase(it->first); + } + } + } + // add specified links + for (LinkMap::const_iterator it = add.begin(); it != add.end(); ++it ) { + for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { + _links[it->first].insert(*it2); + } + } } -*/ +void SmartLinkMap::_replace_links(const LinkMap& links) +{ + //_links.clear(); + _links=links; +} +const LinkSet& SmartLinkMap::get_links(const std::string& key) +{ + LinkMap::const_iterator it = _links.find(key); + if (it==_links.end()) return empty_link_set; + return it->second; +} +const LinkMap& SmartLinkMap::get_all_links() +{ + return _links; +} +//}}} + +// IUEventHandler//{{{ +IUEventHandler::IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category) +: _function(function), _event_mask(event_mask), _for_all_categories(false) +{ + if (category=="") { + _for_all_categories = true; + } else { + _categories.insert(category); + } +} +IUEventHandler::IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories) +: _function(function), _event_mask(event_mask), _for_all_categories(false) +{ + if (categories.size()==0) { + _for_all_categories = true; + } else { + _categories = categories; + } +} +void IUEventHandler::call(Buffer* buffer, boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category) +{ + if (_condition_met(event_type, category)) { + //IUInterface::ptr iu = buffer->get(uid); + //if (iu) { + _function(iu, event_type, local); + //} + } +} +//}}} -IU::ref IU::create(/* params */) +// Buffer//{{{ +void Buffer::_allocate_unique_name(const std::string& basename, const std::string& function) { + std::string uuid = ipaaca::generate_uuid_string(); + _basename = basename; + _uuid = uuid.substr(0,8); + _unique_name = "/ipaaca/component/" + _basename + "ID" + _uuid + "/" + function; +} +void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories) +{ + IUEventHandler::ptr handler = IUEventHandler::ptr(new IUEventHandler(function, event_mask, categories)); + _event_handlers.push_back(handler); +} +void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category) { - IU::ref iu = IU::ref(new IU(/* params */)); - iu->payload.initialize(iu); + IUEventHandler::ptr handler = IUEventHandler::ptr(new IUEventHandler(function, event_mask, category)); + _event_handlers.push_back(handler); +} +void Buffer::call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category) +{ + //IPAACA_INFO("handling an event " << ipaaca::iu_event_type_to_str(event_type) << " for IU " << iu->uid()) + for (std::vector<IUEventHandler::ptr>::iterator it = _event_handlers.begin(); it != _event_handlers.end(); ++it) { + (*it)->call(this, iu, local, event_type, category); + } +} +//}}} + +// Callbacks for OutputBuffer//{{{ +CallbackIUPayloadUpdate::CallbackIUPayloadUpdate(Buffer* buffer): _buffer(buffer) { } +CallbackIULinkUpdate::CallbackIULinkUpdate(Buffer* buffer): _buffer(buffer) { } +CallbackIUCommission::CallbackIUCommission(Buffer* buffer): _buffer(buffer) { } + +boost::shared_ptr<int> CallbackIUPayloadUpdate::call(const std::string& methodName, boost::shared_ptr<IUPayloadUpdate> update) +{ + IUInterface::ptr iui = _buffer->get(update->uid); + if (! iui) { + IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid) + return boost::shared_ptr<int>(new int(0)); + } + IU::ptr iu = boost::static_pointer_cast<IU>(iui); + iu->_revision_lock.lock(); + if ((update->revision != 0) && (update->revision != iu->_revision)) { + IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid) + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(0)); + } + if (update->is_delta) { + for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) { + iu->payload()._internal_remove(*it, update->writer_name); //_buffer->unique_name()); + } + for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) { + iu->payload()._internal_set(it->first, it->second, update->writer_name); //_buffer->unique_name()); + } + } else { + iu->payload()._internal_replace_all(update->new_items, update->writer_name); //_buffer->unique_name()); + } + _buffer->call_iu_event_handlers(iu, true, IU_UPDATED, iu->category()); + revision_t revision = iu->revision(); + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(revision)); +} + +boost::shared_ptr<int> CallbackIULinkUpdate::call(const std::string& methodName, boost::shared_ptr<IULinkUpdate> update) +{ + IUInterface::ptr iui = _buffer->get(update->uid); + if (! iui) { + IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid) + return boost::shared_ptr<int>(new int(0)); + } + IU::ptr iu = boost::static_pointer_cast<IU>(iui); + iu->_revision_lock.lock(); + if ((update->revision != 0) && (update->revision != iu->_revision)) { + IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid) + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(0)); + } + if (update->is_delta) { + iu->modify_links(update->new_links, update->links_to_remove, update->writer_name); + } else { + iu->set_links(update->new_links, update->writer_name); + } + _buffer->call_iu_event_handlers(iu, true, IU_LINKSUPDATED, iu->category()); + revision_t revision = iu->revision(); + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(revision)); +} +boost::shared_ptr<int> CallbackIUCommission::call(const std::string& methodName, boost::shared_ptr<protobuf::IUCommission> update) +{ + IUInterface::ptr iui = _buffer->get(update->uid()); + if (! iui) { + IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid()) + return boost::shared_ptr<int>(new int(0)); + } + IU::ptr iu = boost::static_pointer_cast<IU>(iui); + iu->_revision_lock.lock(); + if ((update->revision() != 0) && (update->revision() != iu->_revision)) { + IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid()) + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(0)); + } + if (iu->committed()) { + return boost::shared_ptr<int>(new int(0)); + } else { + } + iu->_internal_commit(update->writer_name()); + _buffer->call_iu_event_handlers(iu, true, IU_LINKSUPDATED, iu->category()); + revision_t revision = iu->revision(); + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(revision)); +} + +//}}} + +// OutputBuffer//{{{ + +OutputBuffer::OutputBuffer(const std::string& basename) +:Buffer(basename, "OB") +{ + _id_prefix = _basename + "-" + _uuid + "-IU-"; + _initialize_server(); +} +void OutputBuffer::_initialize_server() +{ + _server = Factory::getInstance().createServer( Scope( _unique_name ) ); + _server->registerMethod("updatePayload", Server::CallbackPtr(new CallbackIUPayloadUpdate(this))); + _server->registerMethod("updateLinks", Server::CallbackPtr(new CallbackIULinkUpdate(this))); + _server->registerMethod("commit", Server::CallbackPtr(new CallbackIUCommission(this))); +} +OutputBuffer::ptr OutputBuffer::create(const std::string& basename) +{ + Initializer::initialize_ipaaca_rsb_if_needed(); + return OutputBuffer::ptr(new OutputBuffer(basename)); +} +IUInterface::ptr OutputBuffer::get(const std::string& iu_uid) +{ + IUStore::iterator it = _iu_store.find(iu_uid); + if (it==_iu_store.end()) return IUInterface::ptr(); + return it->second; +} +std::set<IUInterface::ptr> OutputBuffer::get_ius() +{ + std::set<IUInterface::ptr> set; + for (IUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) set.insert(it->second); + return set; +} + +void OutputBuffer::_send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) +{ + IULinkUpdate* lup = new ipaaca::IULinkUpdate(); + Informer<ipaaca::IULinkUpdate>::DataPtr ldata(lup); + lup->uid = iu->uid(); + lup->is_delta = is_delta; + lup->revision = revision; + lup->is_delta = true; + lup->new_links = new_links; + if (is_delta) lup->links_to_remove = links_to_remove; + if (writer_name=="") lup->writer_name = _unique_name; + else lup->writer_name = writer_name; + Informer<AnyType>::Ptr informer = _get_informer(iu->category()); + informer->publish(ldata); +} + +void OutputBuffer::_send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) +{ + IUPayloadUpdate* pup = new ipaaca::IUPayloadUpdate(); + Informer<ipaaca::IUPayloadUpdate>::DataPtr pdata(pup); + pup->uid = iu->uid(); + pup->is_delta = is_delta; + pup->revision = revision; + pup->new_items = new_items; + if (is_delta) pup->keys_to_remove = keys_to_remove; + if (writer_name=="") pup->writer_name = _unique_name; + else pup->writer_name = writer_name; + Informer<AnyType>::Ptr informer = _get_informer(iu->category()); + informer->publish(pdata); +} + +void OutputBuffer::_send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name) +{ + Informer<protobuf::IUCommission>::DataPtr data(new protobuf::IUCommission()); + data->set_uid(iu->uid()); + data->set_revision(revision); + if (writer_name=="") data->set_writer_name(_unique_name); + else data->set_writer_name(writer_name); + + Informer<AnyType>::Ptr informer = _get_informer(iu->category()); + informer->publish(data); +} + +void OutputBuffer::add(IU::ptr iu) +{ + if (_iu_store.count(iu->uid()) > 0) { + throw IUPublishedError(); + } + _iu_store[iu->uid()] = iu; + iu->_associate_with_buffer(this); //shared_from_this()); + _publish_iu(iu); +} + +void OutputBuffer::_publish_iu(IU::ptr iu) +{ + Informer<AnyType>::Ptr informer = _get_informer(iu->_category); + Informer<ipaaca::IU>::DataPtr iu_data(iu); + informer->publish(iu_data); +} + +Informer<AnyType>::Ptr OutputBuffer::_get_informer(const std::string& category) +{ + if (_informer_store.count(category) > 0) { + return _informer_store[category]; + } else { + //IPAACA_INFO("Making new informer for category " << category) + std::string scope_string = "/ipaaca/category/" + category; + Informer<AnyType>::Ptr informer = Factory::getInstance().createInformer<AnyType> ( Scope(scope_string)); + _informer_store[category] = informer; + return informer; + } +} +boost::shared_ptr<IU> OutputBuffer::remove(const std::string& iu_uid) +{ + IUStore::iterator it = _iu_store.find(iu_uid); + if (it == _iu_store.end()) throw IUNotFoundError(); + IU::ptr iu = it->second; + _retract_iu(iu); + _iu_store.erase(iu_uid); return iu; } +boost::shared_ptr<IU> OutputBuffer::remove(IU::ptr iu) +{ + return remove(iu->uid()); // to make sure it is in the store +} + +void OutputBuffer::_retract_iu(IU::ptr iu) +{ + Informer<protobuf::IURetraction>::DataPtr data(new protobuf::IURetraction()); + data->set_uid(iu->uid()); + data->set_revision(iu->revision()); + Informer<AnyType>::Ptr informer = _get_informer(iu->category()); + informer->publish(data); +} + + +//}}} + +// InputBuffer//{{{ +InputBuffer::InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests) +:Buffer(basename, "IB") +{ + for (std::vector<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) { + _create_category_listener_if_needed(*it); + } +} +InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1) +:Buffer(basename, "IB") +{ + _create_category_listener_if_needed(category_interest1); +} +InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) +:Buffer(basename, "IB") +{ + _create_category_listener_if_needed(category_interest1); + _create_category_listener_if_needed(category_interest2); +} +InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) +:Buffer(basename, "IB") +{ + _create_category_listener_if_needed(category_interest1); + _create_category_listener_if_needed(category_interest2); + _create_category_listener_if_needed(category_interest3); +} +InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) +:Buffer(basename, "IB") +{ + _create_category_listener_if_needed(category_interest1); + _create_category_listener_if_needed(category_interest2); + _create_category_listener_if_needed(category_interest3); + _create_category_listener_if_needed(category_interest4); +} + + +InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::vector<std::string>& category_interests) +{ + Initializer::initialize_ipaaca_rsb_if_needed(); + return InputBuffer::ptr(new InputBuffer(basename, category_interests)); +} +InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1) +{ + Initializer::initialize_ipaaca_rsb_if_needed(); + return InputBuffer::ptr(new InputBuffer(basename, category_interest1)); +} +InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) +{ + Initializer::initialize_ipaaca_rsb_if_needed(); + return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2)); +} +InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) +{ + Initializer::initialize_ipaaca_rsb_if_needed(); + return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3)); +} +InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) +{ + Initializer::initialize_ipaaca_rsb_if_needed(); + return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3, category_interest4)); +} + +IUInterface::ptr InputBuffer::get(const std::string& iu_uid) +{ + RemotePushIUStore::iterator it = _iu_store.find(iu_uid); // TODO genericize + if (it==_iu_store.end()) return IUInterface::ptr(); + return it->second; +} +std::set<IUInterface::ptr> InputBuffer::get_ius() +{ + std::set<IUInterface::ptr> set; + for (RemotePushIUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) set.insert(it->second); // TODO genericize + return set; +} + + +RemoteServerPtr InputBuffer::_get_remote_server(const std::string& unique_server_name) +{ + std::map<std::string, RemoteServerPtr>::iterator it = _remote_server_store.find(unique_server_name); + if (it!=_remote_server_store.end()) return it->second; + RemoteServerPtr remote_server = Factory::getInstance().createRemoteServer(Scope(unique_server_name)); + _remote_server_store[unique_server_name] = remote_server; + return remote_server; +} + +ListenerPtr InputBuffer::_create_category_listener_if_needed(const std::string& category) +{ + std::map<std::string, ListenerPtr>::iterator it = _listener_store.find(category); + if (it!=_listener_store.end()) return it->second; + //IPAACA_INFO("Creating a new listener for category " << category) + std::string scope_string = "/ipaaca/category/" + category; + ListenerPtr listener = Factory::getInstance().createListener( Scope(scope_string) ); + HandlerPtr event_handler = HandlerPtr( + new EventFunctionHandler( + boost::bind(&InputBuffer::_handle_iu_events, this, _1) + ) + ); + listener->addHandler(event_handler); + _listener_store[category] = listener; + return listener; + /* + '''Return (or create, store and return) a category listener.''' + if iu_category in self._listener_store: return self._informer_store[iu_category] + cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config) + cat_listener.addHandler(self._handle_iu_events) + self._listener_store[iu_category] = cat_listener + self._category_interests.append(iu_category) + logger.info("Added listener in scope "+"/ipaaca/category/"+iu_category) + return cat_listener + */ +} +void InputBuffer::_handle_iu_events(EventPtr event) +{ + std::string type = event->getType(); + if (type == "ipaaca::RemotePushIU") { + boost::shared_ptr<RemotePushIU> iu = boost::static_pointer_cast<RemotePushIU>(event->getData()); + if (_iu_store.count(iu->category()) > 0) { + // already got the IU... ignore + } else { + _iu_store[iu->uid()] = iu; + iu->_set_buffer(this); + call_iu_event_handlers(iu, false, IU_ADDED, iu->category() ); + } + //IPAACA_INFO( "New RemotePushIU state: " << (*iu) ) + } else { + RemotePushIUStore::iterator it; + if (type == "ipaaca::IUPayloadUpdate") { + boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData()); + //IPAACA_INFO("** writer name: " << update->writer_name) + if (update->writer_name == _unique_name) { + return; + } + it = _iu_store.find(update->uid); + if (it == _iu_store.end()) { + IPAACA_INFO("Ignoring UPDATED message for an IU that we did not fully receive before") + return; + } + // + it->second->_apply_update(update); + call_iu_event_handlers(it->second, false, IU_UPDATED, it->second->category() ); + // + // + } else if (type == "ipaaca::IULinkUpdate") { + boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData()); + if (update->writer_name == _unique_name) { + return; + } + it = _iu_store.find(update->uid); + if (it == _iu_store.end()) { + IPAACA_INFO("Ignoring LINKSUPDATED message for an IU that we did not fully receive before") + return; + } + // + it->second->_apply_link_update(update); + call_iu_event_handlers(it->second, false, IU_LINKSUPDATED, it->second->category() ); + // + // + } else if (type == "ipaaca::protobuf::IUCommission") { + boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData()); + if (update->writer_name() == _unique_name) { + return; + } + it = _iu_store.find(update->uid()); + if (it == _iu_store.end()) { + IPAACA_INFO("Ignoring COMMITTED message for an IU that we did not fully receive before") + return; + } + // + it->second->_apply_commission(); + it->second->_revision = update->revision(); + call_iu_event_handlers(it->second, false, IU_COMMITTED, it->second->category() ); + // + // + } else { + std::cout << "(Unhandled Event type " << type << " !)" << std::endl; + return; + } + //IPAACA_INFO( "New RemotePushIU state: " << *(it->second) ) + } +} + +//}}} + + + +// IUInterface//{{{ + +IUInterface::IUInterface() +: _buffer(NULL), _committed(false) +{ +} + +void IUInterface::_set_uid(const std::string& uid) { + if (_uid != "") { + throw IUAlreadyHasAnUIDError(); + } + _uid = uid; +} + +void IUInterface::_set_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buffer) { + if (_buffer) { + throw IUAlreadyInABufferError(); + } + _buffer = buffer; + +} + +void IUInterface::_set_owner_name(const std::string& owner_name) { + if (_owner_name != "") { + throw IUAlreadyHasAnOwnerNameError(); + } + _owner_name = owner_name; +} + +/// set the buffer pointer and the owner names of IU and Payload +void IUInterface::_associate_with_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buffer) { + _set_buffer(buffer); // will throw if already set + _set_owner_name(buffer->unique_name()); + payload()._set_owner_name(buffer->unique_name()); +} + +/// C++-specific convenience function to add one single link +void IUInterface::add_link(const std::string& type, const std::string& target, const std::string& writer_name) +{ + LinkMap none; + LinkMap add; + add[type].insert(target); + _modify_links(true, add, none, writer_name); + _add_and_remove_links(add, none); +} +/// C++-specific convenience function to remove one single link +void IUInterface::remove_link(const std::string& type, const std::string& target, const std::string& writer_name) +{ + LinkMap none; + LinkMap remove; + remove[type].insert(target); + _modify_links(true, none, remove, writer_name); + _add_and_remove_links(none, remove); +} + +void IUInterface::add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name) +{ + LinkMap none; + LinkMap add; + add[type] = targets; + _modify_links(true, add, none, writer_name); + _add_and_remove_links(add, none); +} + +void IUInterface::remove_links(const std::string& type, const LinkSet& targets, const std::string& writer_name) +{ + LinkMap none; + LinkMap remove; + remove[type] = targets; + _modify_links(true, none, remove, writer_name); + _add_and_remove_links(none, remove); +} + +void IUInterface::modify_links(const LinkMap& add, const LinkMap& remove, const std::string& writer_name) +{ + _modify_links(true, add, remove, writer_name); + _add_and_remove_links(add, remove); +} + +void IUInterface::set_links(const LinkMap& links, const std::string& writer_name) +{ + LinkMap none; + _modify_links(false, links, none, writer_name); + _replace_links(links); +} + +//}}} + +// IU//{{{ +IU::ptr IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type) +{ + IU::ptr iu = IU::ptr(new IU(category, access_mode, read_only, payload_type)); /* params */ //)); + iu->_payload.initialize(iu); + return iu; +} + +IU::IU(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type) +{ + _revision = 1; + _uid = ipaaca::generate_uuid_string(); + _category = category; + _payload_type = payload_type; + // payload initialization deferred to IU::create(), above + _read_only = read_only; + _access_mode = access_mode; + _committed = false; +} + +void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) +{ + _revision_lock.lock(); + if (_committed) { + _revision_lock.unlock(); + throw IUCommittedError(); + } + _increase_revision_number(); + if (is_published()) { + _buffer->_send_iu_link_update(this, is_delta, _revision, new_links, links_to_remove, writer_name); + } + _revision_lock.unlock(); +} +void IU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) +{ + _revision_lock.lock(); + if (_committed) { + _revision_lock.unlock(); + throw IUCommittedError(); + } + _increase_revision_number(); + if (is_published()) { + _buffer->_send_iu_payload_update(this, is_delta, _revision, new_items, keys_to_remove, writer_name); + } + _revision_lock.unlock(); +} + +void IU::commit() +{ + _internal_commit(); +} + +void IU::_internal_commit(const std::string& writer_name) +{ + _revision_lock.lock(); + if (_committed) { + _revision_lock.unlock(); + throw IUCommittedError(); + } + _increase_revision_number(); + _committed = true; + if (is_published()) { + _buffer->_send_iu_commission(this, _revision, writer_name); + } + _revision_lock.unlock(); +} +//}}} + +// RemotePushIU//{{{ + +RemotePushIU::ptr RemotePushIU::create() +{ + RemotePushIU::ptr iu = RemotePushIU::ptr(new RemotePushIU(/* params */)); + iu->_payload.initialize(iu); + return iu; +} +RemotePushIU::RemotePushIU() +{ + // nothing +} +void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) +{ + if (_committed) { + throw IUCommittedError(); + } + if (_read_only) { + throw IUReadOnlyError(); + } + RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); + IULinkUpdate::ptr update = IULinkUpdate::ptr(new IULinkUpdate()); + update->uid = _uid; + update->revision = _revision; + update->is_delta = is_delta; + update->writer_name = _buffer->unique_name(); + update->new_links = new_links; + update->links_to_remove = links_to_remove; + boost::shared_ptr<int> result = server->call<int>("updateLinks", update, 1); // TODO 1 sec + if (*result == 0) { + throw IUUpdateFailedError(); + } else { + _revision = *result; + } +} +void RemotePushIU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) +{ + if (_committed) { + throw IUCommittedError(); + } + if (_read_only) { + throw IUReadOnlyError(); + } + RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); + IUPayloadUpdate::ptr update = IUPayloadUpdate::ptr(new IUPayloadUpdate()); + update->uid = _uid; + update->revision = _revision; + update->is_delta = is_delta; + update->writer_name = _buffer->unique_name(); + update->new_items = new_items; + update->keys_to_remove = keys_to_remove; + boost::shared_ptr<int> result = server->call<int>("updatePayload", update, 1); // TODO 1 sec + if (*result == 0) { + throw IUUpdateFailedError(); + } else { + _revision = *result; + } +} + +void RemotePushIU::commit() +{ + if (_read_only) { + throw IUReadOnlyError(); + } + if (_committed) { + // Following python version: ignoring multiple commit + return; + } + RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); + boost::shared_ptr<protobuf::IUCommission> update = boost::shared_ptr<protobuf::IUCommission>(new protobuf::IUCommission()); + update->set_uid(_uid); + update->set_revision(_revision); + update->set_writer_name(_buffer->unique_name()); + boost::shared_ptr<int> result = server->call<int>("commit", update, 1); // TODO 1 sec + if (*result == 0) { + throw IUUpdateFailedError(); + } else { + _revision = *result; + } +} + +void RemotePushIU::_apply_link_update(IULinkUpdate::ptr update) +{ + _revision = update->revision; + if (update->is_delta) { + _add_and_remove_links(update->new_links, update->links_to_remove); + } else { + _replace_links(update->new_links); + } +} +void RemotePushIU::_apply_update(IUPayloadUpdate::ptr update) +{ + _revision = update->revision; + if (update->is_delta) { + for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) { + _payload._remotely_enforced_delitem(*it); + } + for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) { + _payload._remotely_enforced_setitem(it->first, it->second); + } + } else { + _payload._remotely_enforced_wipe(); + for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) { + _payload._remotely_enforced_setitem(it->first, it->second); + } + } +} +void RemotePushIU::_apply_commission() +{ + _committed = true; +} +void Payload::_remotely_enforced_wipe() +{ + _store.clear(); +} +void Payload::_remotely_enforced_delitem(const std::string& k) +{ + _store.erase(k); +} +void Payload::_remotely_enforced_setitem(const std::string& k, const std::string& v) +{ + _store[k] = v; +} + +//}}} + + + // PayloadEntryProxy//{{{ @@ -116,9 +968,6 @@ PayloadEntryProxy::operator double() // Payload//{{{ -Payload::Payload() -{ -} void Payload::initialize(boost::shared_ptr<IUInterface> iu) { _iu = iu; @@ -130,22 +979,32 @@ PayloadEntryProxy Payload::operator[](const std::string& key) return PayloadEntryProxy(this, key); } -inline void Payload::set(const std::string& k, const std::string& v) { - //self._iu._modify_payload(self, isdelta=true, newitm={k:v}, keystorm=[], writer_name=None ); +inline void Payload::_internal_set(const std::string& k, const std::string& v, const std::string& writer_name) { + std::map<std::string, std::string> _new; + std::vector<std::string> _remove; + _new[k]=v; + _iu->_modify_payload(true, _new, _remove, writer_name ); _store[k] = v; } -inline void Payload::remove(const std::string& k) { - //self._iu._modify_payload(self, isdelta=true, newitm={}, keystorm=[k], writer_name=None ); +inline void Payload::_internal_remove(const std::string& k, const std::string& writer_name) { + std::map<std::string, std::string> _new; + std::vector<std::string> _remove; + _remove.push_back(k); + _iu->_modify_payload(true, _new, _remove, writer_name ); _store.erase(k); } +void Payload::_internal_replace_all(const std::map<std::string, std::string>& new_contents, const std::string& writer_name) +{ + std::vector<std::string> _remove; + _iu->_modify_payload(false, new_contents, _remove, writer_name ); + _store = new_contents; +} inline std::string Payload::get(const std::string& k) { if (_store.count(k)>0) return _store[k]; else return IPAACA_PAYLOAD_DEFAULT_STRING_VALUE; } //}}} -/* - // IUConverter//{{{ IUConverter::IUConverter() @@ -161,17 +1020,26 @@ std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire) boost::shared_ptr<const IU> obj = boost::static_pointer_cast<const IU> (data.second); boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU()); // transfer obj data to pbo - pbo->set_uid(obj->uid); - pbo->set_revision(obj->revision); - pbo->set_writer_name(obj->writer_name); - pbo->set_is_delta(obj->is_delta); - for (std::map<std::string, std::string>::const_iterator it=obj->new_items.begin(); it!=obj->new_items.end(); ++it) { - protobuf::PayloadItem* item = pbo->add_new_items(); + pbo->set_uid(obj->uid()); + pbo->set_revision(obj->revision()); + pbo->set_category(obj->category()); + pbo->set_payload_type(obj->payload_type()); + pbo->set_owner_name(obj->owner_name()); + pbo->set_committed(obj->committed()); + pbo->set_access_mode(ipaaca::protobuf::IU::PUSH); // TODO + pbo->set_read_only(obj->read_only()); + for (std::map<std::string, std::string>::const_iterator it=obj->_payload._store.begin(); it!=obj->_payload._store.end(); ++it) { + protobuf::PayloadItem* item = pbo->add_payload(); item->set_key(it->first); item->set_value(it->second); + item->set_type("str"); // FIXME other types than str (later) } - for (std::vector<std::string>::const_iterator it=obj->keys_to_remove.begin(); it!=obj->keys_to_remove.end(); ++it) { - pbo->add_keys_to_remove(*it); + for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) { + protobuf::LinkSet* links = pbo->add_links(); + links->set_type(it->first); + for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { + links->add_targets(*it2); + } } pbo->SerializeToString(&wire); return getWireSchema(); @@ -179,30 +1047,47 @@ std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire) } AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) { - assert(wireSchema == getWireSchema()); // "ipaaca-iu-payload-update" + assert(wireSchema == getWireSchema()); // "ipaaca-iu" boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU()); pbo->ParseFromString(wire); - boost::shared_ptr<IU> obj(new IU()); - // transfer pbo data to obj - obj->uid = pbo->uid(); - obj->revision = pbo->revision(); - obj->writer_name = pbo->writer_name(); - obj->is_delta = pbo->is_delta(); - for (int i=0; i<pbo->new_items_size(); i++) { - const protobuf::PayloadItem& it = pbo->new_items(i); - obj->new_items[it.key()] = it.value(); - } - for (int i=0; i<pbo->keys_to_remove_size(); i++) { - obj->keys_to_remove.push_back(pbo->keys_to_remove(i)); + IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode()); + switch(mode) { + case IU_ACCESS_PUSH: + { + // Create a "remote push IU" + boost::shared_ptr<RemotePushIU> obj = RemotePushIU::create(); + // transfer pbo data to obj + obj->_uid = pbo->uid(); + obj->_revision = pbo->revision(); + obj->_category = pbo->category(); + obj->_payload_type = pbo->payload_type(); + obj->_owner_name = pbo->owner_name(); + obj->_committed = pbo->committed(); + obj->_read_only = pbo->read_only(); + obj->_access_mode = IU_ACCESS_PUSH; + for (int i=0; i<pbo->payload_size(); i++) { + const protobuf::PayloadItem& it = pbo->payload(i); + obj->_payload._store[it.key()] = it.value(); + } + for (int i=0; i<pbo->links_size(); i++) { + const protobuf::LinkSet& pls = pbo->links(i); + LinkSet& ls = obj->_links._links[pls.type()]; + for (int j=0; j<pls.targets_size(); j++) { + ls.insert(pls.targets(j)); + } + } + //return std::make_pair(getDataType(), obj); + return std::make_pair("ipaaca::RemotePushIU", obj); + break; + } + default: + // other cases not handled yet! ( TODO ) + throw NotImplementedError(); } - return std::make_pair(getDataType(), obj); } //}}} -*/ - - // IUPayloadUpdateConverter//{{{ IUPayloadUpdateConverter::IUPayloadUpdateConverter() @@ -319,17 +1204,36 @@ AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema, //}}} +// IntConverter//{{{ +IntConverter::IntConverter() +: Converter<std::string> ("int", "int32", true) +{ +} +std::string IntConverter::serialize(const AnnotatedData& data, std::string& wire) +{ + // Ensure that DATA actually holds a datum of the data-type we expect. + assert(data.first == getDataType()); // "int" + // NOTE: a dynamic_pointer_cast cannot be used from void* + boost::shared_ptr<const int> obj = boost::static_pointer_cast<const int> (data.second); + boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage()); + // transfer obj data to pbo + pbo->set_value(*obj); + pbo->SerializeToString(&wire); + return getWireSchema(); +} +AnnotatedData IntConverter::deserialize(const std::string& wireSchema, const std::string& wire) { + assert(wireSchema == getWireSchema()); // "int" + boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage()); + pbo->ParseFromString(wire); + boost::shared_ptr<int> obj = boost::shared_ptr<int>(new int(pbo->value())); + return std::make_pair("int", obj); +} - - - - - - +//}}} } // of namespace ipaaca diff --git a/cpp/src/ipaaca.h b/cpp/src/ipaaca.h index b585605eb45618e28bb7466ca8e424ded0b8a5c7..2c42c7ec9505eff197e9510b3e5775294fbb6d3a 100644 --- a/cpp/src/ipaaca.h +++ b/cpp/src/ipaaca.h @@ -1,16 +1,33 @@ #ifndef __IPAACA_H__ #define __IPAACA_H_ +/// ipaaca/IU/RSB protocol major version number +#define IPAACA_PROTOCOL_VERSION_MAJOR 1 +/// ipaaca/IU/RSB protocol minor version number +#define IPAACA_PROTOCOL_VERSION_MINOR 0 + +/// running release number of ipaaca-c++ +#define IPAACA_CPP_RELEASE_NUMBER 1 +/// date of last release number increment +#define IPAACA_CPP_RELEASE_DATE "2012-04-13" + + #ifdef IPAACA_DEBUG_MESSAGES #define IPAACA_INFO(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- " << i << std::endl; -#define IPAACA_IMPLEMENT_ME(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- IMPLEMENT ME" << std::endl; +#define IPAACA_WARNING(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- WARNING: " << i << std::endl; +#define IPAACA_IMPLEMENT_ME std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- IMPLEMENT ME" << std::endl; #define IPAACA_TODO(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- TODO: " << i << std::endl; #else #define IPAACA_INFO(i) ; +#define IPAACA_WARNING(i) ; #define IPAACA_IMPLEMENT_ME(i) ; #define IPAACA_TODO(i) ; #endif +/// marking pure virtual functions for extra readability +#define _IPAACA_ABSTRACT_ + +/// value to return when reading nonexistant payload keys #define IPAACA_PAYLOAD_DEFAULT_STRING_VALUE "" #include <iostream> @@ -30,94 +47,346 @@ #include <ipaaca.pb.h> +#include <pthread.h> +#include <uuid/uuid.h> + //using namespace boost; using namespace rsb; +using namespace rsb::filter; using namespace rsb::converter; +using namespace rsb::patterns; namespace ipaaca { -enum IUEventType { - ADDED, - COMMITTED, - DELETED, - RETRACTED, - UPDATED, - LINKSUPDATED -}; +typedef uint32_t revision_t; + +/// Type of the IU event. Realized as an integer to enable bit masks for filters. +typedef uint32_t IUEventType; +#define IU_ADDED 1 +#define IU_COMMITTED 2 +#define IU_DELETED 4 +#define IU_RETRACTED 8 +#define IU_UPDATED 16 +#define IU_LINKSUPDATED 32 +/// Bit mask for receiving all events +#define IU_ALL_EVENTS 63 +/// Convert an int event type to a human-readable string +inline std::string iu_event_type_to_str(IUEventType type) +{ + switch(type) { + case IU_ADDED: return "ADDED"; + case IU_COMMITTED: return "COMMITTED"; + case IU_DELETED: return "DELETED"; + case IU_RETRACTED: return "RETRACTED"; + case IU_UPDATED: return "UPDATED"; + case IU_LINKSUPDATED: return "LINKSUPDATED"; + default: return "(NOT A KNOWN SINGLE IU EVENT TYPE)"; + } +} + +/// IU access mode: PUSH means that updates are broadcast; REMOTE means that reads are RPC calls; MESSAGE means a fire-and-forget message enum IUAccessMode { - PUSH, - REMOTE, - MESSAGE + IU_ACCESS_PUSH, + IU_ACCESS_REMOTE, + IU_ACCESS_MESSAGE }; +class PayloadEntryProxy; class Payload; class IUInterface; class IU; class RemotePushIU; -// class IULinkUpdate -// class IULinkUpdateConverter; +class IULinkUpdate; +class IULinkUpdateConverter; +class IUPayloadUpdate; +class IUPayloadUpdateConverter; class IUStore; class FrozenIUStore; -class IUEventHandler; +class Buffer; +class InputBuffer; +class OutputBuffer; + +/// generate a UUID as an ASCII string +std::string generate_uuid_string(); -class Buffer { +/// store for (local) IUs. TODO Stores need to be unified more +class IUStore: public std::map<std::string, boost::shared_ptr<IU> > +{ +}; +/// store for RemotePushIUs. TODO Stores need to be unified more +class RemotePushIUStore: public std::map<std::string, boost::shared_ptr<RemotePushIU> > // TODO genericize to all remote IU types +{ }; -class InputBuffer: public Buffer { +/// a reentrant lock/mutex +class Lock +{ + protected: + pthread_mutexattr_t _attrs; + pthread_mutex_t _mutex; + public: + inline Lock() { + pthread_mutexattr_init(&_attrs); + pthread_mutexattr_settype(&_attrs, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_init(&_mutex, &_attrs); + } + inline ~Lock() { + pthread_mutex_destroy(&_mutex); + pthread_mutexattr_destroy(&_attrs); + } + inline void lock() { + pthread_mutex_lock(&_mutex); + } + inline void unlock() { + pthread_mutex_unlock(&_mutex); + } }; -class OutputBuffer: public Buffer { +typedef std::set<std::string> LinkSet; +typedef std::map<std::string, LinkSet> LinkMap; +class SmartLinkMap { + friend std::ostream& operator<<(std::ostream& os, const SmartLinkMap& obj); + friend class IUInterface; + friend class IU; + friend class IUConverter; + public: + const LinkSet& get_links(const std::string& key); + const LinkMap& get_all_links(); + + protected: + LinkMap _links; + static LinkSet empty_link_set; + void _add_and_remove_links(const LinkMap& add, const LinkMap& remove); + void _replace_links(const LinkMap& links); }; -/* -class IUEventFunctionHandler: public rsb::EventFunctionHandler { +const LinkSet EMPTY_LINK_SET; +//const std::set<std::string> EMPTY_LINK_SET; + +//typedef boost::function<void (const std::string&, bool, IUEventType, const std::string&)> IUEventHandlerFunction; +typedef boost::function<void (boost::shared_ptr<IUInterface>, IUEventType, bool)> IUEventHandlerFunction; + +class IUEventHandler { + protected: + IUEventHandlerFunction _function; + IUEventType _event_mask; + bool _for_all_categories; + std::set<std::string> _categories; + protected: + inline bool _condition_met(IUEventType event_type, const std::string& category) + { + return ((_event_mask&event_type)!=0) && (_for_all_categories || (_categories.count(category)>0)); + } + public: + IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category); + IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories); + //void call(Buffer* buffer, const std::string& uid, bool local, IUEventType event_type, const std::string& category); + void call(Buffer* buffer, boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category); + typedef boost::shared_ptr<IUEventHandler> ptr; +}; + +class Buffer { //: public boost::enable_shared_from_this<Buffer> {//{{{ + friend class IU; + friend class RemotePushIU; + friend class CallbackIUPayloadUpdate; + friend class CallbackIULinkUpdate; + friend class CallbackIUCommission; + protected: + std::string _uuid; + std::string _basename; + std::string _unique_name; + std::string _id_prefix; + std::vector<IUEventHandler::ptr> _event_handlers; + protected: + _IPAACA_ABSTRACT_ virtual void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") = 0; + _IPAACA_ABSTRACT_ virtual void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef") = 0; + _IPAACA_ABSTRACT_ virtual void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") = 0; + void _allocate_unique_name(const std::string& basename, const std::string& function); + inline Buffer(const std::string& basename, const std::string& function) { + _allocate_unique_name(basename, function); + } + void call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category); + public: + virtual inline ~Buffer() { } + inline const std::string& unique_name() { return _unique_name; } + void register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories); + void register_handler(IUEventHandlerFunction function, IUEventType event_mask = IU_ALL_EVENTS, const std::string& category=""); + //_IPAACA_ABSTRACT_ virtual void add(boost::shared_ptr<IUInterface> iu) = 0; + _IPAACA_ABSTRACT_ virtual boost::shared_ptr<IUInterface> get(const std::string& iu_uid) = 0; + _IPAACA_ABSTRACT_ virtual std::set<boost::shared_ptr<IUInterface> > get_ius() = 0; +}; +//}}} + +class CallbackIUPayloadUpdate: public Server::Callback<IUPayloadUpdate, int> { protected: Buffer* _buffer; public: - inline IUEventFunctionHandler(Buffer* buffer, const EventFunction& function, const std::string& method="") - : EventFunctionHandler(function, method), _buffer(buffer) { } + CallbackIUPayloadUpdate(Buffer* buffer); + boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<IUPayloadUpdate> update); }; -*/ +class CallbackIULinkUpdate: public Server::Callback<IULinkUpdate, int> { + protected: + Buffer* _buffer; + public: + CallbackIULinkUpdate(Buffer* buffer); + public: + boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<IULinkUpdate> update); +}; +class CallbackIUCommission: public Server::Callback<protobuf::IUCommission, int> { + protected: + Buffer* _buffer; + public: + CallbackIUCommission(Buffer* buffer); + public: + boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<protobuf::IUCommission> update); +}; + +class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<OutputBuffer> {//{{{ + friend class IU; + friend class RemotePushIU; + protected: + protected: + std::map<std::string, Informer<AnyType>::Ptr> _informer_store; + IUStore _iu_store; + Lock _iu_id_counter_lock; + ServerPtr _server; + protected: + // informing functions + void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef"); + void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef"); + void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name); + // remote access functions + // _remote_update_links(IULinkUpdate) + // _remote_update_payload(IUPayloadUpdate) + // _remote_commit(protobuf::IUCommission) + protected: + void _publish_iu(boost::shared_ptr<IU> iu); + void _retract_iu(boost::shared_ptr<IU> iu); + Informer<AnyType>::Ptr _get_informer(const std::string& category); + protected: + OutputBuffer(const std::string& basename); + void _initialize_server(); + public: + static boost::shared_ptr<OutputBuffer> create(const std::string& basename); + ~OutputBuffer() { + IPAACA_IMPLEMENT_ME + } + void add(boost::shared_ptr<IU> iu); + boost::shared_ptr<IU> remove(const std::string& iu_uid); + boost::shared_ptr<IU> remove(boost::shared_ptr<IU> iu); + boost::shared_ptr<IUInterface> get(const std::string& iu_uid); + std::set<boost::shared_ptr<IUInterface> > get_ius(); + typedef boost::shared_ptr<OutputBuffer> ptr; +}; +//}}} + +class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<InputBuffer> {//{{{ + friend class IU; + friend class RemotePushIU; + protected: + std::map<std::string, ListenerPtr> _listener_store; + std::map<std::string, RemoteServerPtr> _remote_server_store; + RemotePushIUStore _iu_store; // TODO genericize + protected: + inline void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") + { + IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_link_update() should never be invoked") + } + inline void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef") + { + IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_payload_update() should never be invoked") + } + inline void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") + { + IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_commission() should never be invoked") + } + protected: + RemoteServerPtr _get_remote_server(const std::string& unique_server_name); + ListenerPtr _create_category_listener_if_needed(const std::string& category); + void _handle_iu_events(EventPtr event); + protected: + InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests); + InputBuffer(const std::string& basename, const std::string& category_interest1); + InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2); + InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3); + InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4); + public: + static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::vector<std::string>& category_interests); + static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1); + static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2); + static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3); + static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4); + ~InputBuffer() { + IPAACA_IMPLEMENT_ME + } + boost::shared_ptr<IUInterface> get(const std::string& iu_uid); + std::set<boost::shared_ptr<IUInterface> > get_ius(); + typedef boost::shared_ptr<InputBuffer> ptr; +}; +//}}} + +class IUConverter: public rsb::converter::Converter<std::string> {//{{{ + public: + IUConverter(); + std::string serialize(const rsb::AnnotatedData& data, std::string& wire); + rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire); +};//}}} -class IUPayloadUpdate { +class IUPayloadUpdate {//{{{ public: std::string uid; - uint32_t revision; + revision_t revision; std::string writer_name; bool is_delta; std::map<std::string, std::string> new_items; std::vector<std::string> keys_to_remove; friend std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj); -}; -class IUPayloadUpdateConverter: public rsb::converter::Converter<std::string> { + typedef boost::shared_ptr<IUPayloadUpdate> ptr; +};//}}} +class IUPayloadUpdateConverter: public rsb::converter::Converter<std::string> {//{{{ public: IUPayloadUpdateConverter(); std::string serialize(const rsb::AnnotatedData& data, std::string& wire); rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire); -}; +};//}}} -class IULinkUpdate { +class IULinkUpdate {//{{{ public: std::string uid; - uint32_t revision; + revision_t revision; std::string writer_name; bool is_delta; std::map<std::string, std::set<std::string> > new_links; std::map<std::string, std::set<std::string> > links_to_remove; friend std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj); -}; -class IULinkUpdateConverter: public rsb::converter::Converter<std::string> { + typedef boost::shared_ptr<IULinkUpdate> ptr; +};//}}} +class IULinkUpdateConverter: public rsb::converter::Converter<std::string> {//{{{ public: IULinkUpdateConverter(); std::string serialize(const rsb::AnnotatedData& data, std::string& wire); rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire); -}; +};//}}} -void initialize_ipaaca_rsb(); +class IntConverter: public rsb::converter::Converter<std::string> {//{{{ + public: + IntConverter(); + std::string serialize(const rsb::AnnotatedData& data, std::string& wire); + rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire); +};//}}} -class PayloadEntryProxy +class Initializer +{ + public: + static void initialize_ipaaca_rsb_if_needed(); + static bool initialized(); + protected: + static bool _initialized; +}; + +class PayloadEntryProxy//{{{ { protected: Payload* _payload; @@ -131,60 +400,256 @@ class PayloadEntryProxy inline std::string to_str() { return operator std::string(); } inline long to_int() { return operator long(); } inline double to_float() { return operator double(); } -}; +};//}}} -class Payload +class Payload//{{{ { + friend std::ostream& operator<<(std::ostream& os, const Payload& obj); + friend class IUInterface; + friend class IU; + friend class RemotePushIU; + friend class IUConverter; + friend class CallbackIUPayloadUpdate; protected: + std::string _owner_name; std::map<std::string, std::string> _store; boost::shared_ptr<IUInterface> _iu; protected: - friend class IU; - friend class RemotePushIU; - Payload(); void initialize(boost::shared_ptr<IUInterface> iu); + inline void _set_owner_name(const std::string& name) { _owner_name = name; } + void _remotely_enforced_wipe(); + void _remotely_enforced_delitem(const std::string& k); + void _remotely_enforced_setitem(const std::string& k, const std::string& v); + void _internal_replace_all(const std::map<std::string, std::string>& new_contents, const std::string& writer_name=""); + void _internal_set(const std::string& k, const std::string& v, const std::string& writer_name=""); + void _internal_remove(const std::string& k, const std::string& writer_name=""); public: + inline const std::string& owner_name() { return _owner_name; } + // access PayloadEntryProxy operator[](const std::string& key); - void set(const std::string& k, const std::string& v); - void remove(const std::string& k); + inline void set(const std::string& k, const std::string& v) { _internal_set(k, v); } + inline void remove(const std::string& k) { _internal_remove(k); } std::string get(const std::string& k); -}; + typedef boost::shared_ptr<Payload> ptr; +};//}}} -class IUInterface { +class IUInterface {//{{{ + friend class IUConverter; + friend std::ostream& operator<<(std::ostream& os, const IUInterface& obj); + protected: + IUInterface(); public: inline virtual ~IUInterface() { } -}; + protected: + std::string _uid; + revision_t _revision; + std::string _category; + std::string _payload_type; // default is "MAP" + std::string _owner_name; + bool _committed; + IUAccessMode _access_mode; + bool _read_only; + //boost::shared_ptr<Buffer> _buffer; + Buffer* _buffer; + SmartLinkMap _links; + protected: + friend class Payload; + // Internal functions that perform the update logic, + // e.g. sending a notification across the network + _IPAACA_ABSTRACT_ virtual void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) = 0; + _IPAACA_ABSTRACT_ virtual void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) = 0; + //void _set_buffer(boost::shared_ptr<Buffer> buffer); + void _associate_with_buffer(Buffer* buffer); + void _set_buffer(Buffer* buffer); + void _set_uid(const std::string& uid); + void _set_owner_name(const std::string& owner_name); + protected: + // internal functions that do not emit update events + inline void _add_and_remove_links(const LinkMap& add, const LinkMap& remove) { _links._add_and_remove_links(add, remove); } + inline void _replace_links(const LinkMap& links) { _links._replace_links(links); } + public: + inline bool is_published() { return (_buffer != 0); } + inline const std::string& uid() const { return _uid; } + inline revision_t revision() const { return _revision; } + inline const std::string& category() const { return _category; } + inline const std::string& payload_type() const { return _payload_type; } + inline const std::string& owner_name() const { return _owner_name; } + inline bool committed() const { return _committed; } + inline IUAccessMode access_mode() const { return _access_mode; } + inline bool read_only() const { return _read_only; } + //inline boost::shared_ptr<Buffer> buffer() { return _buffer; } + inline Buffer* buffer() const { return _buffer; } + inline const LinkSet& get_links(std::string type) { return _links.get_links(type); } + inline const LinkMap& get_all_links() { return _links.get_all_links(); } + // Payload + _IPAACA_ABSTRACT_ virtual Payload& payload() = 0; + _IPAACA_ABSTRACT_ virtual const Payload& const_payload() const = 0; + // setters + _IPAACA_ABSTRACT_ virtual void commit() = 0; + // functions to modify and update links: + void add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name = ""); + void remove_links(const std::string& type, const LinkSet& targets, const std::string& writer_name = ""); + void modify_links(const LinkMap& add, const LinkMap& remove, const std::string& writer_name = ""); + void set_links(const LinkMap& links, const std::string& writer_name = ""); + // (with cpp specific convenience functions:) + void add_link(const std::string& type, const std::string& target, const std::string& writer_name = ""); + void remove_link(const std::string& type, const std::string& target, const std::string& writer_name = ""); + typedef boost::shared_ptr<IUInterface> ptr; +};//}}} -class IU: public IUInterface { +class IU: public IUInterface {//{{{ + friend class Buffer; + friend class InputBuffer; + friend class OutputBuffer; + friend class CallbackIUPayloadUpdate; + friend class CallbackIULinkUpdate; + friend class CallbackIUCommission; public: - Payload payload; + Payload _payload; protected: - inline IU() { } + Lock _revision_lock; + protected: + inline void _increase_revision_number() { _revision++; } + IU(const std::string& category, IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" ); public: - inline ~IU() { } - static boost::shared_ptr<IU> create(); - typedef boost::shared_ptr<IU> ref; -}; + inline ~IU() { + IPAACA_IMPLEMENT_ME + } + static boost::shared_ptr<IU> create(const std::string& category, IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" ); + inline Payload& payload() { return _payload; } + inline const Payload& const_payload() const { return _payload; } + void commit(); + protected: + void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = ""); + void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = ""); + protected: + void _internal_commit(const std::string& writer_name = ""); + public: + typedef boost::shared_ptr<IU> ptr; +};//}}} -class RemotePushIU: public IUInterface { +class RemotePushIU: public IUInterface {//{{{ + friend class Buffer; + friend class InputBuffer; + friend class OutputBuffer; + friend class IUConverter; public: - inline ~RemotePushIU() { } -}; + Payload _payload; + protected: + RemotePushIU(); + static boost::shared_ptr<RemotePushIU> create(); + public: + inline ~RemotePushIU() { + IPAACA_IMPLEMENT_ME + } + inline Payload& payload() { return _payload; } + inline const Payload& const_payload() const { return _payload; } + void commit(); + protected: + void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = ""); + void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = ""); + protected: + void _apply_update(IUPayloadUpdate::ptr update); + void _apply_link_update(IULinkUpdate::ptr update); + void _apply_commission(); + typedef boost::shared_ptr<RemotePushIU> ptr; +};//}}} -class IUPublishedError: public std::exception +class Exception: public std::exception//{{{ { protected: std::string _description; + inline Exception(const std::string& description=""): _description(description) { } + public: + inline ~Exception() throw() { } + const char* what() const throw() { + return _description.c_str(); + } +};//}}} +class IUNotFoundError: public Exception//{{{ +{ + public: + inline ~IUNotFoundError() throw() { } + inline IUNotFoundError() { //boost::shared_ptr<IU> iu) { + _description = "IUNotFoundError"; + } +};//}}} +class IUPublishedError: public Exception//{{{ +{ public: inline ~IUPublishedError() throw() { } inline IUPublishedError() { //boost::shared_ptr<IU> iu) { _description = "IUPublishedError"; } - const char* what() const throw() { - return _description.c_str(); +};//}}} +class IUCommittedError: public Exception//{{{ +{ + public: + inline ~IUCommittedError() throw() { } + inline IUCommittedError() { //boost::shared_ptr<IU> iu) { + _description = "IUCommittedError"; } -}; +};//}}} +class IUUpdateFailedError: public Exception//{{{ +{ + public: + inline ~IUUpdateFailedError() throw() { } + inline IUUpdateFailedError() { //boost::shared_ptr<IU> iu) { + _description = "IUUpdateFailedError"; + } +};//}}} +class IUReadOnlyError: public Exception//{{{ +{ + public: + inline ~IUReadOnlyError() throw() { } + inline IUReadOnlyError() { //boost::shared_ptr<IU> iu) { + _description = "IUReadOnlyError"; + } +};//}}} +class IUAlreadyInABufferError: public Exception//{{{ +{ + public: + inline ~IUAlreadyInABufferError() throw() { } + inline IUAlreadyInABufferError() { //boost::shared_ptr<IU> iu) { + _description = "IUAlreadyInABufferError"; + } +};//}}} +class IUAlreadyHasAnUIDError: public Exception//{{{ +{ + public: + inline ~IUAlreadyHasAnUIDError() throw() { } + inline IUAlreadyHasAnUIDError() { //boost::shared_ptr<IU> iu) { + _description = "IUAlreadyHasAnUIDError"; + } +};//}}} +class IUAlreadyHasAnOwnerNameError: public Exception//{{{ +{ + public: + inline ~IUAlreadyHasAnOwnerNameError() throw() { } + inline IUAlreadyHasAnOwnerNameError() { //boost::shared_ptr<IU> iu) { + _description = "IUAlreadyHasAnOwnerNameError"; + } +};//}}} +class NotImplementedError: public Exception//{{{ +{ + public: + inline ~NotImplementedError() throw() { } + inline NotImplementedError() { //boost::shared_ptr<IU> iu) { + _description = "NotImplementedError"; + } +};//}}} +// (snippets) //{{{ +/* +class IUEventFunctionHandler: public rsb::EventFunctionHandler { + protected: + Buffer* _buffer; + public: + inline IUEventFunctionHandler(Buffer* buffer, const EventFunction& function, const std::string& method="") + : EventFunctionHandler(function, method), _buffer(buffer) { } +}; +*/ +//}}} } // of namespace ipaaca diff --git a/cpp/test/src/.gitignore b/cpp/test/src/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..05c3e637fa52c0fb86ceb4bcc7022728da9210b5 --- /dev/null +++ b/cpp/test/src/.gitignore @@ -0,0 +1,3 @@ +ipaaca.pb.cc +ipaaca.pb.h + diff --git a/cpp/test/src/Makefile b/cpp/test/src/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..1619f9c8f58ce0903d5ef997ef2ce28af80d7cc7 --- /dev/null +++ b/cpp/test/src/Makefile @@ -0,0 +1,23 @@ +CONFIG = -DIPAACA_DEBUG_MESSAGES +IPAACASOURCES = ../../src/ipaaca.cc ipaaca.pb.cc +TEXTSOURCES = ${IPAACASOURCES} textsender.cc +CCFLAGS=-I. -I../../src -I/usr/local/include -I/opt/local/include ${CONFIG} +BOOSTLIBS = -L/opt/local/lib -lboost_regex-mt -lboost_date_time-mt -lboost_program_options-mt -lboost_thread-mt -lboost_filesystem-mt -lboost_signals-mt -lboost_system-mt +PROTOLIBS = -L/opt/local/lib -lprotobuf +LIBS = ${BOOSTLIBS} ${PROTOLIBS} -L/usr/local/lib -lrsc -lrsbcore + +COMPILER = gfilt + +all: protoc textsender + + +textsender: + ${COMPILER} ${CCFLAGS} -o textsender ${TEXTSOURCES} ${LIBS} + +protoc: + protoc --proto_path=../../../proto ../../../proto/ipaaca.proto --cpp_out=. + +clean: + rm -f textsender ipaaca.pb.h ipaaca.pb.cc + + diff --git a/cpp/test/src/textsender.cc b/cpp/test/src/textsender.cc new file mode 100644 index 0000000000000000000000000000000000000000..85be4c5cc11dc8198a3a31f9a01a36ca1916a8d7 --- /dev/null +++ b/cpp/test/src/textsender.cc @@ -0,0 +1,117 @@ +#include <ipaaca.h> +#include <typeinfo> + +using namespace ipaaca; + +const char RECV_CATEGORY[] = "WORD"; +const char SEND_CATEGORY[] = "TEXT"; + +class TextSender { + protected: + OutputBuffer::ptr _ob; + InputBuffer::ptr _ib; + public: + TextSender(); + void outbuffer_handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local); + void inbuffer_handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local); + IUInterface::ptr find_last_iu(); + void publish_text_to_print(const std::string& text, const std::string& parent_iu_uid=""); +}; + +TextSender::TextSender() { + _ob = OutputBuffer::create("TextSenderOut"); + _ob->register_handler(boost::bind(&TextSender::outbuffer_handle_iu_event, this, _1, _2, _3)); + _ib = InputBuffer::create("TextSenderIn", RECV_CATEGORY); + _ib->register_handler(boost::bind(&TextSender::inbuffer_handle_iu_event, this, _1, _2, _3)); +} + +void TextSender::outbuffer_handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local) +{ + std::cout << "(own IU event " << iu_event_type_to_str(event_type) << " " << iu->uid() << ")" << std::endl; + if (event_type == IU_UPDATED) { + std::set<std::string> parent_uids = iu->get_links("GRIN"); + if (parent_uids.size() > 0) { + std::string parent_uid = *(parent_uids.begin()); + std::cout << "updating parent ..." << std::endl; + std::set<std::string> next_uids = iu->get_links("SUCCESSOR"); + if (next_uids.size() > 0) { + std::string next_uid = *(next_uids.begin()); + IUInterface::ptr next_iu = _ob->get(next_uid); + std::set<std::string> next_letter_grin_links = next_iu->get_links("GRIN"); + if (next_letter_grin_links.count(parent_uid) == 0) { + // next letter belongs to new word + IUInterface::ptr parent_iu = _ib->get(parent_uid); + parent_iu->payload()["STATE"] = "REALIZED"; + } else { + IUInterface::ptr parent_iu = _ib->get(parent_uid); + parent_iu->payload()["STATE"] = "STARTED"; + } + } else { + // there are no more letters, this is the end of the final word + IUInterface::ptr parent_iu = _ib->get(parent_uid); + parent_iu->payload()["STATE"] = "REALIZED"; + } + std::cout << " ... done." << std::endl; + } + } else { + } +} + +void TextSender::inbuffer_handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local) +{ + if (event_type == IU_LINKSUPDATED) { + std::cout << "links updated" << std::endl; + } else if (event_type == IU_ADDED) { + std::string word = iu->payload()["WORD"]; + std::cout << "Received new word: " << word << std::endl; + publish_text_to_print(word, iu->uid()); + } else if (event_type == IU_RETRACTED) { + std::string retracted_uid = iu->uid(); + } else { + std::cout << "(IU event " << iu_event_type_to_str(event_type) << " " << iu->uid() << ")" << std::endl; + } +} + +IUInterface::ptr TextSender::find_last_iu() { + std::set<IUInterface::ptr> ius = _ob->get_ius(); + for (std::set<IUInterface::ptr>::iterator it = ius.begin(); it!=ius.end(); ++it) { + if ((*it)->get_links("SUCCESSOR").size() == 0) return *it; + } + return IUInterface::ptr(); +} + +void TextSender::publish_text_to_print(const std::string& text, const std::string& parent_iu_uid) { + IUInterface::ptr previous_iu = find_last_iu(); + if (previous_iu) { + // insert a blank if we already have words in the buffer + IU::ptr iu = IU::create( SEND_CATEGORY ); + iu->payload()["CONTENT"] = " "; + _ob->add(iu); + previous_iu->add_link( "SUCCESSOR", iu->uid() ); + iu->add_link( "PREDECESSOR", previous_iu->uid() ); + if (parent_iu_uid != "") iu->add_link( "GRIN", parent_iu_uid ); + previous_iu = iu; + } + for (int i=0; i<text.size(); ++i) { + IU::ptr iu = IU::create( SEND_CATEGORY ); + iu->payload()["CONTENT"] = std::string(1, text.at(i)); + _ob->add(iu); + if (previous_iu) { + previous_iu->add_link( "SUCCESSOR", iu->uid() ); + iu->add_link( "PREDECESSOR", previous_iu->uid() ); + if (parent_iu_uid != "") iu->add_link( "GRIN", parent_iu_uid ); + } + if (previous_iu) std::cout << "previous IU: " << *previous_iu << std::endl; + previous_iu = iu; + } +} + +int main() { + TextSender sender; + sleep(1); + sender.publish_text_to_print("(INIT)"); + std::cout << "Press Ctrl-C to cancel..." << std::endl; + while (true) sleep(1); +} + +