From 864311b2c4673096236bae7594d8cb903893a6dc Mon Sep 17 00:00:00 2001 From: Ramin Yaghoubzadeh <ryaghoub@techfak.uni-bielefeld.de> Date: Fri, 13 Apr 2012 15:19:37 +0200 Subject: [PATCH] [C++] nearing completion of initial C++ port --- cpp/src/Makefile | 9 +- cpp/src/ipaaca-test-main.cc | 13 +- cpp/src/ipaaca.cc | 237 ++++++++++++++++++++---------------- cpp/src/ipaaca.h | 21 +++- 4 files changed, 165 insertions(+), 115 deletions(-) diff --git a/cpp/src/Makefile b/cpp/src/Makefile index 00c2459..e0644e5 100644 --- a/cpp/src/Makefile +++ b/cpp/src/Makefile @@ -1,5 +1,7 @@ CONFIG = -DIPAACA_DEBUG_MESSAGES -SOURCES = ipaaca.cc ipaaca.pb.cc ipaaca-test-main.cc +IPAACASOURCES = ipaaca.cc ipaaca.pb.cc +SOURCES = ${IPAACASOURCES} ipaaca-test-main.cc +TEXTSOURCES = ${IPAACASOURCES} textsender.cc CCFLAGS=-I. -I/usr/local/include -I/opt/local/include ${CONFIG} BOOSTLIBS = -L/opt/local/lib -lboost_regex-mt -lboost_date_time-mt -lboost_program_options-mt -lboost_thread-mt -lboost_filesystem-mt -lboost_signals-mt -lboost_system-mt PROTOLIBS = -L/opt/local/lib -lprotobuf @@ -16,6 +18,9 @@ receiver: sender: ${COMPILER} ${CCFLAGS} -DMAKE_SENDER -o ipaaca-sender ${SOURCES} ${LIBS} +textsender: + ${COMPILER} ${CCFLAGS} -o textsender ${TEXTSOURCES} ${LIBS} + main: ${COMPILER} ${CCFLAGS} -o ipaaca-main ${SOURCES} ${LIBS} @@ -23,6 +28,6 @@ protoc: protoc --proto_path=../../proto ../../proto/ipaaca.proto --cpp_out=. clean: - rm -f ipaaca-main ipaaca-sender ipaaca-receiver ipaaca.pb.h ipaaca.pb.cc + rm -f ipaaca-main ipaaca-sender ipaaca-receiver textsender ipaaca.pb.h ipaaca.pb.cc diff --git a/cpp/src/ipaaca-test-main.cc b/cpp/src/ipaaca-test-main.cc index d2c38bb..edf8764 100644 --- a/cpp/src/ipaaca-test-main.cc +++ b/cpp/src/ipaaca-test-main.cc @@ -14,7 +14,7 @@ using namespace ipaaca; #ifdef MAKE_RECEIVER void my_first_iu_handler(IUInterface::ptr iu, IUEventType type, bool local) { - std::cout << "[32m" << iu_event_type_to_str(type) << "[m" << std::endl; + std::cout << "[32m" << iu_event_type_to_str(type) << " " << (local?"(of local IU)":"(of remote IU)") << "[m" << std::endl; if (type == IU_LINKSUPDATED) { std::cout << " setting something in the remote payload" << std::endl; iu->payload()["new_field"] = "remotely_set"; @@ -22,7 +22,7 @@ void my_first_iu_handler(IUInterface::ptr iu, IUEventType type, bool local) } int main() { try{ - initialize_ipaaca_rsb(); + //initialize_ipaaca_rsb(); InputBuffer::ptr ib = InputBuffer::create("Tester", "testcategory"); ib->register_handler(my_first_iu_handler); @@ -37,13 +37,18 @@ int main() { } #else #ifdef MAKE_SENDER +void iu_handler_for_remote_changes(IUInterface::ptr iu, IUEventType type, bool local) +{ + std::cout << "[32m" << iu_event_type_to_str(type) << " " << (local?"(of local IU)":"(of remote IU)") << "[m" << std::endl; +} int main() { try{ - initialize_ipaaca_rsb(); + //initialize_ipaaca_rsb(); OutputBuffer::ptr ob = OutputBuffer::create("Tester"); - std::cout << "Buffer: " << ob->unique_name() << std::endl; + ob->register_handler(iu_handler_for_remote_changes); + //std::cout << "Buffer: " << ob->unique_name() << std::endl; IU::ptr iu = IU::create("testcategory"); ob->add(iu); diff --git a/cpp/src/ipaaca.cc b/cpp/src/ipaaca.cc index 71765bd..533e12c 100644 --- a/cpp/src/ipaaca.cc +++ b/cpp/src/ipaaca.cc @@ -3,21 +3,17 @@ namespace ipaaca { + // util and init//{{{ -std::string generate_uuid_string() -{ - uuid_t uuidt; - uuid_string_t uuidstr; - uuid_generate(uuidt); - uuid_unparse_lower(uuidt, uuidstr); - return uuidstr; -} + +bool Initializer::_initialized = false; //const LinkSet EMPTY_LINK_SET = LinkSet(); //const std::set<std::string> EMPTY_LINK_SET(); - -void initialize_ipaaca_rsb() +bool Initializer::initialized() { return _initialized; } +void Initializer::initialize_ipaaca_rsb_if_needed() { + if (_initialized) return; ParticipantConfig config = ParticipantConfig::fromConfiguration(); Factory::getInstance().setDefaultParticipantConfig(config); @@ -33,8 +29,22 @@ void initialize_ipaaca_rsb() boost::shared_ptr<ProtocolBufferConverter<protobuf::IUCommission> > iu_commission_converter(new ProtocolBufferConverter<protobuf::IUCommission> ()); stringConverterRepository()->registerConverter(iu_commission_converter); + boost::shared_ptr<IntConverter> int_converter(new IntConverter()); + stringConverterRepository()->registerConverter(int_converter); + + _initialized = true; //IPAACA_TODO("initialize all converters") } + +std::string generate_uuid_string() +{ + uuid_t uuidt; + uuid_string_t uuidstr; + uuid_generate(uuidt); + uuid_unparse_lower(uuidt, uuidstr); + return uuidstr; +} + /* void init_inprocess_too() { //ParticipantConfig config = Factory::getInstance().getDefaultParticipantConfig(); @@ -146,6 +156,8 @@ std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj)//{{{ //}}} // SmartLinkMap//{{{ + +LinkSet SmartLinkMap::empty_link_set; void SmartLinkMap::_add_and_remove_links(const LinkMap& add, const LinkMap& remove) { // remove specified links @@ -174,6 +186,16 @@ void SmartLinkMap::_replace_links(const LinkMap& links) //_links.clear(); _links=links; } +const LinkSet& SmartLinkMap::get_links(const std::string& key) +{ + LinkMap::const_iterator it = _links.find(key); + if (it==_links.end()) return empty_link_set; + return it->second; +} +const LinkMap& SmartLinkMap::get_all_links() +{ + return _links; +} //}}} // IUEventHandler//{{{ @@ -256,13 +278,13 @@ boost::shared_ptr<int> CallbackIUPayloadUpdate::call(const std::string& methodNa } if (update->is_delta) { for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) { - iu->payload()._internal_remove(*it, _buffer->unique_name()); + iu->payload()._internal_remove(*it, update->writer_name); //_buffer->unique_name()); } for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) { - iu->payload()._internal_set(it->first, it->second, _buffer->unique_name()); + iu->payload()._internal_set(it->first, it->second, update->writer_name); //_buffer->unique_name()); } } else { - iu->payload()._internal_replace_all(update->new_items, _buffer->unique_name()); + iu->payload()._internal_replace_all(update->new_items, update->writer_name); //_buffer->unique_name()); } _buffer->call_iu_event_handlers(iu, true, IU_UPDATED, iu->category()); revision_t revision = iu->revision(); @@ -272,49 +294,51 @@ boost::shared_ptr<int> CallbackIUPayloadUpdate::call(const std::string& methodNa boost::shared_ptr<int> CallbackIULinkUpdate::call(const std::string& methodName, boost::shared_ptr<IULinkUpdate> update) { - IPAACA_IMPLEMENT_ME - return boost::shared_ptr<int>(new int(0)); - /* - '''Apply a remotely requested update to one of the stored IU's links.''' - if update.uid not in self._iu_store: - logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid)) - return 0 - iu = self._iu_store[update.uid] - with iu.revision_lock: - if not OMIT_REVISION_CHECKS and (update.revision != 0) and (update.revision != iu.revision): - # (0 means "do not pay attention to the revision number" -> "force update") - logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid)) - return 0 - if update.is_delta: - iu.modify_links(add=update.new_links, remove=update.links_to_remove, writer_name=update.writer_name) - else: - iu.set_links(links=update.new_links, writer_name=update.writer_name) - self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.LINKSUPDATED, category=iu.category) - return iu.revision - */ + IUInterface::ptr iui = _buffer->get(update->uid); + if (! iui) { + IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid) + return boost::shared_ptr<int>(new int(0)); + } + IU::ptr iu = boost::static_pointer_cast<IU>(iui); + iu->_revision_lock.lock(); + if ((update->revision != 0) && (update->revision != iu->_revision)) { + IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid) + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(0)); + } + if (update->is_delta) { + iu->modify_links(update->new_links, update->links_to_remove, update->writer_name); + } else { + iu->set_links(update->new_links, update->writer_name); + } + _buffer->call_iu_event_handlers(iu, true, IU_LINKSUPDATED, iu->category()); + revision_t revision = iu->revision(); + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(revision)); } boost::shared_ptr<int> CallbackIUCommission::call(const std::string& methodName, boost::shared_ptr<protobuf::IUCommission> update) { - IPAACA_IMPLEMENT_ME - return boost::shared_ptr<int>(new int(0)); - /* - '''Apply a remotely requested commit to one of the stored IUs.''' - if iu_commission.uid not in self._iu_store: - logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid)) - return 0 - iu = self._iu_store[iu_commission.uid] - with iu.revision_lock: - if not OMIT_REVISION_CHECKS and (iu_commission.revision != 0) and (iu_commission.revision != iu.revision): - # (0 means "do not pay attention to the revision number" -> "force update") - logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid)) - return 0 - if iu.committed: - return 0 - else: - iu._internal_commit(writer_name=iu_commission.writer_name) - self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category) - return iu.revision - */ + IUInterface::ptr iui = _buffer->get(update->uid()); + if (! iui) { + IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid()) + return boost::shared_ptr<int>(new int(0)); + } + IU::ptr iu = boost::static_pointer_cast<IU>(iui); + iu->_revision_lock.lock(); + if ((update->revision() != 0) && (update->revision() != iu->_revision)) { + IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid()) + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(0)); + } + if (iu->committed()) { + return boost::shared_ptr<int>(new int(0)); + } else { + } + iu->_internal_commit(update->writer_name()); + _buffer->call_iu_event_handlers(iu, true, IU_LINKSUPDATED, iu->category()); + revision_t revision = iu->revision(); + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(revision)); } //}}} @@ -325,6 +349,7 @@ OutputBuffer::OutputBuffer(const std::string& basename) :Buffer(basename, "OB") { _id_prefix = _basename + "-" + _uuid + "-IU-"; + _initialize_server(); } void OutputBuffer::_initialize_server() { @@ -335,6 +360,7 @@ void OutputBuffer::_initialize_server() } OutputBuffer::ptr OutputBuffer::create(const std::string& basename) { + Initializer::initialize_ipaaca_rsb_if_needed(); return OutputBuffer::ptr(new OutputBuffer(basename)); } IUInterface::ptr OutputBuffer::get(const std::string& iu_uid) @@ -343,6 +369,12 @@ IUInterface::ptr OutputBuffer::get(const std::string& iu_uid) if (it==_iu_store.end()) return IUInterface::ptr(); return it->second; } +std::set<IUInterface::ptr> OutputBuffer::get_ius() +{ + std::set<IUInterface::ptr> set; + for (IUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) set.insert(it->second); + return set; +} void OutputBuffer::_send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) { @@ -425,61 +457,6 @@ boost::shared_ptr<IU> OutputBuffer::remove(IU::ptr iu) IPAACA_IMPLEMENT_ME } -/* - def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"): - '''Send an IU link update. - - Keyword arguments: - iu -- the IU being updated - is_delta -- whether this is an incremental update or a replacement - the whole link dictionary - revision -- the new revision number - new_links -- a dictionary of new link sets - links_to_remove -- a dict of the link sets that shall be removed - writer_name -- name of the Buffer that initiated this update, necessary - to enable remote components to filter out updates that originate d - from their own operations - ''' - if new_links is None: - new_links = {} - if links_to_remove is None: - links_to_remove = {} - link_update = IULinkUpdate(iu._uid, is_delta=is_delta, revision=revision) - link_update.new_links = new_links - if is_delta: - link_update.links_to_remove = links_to_remove - link_update.writer_name = writer_name - informer = self._get_informer(iu._category) - informer.publishData(link_update) - # FIXME send the notification to the target, if the target is not the writer_name -*/ -/* - def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"): - '''Send an IU payload update. - - Keyword arguments: - iu -- the IU being updated - is_delta -- whether this is an incremental update or a replacement - revision -- the new revision number - new_items -- a dictionary of new payload items - keys_to_remove -- a list of the keys that shall be removed from the - payload - writer_name -- name of the Buffer that initiated this update, necessary - to enable remote components to filter out updates that originate d - from their own operations - ''' - if new_items is None: - new_items = {} - if keys_to_remove is None: - keys_to_remove = [] - payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision) - payload_update.new_items = new_items - if is_delta: - payload_update.keys_to_remove = keys_to_remove - payload_update.writer_name = writer_name - informer = self._get_informer(iu._category) - informer.publishData(payload_update) -*/ //}}} // InputBuffer//{{{ @@ -520,22 +497,27 @@ InputBuffer::InputBuffer(const std::string& basename, const std::string& categor InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::vector<std::string>& category_interests) { + Initializer::initialize_ipaaca_rsb_if_needed(); return InputBuffer::ptr(new InputBuffer(basename, category_interests)); } InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1) { + Initializer::initialize_ipaaca_rsb_if_needed(); return InputBuffer::ptr(new InputBuffer(basename, category_interest1)); } InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) { + Initializer::initialize_ipaaca_rsb_if_needed(); return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2)); } InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) { + Initializer::initialize_ipaaca_rsb_if_needed(); return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3)); } InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) { + Initializer::initialize_ipaaca_rsb_if_needed(); return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3, category_interest4)); } @@ -545,6 +527,12 @@ IUInterface::ptr InputBuffer::get(const std::string& iu_uid) if (it==_iu_store.end()) return IUInterface::ptr(); return it->second; } +std::set<IUInterface::ptr> InputBuffer::get_ius() +{ + std::set<IUInterface::ptr> set; + for (RemotePushIUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) set.insert(it->second); // TODO genericize + return set; +} RemoteServerPtr InputBuffer::_get_remote_server(const std::string& unique_server_name) @@ -601,6 +589,7 @@ void InputBuffer::_handle_iu_events(EventPtr event) RemotePushIUStore::iterator it; if (type == "ipaaca::IUPayloadUpdate") { boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData()); + IPAACA_INFO("** writer name: " << update->writer_name) if (update->writer_name == _unique_name) { //IPAACA_INFO("Ignoring locally-written IU update") return; @@ -1239,6 +1228,38 @@ AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema, //}}} +// IntConverter//{{{ + +IntConverter::IntConverter() +: Converter<std::string> ("int", "int", true) +{ +} + +std::string IntConverter::serialize(const AnnotatedData& data, std::string& wire) +{ + IPAACA_INFO("entering") + // Ensure that DATA actually holds a datum of the data-type we expect. + assert(data.first == getDataType()); // "int" + // NOTE: a dynamic_pointer_cast cannot be used from void* + boost::shared_ptr<const int> obj = boost::static_pointer_cast<const int> (data.second); + boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage()); + // transfer obj data to pbo + pbo->set_value(*obj); + pbo->SerializeToString(&wire); + IPAACA_INFO("leaving") + return getWireSchema(); + +} + +AnnotatedData IntConverter::deserialize(const std::string& wireSchema, const std::string& wire) { + assert(wireSchema == getWireSchema()); // "int" + boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage()); + pbo->ParseFromString(wire); + boost::shared_ptr<int> obj = boost::shared_ptr<int>(new int(pbo->value())); + return std::make_pair("int", obj); +} + +//}}} } // of namespace ipaaca diff --git a/cpp/src/ipaaca.h b/cpp/src/ipaaca.h index 360cc5e..ed7a318 100644 --- a/cpp/src/ipaaca.h +++ b/cpp/src/ipaaca.h @@ -156,6 +156,7 @@ class SmartLinkMap { protected: LinkMap _links; + static LinkSet empty_link_set; void _add_and_remove_links(const LinkMap& add, const LinkMap& remove); void _replace_links(const LinkMap& links); }; @@ -213,6 +214,7 @@ class Buffer { //: public boost::enable_shared_from_this<Buffer> {//{{{ void register_handler(IUEventHandlerFunction function, IUEventType event_mask = IU_ALL_EVENTS, const std::string& category=""); //_IPAACA_ABSTRACT_ virtual void add(boost::shared_ptr<IUInterface> iu) = 0; _IPAACA_ABSTRACT_ virtual boost::shared_ptr<IUInterface> get(const std::string& iu_uid) = 0; + _IPAACA_ABSTRACT_ virtual std::set<boost::shared_ptr<IUInterface> > get_ius() = 0; }; //}}} @@ -274,6 +276,7 @@ class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<Ou boost::shared_ptr<IU> remove(const std::string& iu_uid); boost::shared_ptr<IU> remove(boost::shared_ptr<IU> iu); boost::shared_ptr<IUInterface> get(const std::string& iu_uid); + std::set<boost::shared_ptr<IUInterface> > get_ius(); typedef boost::shared_ptr<OutputBuffer> ptr; }; //}}} @@ -318,6 +321,7 @@ class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<Inp IPAACA_IMPLEMENT_ME } boost::shared_ptr<IUInterface> get(const std::string& iu_uid); + std::set<boost::shared_ptr<IUInterface> > get_ius(); //inline void add(boost::shared_ptr<IU> iu) //{ // IPAACA_IMPLEMENT_ME @@ -369,7 +373,22 @@ class IULinkUpdateConverter: public rsb::converter::Converter<std::string> {//{{ rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire); };//}}} -void initialize_ipaaca_rsb(); + +class IntConverter: public rsb::converter::Converter<std::string> {//{{{ + public: + IntConverter(); + std::string serialize(const rsb::AnnotatedData& data, std::string& wire); + rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire); +};//}}} + +class Initializer +{ + public: + static void initialize_ipaaca_rsb_if_needed(); + static bool initialized(); + protected: + static bool _initialized; +}; class PayloadEntryProxy//{{{ { -- GitLab