From 0eaf05b0bc6d44838d1889763f298761257b6a44 Mon Sep 17 00:00:00 2001 From: Ramin Yaghoubzadeh <ryaghoub@techfak.uni-bielefeld.de> Date: Sat, 14 Apr 2012 01:18:24 +0200 Subject: [PATCH] [C++] Almost fully working. Cross-tested in a triple setup with Python and Java versions. --- cpp/src/Makefile | 17 ++++-- cpp/src/ipaaca-test-main.cc | 3 + cpp/src/ipaaca.cc | 80 +++++++++--------------- cpp/src/ipaaca.h | 60 +++++++++--------- cpp/test/src/.gitignore | 3 + cpp/test/src/Makefile | 23 +++++++ cpp/test/src/textsender.cc | 117 ++++++++++++++++++++++++++++++++++++ 7 files changed, 217 insertions(+), 86 deletions(-) create mode 100644 cpp/test/src/.gitignore create mode 100644 cpp/test/src/Makefile create mode 100644 cpp/test/src/textsender.cc diff --git a/cpp/src/Makefile b/cpp/src/Makefile index e0644e5..45ba159 100644 --- a/cpp/src/Makefile +++ b/cpp/src/Makefile @@ -1,26 +1,33 @@ +ifeq ($(WBS_ARCH),mac) + LIB_SUFFIX=.dylib +else + LIB_SUFFIX=.so +endif + CONFIG = -DIPAACA_DEBUG_MESSAGES IPAACASOURCES = ipaaca.cc ipaaca.pb.cc SOURCES = ${IPAACASOURCES} ipaaca-test-main.cc TEXTSOURCES = ${IPAACASOURCES} textsender.cc CCFLAGS=-I. -I/usr/local/include -I/opt/local/include ${CONFIG} +LIBFLAGS=-fPIC -shared BOOSTLIBS = -L/opt/local/lib -lboost_regex-mt -lboost_date_time-mt -lboost_program_options-mt -lboost_thread-mt -lboost_filesystem-mt -lboost_signals-mt -lboost_system-mt PROTOLIBS = -L/opt/local/lib -lprotobuf LIBS = ${BOOSTLIBS} ${PROTOLIBS} -L/usr/local/lib -lrsc -lrsbcore COMPILER = gfilt -all: receiver sender +all: lib +lib: + ${COMPILER} ${CCFLAGS} ${IPAACASOURCES} ${LIBS} ${LIBFLAGS} -o libipaaca${LIB_SUFFIX} + receiver: ${COMPILER} ${CCFLAGS} -DMAKE_RECEIVER -o ipaaca-receiver ${SOURCES} ${LIBS} sender: ${COMPILER} ${CCFLAGS} -DMAKE_SENDER -o ipaaca-sender ${SOURCES} ${LIBS} -textsender: - ${COMPILER} ${CCFLAGS} -o textsender ${TEXTSOURCES} ${LIBS} - main: ${COMPILER} ${CCFLAGS} -o ipaaca-main ${SOURCES} ${LIBS} @@ -28,6 +35,6 @@ protoc: protoc --proto_path=../../proto ../../proto/ipaaca.proto --cpp_out=. clean: - rm -f ipaaca-main ipaaca-sender ipaaca-receiver textsender ipaaca.pb.h ipaaca.pb.cc + rm -f libipaaca${LIB_SUFFIX} ipaaca-main ipaaca-sender ipaaca-receiver ipaaca.pb.h ipaaca.pb.cc diff --git a/cpp/src/ipaaca-test-main.cc b/cpp/src/ipaaca-test-main.cc index edf8764..cefa166 100644 --- a/cpp/src/ipaaca-test-main.cc +++ b/cpp/src/ipaaca-test-main.cc @@ -53,6 +53,9 @@ int main() { IU::ptr iu = IU::create("testcategory"); ob->add(iu); + std::cout << "Updating in 1 sec" << std::endl; + sleep(1); + std::cout << "_payload.get(\"TEST\") = \"" << iu->_payload.get("TEST") << "\"" << std::endl; std::cout << "_payload[\"TEST\"] = \"" << (std::string) iu->_payload["TEST"] << "\"" << std::endl; iu->_payload["TEST"] = "123.5-WAS-SET"; diff --git a/cpp/src/ipaaca.cc b/cpp/src/ipaaca.cc index 533e12c..8bf6874 100644 --- a/cpp/src/ipaaca.cc +++ b/cpp/src/ipaaca.cc @@ -247,13 +247,10 @@ void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event } void Buffer::call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category) { - IPAACA_INFO("handling an event " << ipaaca::iu_event_type_to_str(event_type) << " for IU " << iu->uid()) - //IUInterface::ptr iu = buffer->get(uid); - //if (iu) { - for (std::vector<IUEventHandler::ptr>::iterator it = _event_handlers.begin(); it != _event_handlers.end(); ++it) { - (*it)->call(this, iu, local, event_type, category); - } - //} + //IPAACA_INFO("handling an event " << ipaaca::iu_event_type_to_str(event_type) << " for IU " << iu->uid()) + for (std::vector<IUEventHandler::ptr>::iterator it = _event_handlers.begin(); it != _event_handlers.end(); ++it) { + (*it)->call(this, iu, local, event_type, category); + } } //}}} @@ -441,7 +438,7 @@ Informer<AnyType>::Ptr OutputBuffer::_get_informer(const std::string& category) if (_informer_store.count(category) > 0) { return _informer_store[category]; } else { - IPAACA_INFO("making new informer for category " << category) + //IPAACA_INFO("Making new informer for category " << category) std::string scope_string = "/ipaaca/category/" + category; Informer<AnyType>::Ptr informer = Factory::getInstance().createInformer<AnyType> ( Scope(scope_string)); _informer_store[category] = informer; @@ -450,13 +447,28 @@ Informer<AnyType>::Ptr OutputBuffer::_get_informer(const std::string& category) } boost::shared_ptr<IU> OutputBuffer::remove(const std::string& iu_uid) { - IPAACA_IMPLEMENT_ME + IUStore::iterator it = _iu_store.find(iu_uid); + if (it == _iu_store.end()) throw IUNotFoundError(); + IU::ptr iu = it->second; + _retract_iu(iu); + _iu_store.erase(iu_uid); + return iu; } boost::shared_ptr<IU> OutputBuffer::remove(IU::ptr iu) { - IPAACA_IMPLEMENT_ME + return remove(iu->uid()); // to make sure it is in the store } +void OutputBuffer::_retract_iu(IU::ptr iu) +{ + Informer<protobuf::IURetraction>::DataPtr data(new protobuf::IURetraction()); + data->set_uid(iu->uid()); + data->set_revision(iu->revision()); + Informer<AnyType>::Ptr informer = _get_informer(iu->category()); + informer->publish(data); +} + + //}}} // InputBuffer//{{{ @@ -546,10 +558,9 @@ RemoteServerPtr InputBuffer::_get_remote_server(const std::string& unique_server ListenerPtr InputBuffer::_create_category_listener_if_needed(const std::string& category) { - IPAACA_INFO("entering") std::map<std::string, ListenerPtr>::iterator it = _listener_store.find(category); if (it!=_listener_store.end()) return it->second; - IPAACA_INFO("creating a new listener") + //IPAACA_INFO("Creating a new listener for category " << category) std::string scope_string = "/ipaaca/category/" + category; ListenerPtr listener = Factory::getInstance().createListener( Scope(scope_string) ); HandlerPtr event_handler = HandlerPtr( @@ -559,7 +570,6 @@ ListenerPtr InputBuffer::_create_category_listener_if_needed(const std::string& ); listener->addHandler(event_handler); _listener_store[category] = listener; - IPAACA_INFO("done") return listener; /* '''Return (or create, store and return) a category listener.''' @@ -584,14 +594,13 @@ void InputBuffer::_handle_iu_events(EventPtr event) iu->_set_buffer(this); call_iu_event_handlers(iu, false, IU_ADDED, iu->category() ); } - IPAACA_INFO( "New RemotePushIU state: " << (*iu) ) + //IPAACA_INFO( "New RemotePushIU state: " << (*iu) ) } else { RemotePushIUStore::iterator it; if (type == "ipaaca::IUPayloadUpdate") { boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData()); - IPAACA_INFO("** writer name: " << update->writer_name) + //IPAACA_INFO("** writer name: " << update->writer_name) if (update->writer_name == _unique_name) { - //IPAACA_INFO("Ignoring locally-written IU update") return; } it = _iu_store.find(update->uid); @@ -607,7 +616,6 @@ void InputBuffer::_handle_iu_events(EventPtr event) } else if (type == "ipaaca::IULinkUpdate") { boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData()); if (update->writer_name == _unique_name) { - //IPAACA_INFO("Ignoring locally-written IU update") return; } it = _iu_store.find(update->uid); @@ -623,7 +631,6 @@ void InputBuffer::_handle_iu_events(EventPtr event) } else if (type == "ipaaca::protobuf::IUCommission") { boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData()); if (update->writer_name() == _unique_name) { - //IPAACA_INFO("Ignoring locally-written IU commit") return; } it = _iu_store.find(update->uid()); @@ -641,37 +648,8 @@ void InputBuffer::_handle_iu_events(EventPtr event) std::cout << "(Unhandled Event type " << type << " !)" << std::endl; return; } - IPAACA_INFO( "New RemotePushIU state: " << *(it->second) ) + //IPAACA_INFO( "New RemotePushIU state: " << *(it->second) ) } - /* - else: - # an update to an existing IU - if event.data.writer_name == self.unique_name: - # Discard updates that originate from this buffer - return - if event.data.uid not in self._iu_store: - # TODO: we should request the IU's owner to send us the IU - logger.warning("Update message for IU which we did not fully receive before.") - return - if type_ is ipaaca_pb2.IUCommission: - # IU commit - iu = self._iu_store[event.data.uid] - iu._apply_commission() - iu._revision = event.data.revision - self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category) - elif type_ is IUPayloadUpdate: - # IU payload update - iu = self._iu_store[event.data.uid] - iu._apply_update(event.data) - self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category) - elif type_ is IULinkUpdate: - # IU link update - iu = self._iu_store[event.data.uid] - iu._apply_link_update(event.data) - self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category) - else: - logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_)) - */ } //}}} @@ -1036,7 +1014,6 @@ IUConverter::IUConverter() std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire) { - IPAACA_INFO("entering") // Ensure that DATA actually holds a datum of the data-type we expect. assert(data.first == getDataType()); // "ipaaca::IU" // NOTE: a dynamic_pointer_cast cannot be used from void* @@ -1065,7 +1042,6 @@ std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire) } } pbo->SerializeToString(&wire); - IPAACA_INFO("leaving") return getWireSchema(); } @@ -1231,13 +1207,12 @@ AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema, // IntConverter//{{{ IntConverter::IntConverter() -: Converter<std::string> ("int", "int", true) +: Converter<std::string> ("int", "int32", true) { } std::string IntConverter::serialize(const AnnotatedData& data, std::string& wire) { - IPAACA_INFO("entering") // Ensure that DATA actually holds a datum of the data-type we expect. assert(data.first == getDataType()); // "int" // NOTE: a dynamic_pointer_cast cannot be used from void* @@ -1246,7 +1221,6 @@ std::string IntConverter::serialize(const AnnotatedData& data, std::string& wire // transfer obj data to pbo pbo->set_value(*obj); pbo->SerializeToString(&wire); - IPAACA_INFO("leaving") return getWireSchema(); } diff --git a/cpp/src/ipaaca.h b/cpp/src/ipaaca.h index ed7a318..2c42c7e 100644 --- a/cpp/src/ipaaca.h +++ b/cpp/src/ipaaca.h @@ -1,6 +1,17 @@ #ifndef __IPAACA_H__ #define __IPAACA_H_ +/// ipaaca/IU/RSB protocol major version number +#define IPAACA_PROTOCOL_VERSION_MAJOR 1 +/// ipaaca/IU/RSB protocol minor version number +#define IPAACA_PROTOCOL_VERSION_MINOR 0 + +/// running release number of ipaaca-c++ +#define IPAACA_CPP_RELEASE_NUMBER 1 +/// date of last release number increment +#define IPAACA_CPP_RELEASE_DATE "2012-04-13" + + #ifdef IPAACA_DEBUG_MESSAGES #define IPAACA_INFO(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- " << i << std::endl; #define IPAACA_WARNING(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- WARNING: " << i << std::endl; @@ -13,9 +24,10 @@ #define IPAACA_TODO(i) ; #endif -// just for marking pure virtual functions for readability +/// marking pure virtual functions for extra readability #define _IPAACA_ABSTRACT_ +/// value to return when reading nonexistant payload keys #define IPAACA_PAYLOAD_DEFAULT_STRING_VALUE "" #include <iostream> @@ -48,28 +60,18 @@ namespace ipaaca { typedef uint32_t revision_t; +/// Type of the IU event. Realized as an integer to enable bit masks for filters. typedef uint32_t IUEventType; - #define IU_ADDED 1 #define IU_COMMITTED 2 #define IU_DELETED 4 #define IU_RETRACTED 8 #define IU_UPDATED 16 #define IU_LINKSUPDATED 32 -// +/// Bit mask for receiving all events #define IU_ALL_EVENTS 63 -/* -enum IUEventType { - IU_ADDED, - IU_COMMITTED, - IU_DELETED, - IU_RETRACTED, - IU_UPDATED, - IU_LINKSUPDATED -}; -*/ - +/// Convert an int event type to a human-readable string inline std::string iu_event_type_to_str(IUEventType type) { switch(type) { @@ -83,18 +85,13 @@ inline std::string iu_event_type_to_str(IUEventType type) } } +/// IU access mode: PUSH means that updates are broadcast; REMOTE means that reads are RPC calls; MESSAGE means a fire-and-forget message enum IUAccessMode { IU_ACCESS_PUSH, IU_ACCESS_REMOTE, IU_ACCESS_MESSAGE }; -//class { -//public: -// template<typename T> -// operator shared_ptr<T>() { return shared_ptr<T>(); } -//} NullPointer; - class PayloadEntryProxy; class Payload; class IUInterface; @@ -110,16 +107,19 @@ class Buffer; class InputBuffer; class OutputBuffer; +/// generate a UUID as an ASCII string std::string generate_uuid_string(); +/// store for (local) IUs. TODO Stores need to be unified more class IUStore: public std::map<std::string, boost::shared_ptr<IU> > { }; +/// store for RemotePushIUs. TODO Stores need to be unified more class RemotePushIUStore: public std::map<std::string, boost::shared_ptr<RemotePushIU> > // TODO genericize to all remote IU types { }; - +/// a reentrant lock/mutex class Lock { protected: @@ -291,15 +291,15 @@ class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<Inp protected: inline void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") { - IPAACA_INFO("(ERROR) InputBuffer::_send_iu_link_update() should never be invoked") + IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_link_update() should never be invoked") } inline void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef") { - IPAACA_INFO("(ERROR) InputBuffer::_send_iu_payload_update() should never be invoked") + IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_payload_update() should never be invoked") } inline void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") { - IPAACA_INFO("(ERROR) InputBuffer::_send_iu_commission() should never be invoked") + IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_commission() should never be invoked") } protected: RemoteServerPtr _get_remote_server(const std::string& unique_server_name); @@ -322,10 +322,6 @@ class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<Inp } boost::shared_ptr<IUInterface> get(const std::string& iu_uid); std::set<boost::shared_ptr<IUInterface> > get_ius(); - //inline void add(boost::shared_ptr<IU> iu) - //{ - // IPAACA_IMPLEMENT_ME - //} typedef boost::shared_ptr<InputBuffer> ptr; }; //}}} @@ -570,6 +566,14 @@ class Exception: public std::exception//{{{ return _description.c_str(); } };//}}} +class IUNotFoundError: public Exception//{{{ +{ + public: + inline ~IUNotFoundError() throw() { } + inline IUNotFoundError() { //boost::shared_ptr<IU> iu) { + _description = "IUNotFoundError"; + } +};//}}} class IUPublishedError: public Exception//{{{ { public: diff --git a/cpp/test/src/.gitignore b/cpp/test/src/.gitignore new file mode 100644 index 0000000..05c3e63 --- /dev/null +++ b/cpp/test/src/.gitignore @@ -0,0 +1,3 @@ +ipaaca.pb.cc +ipaaca.pb.h + diff --git a/cpp/test/src/Makefile b/cpp/test/src/Makefile new file mode 100644 index 0000000..1619f9c --- /dev/null +++ b/cpp/test/src/Makefile @@ -0,0 +1,23 @@ +CONFIG = -DIPAACA_DEBUG_MESSAGES +IPAACASOURCES = ../../src/ipaaca.cc ipaaca.pb.cc +TEXTSOURCES = ${IPAACASOURCES} textsender.cc +CCFLAGS=-I. -I../../src -I/usr/local/include -I/opt/local/include ${CONFIG} +BOOSTLIBS = -L/opt/local/lib -lboost_regex-mt -lboost_date_time-mt -lboost_program_options-mt -lboost_thread-mt -lboost_filesystem-mt -lboost_signals-mt -lboost_system-mt +PROTOLIBS = -L/opt/local/lib -lprotobuf +LIBS = ${BOOSTLIBS} ${PROTOLIBS} -L/usr/local/lib -lrsc -lrsbcore + +COMPILER = gfilt + +all: protoc textsender + + +textsender: + ${COMPILER} ${CCFLAGS} -o textsender ${TEXTSOURCES} ${LIBS} + +protoc: + protoc --proto_path=../../../proto ../../../proto/ipaaca.proto --cpp_out=. + +clean: + rm -f textsender ipaaca.pb.h ipaaca.pb.cc + + diff --git a/cpp/test/src/textsender.cc b/cpp/test/src/textsender.cc new file mode 100644 index 0000000..85be4c5 --- /dev/null +++ b/cpp/test/src/textsender.cc @@ -0,0 +1,117 @@ +#include <ipaaca.h> +#include <typeinfo> + +using namespace ipaaca; + +const char RECV_CATEGORY[] = "WORD"; +const char SEND_CATEGORY[] = "TEXT"; + +class TextSender { + protected: + OutputBuffer::ptr _ob; + InputBuffer::ptr _ib; + public: + TextSender(); + void outbuffer_handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local); + void inbuffer_handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local); + IUInterface::ptr find_last_iu(); + void publish_text_to_print(const std::string& text, const std::string& parent_iu_uid=""); +}; + +TextSender::TextSender() { + _ob = OutputBuffer::create("TextSenderOut"); + _ob->register_handler(boost::bind(&TextSender::outbuffer_handle_iu_event, this, _1, _2, _3)); + _ib = InputBuffer::create("TextSenderIn", RECV_CATEGORY); + _ib->register_handler(boost::bind(&TextSender::inbuffer_handle_iu_event, this, _1, _2, _3)); +} + +void TextSender::outbuffer_handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local) +{ + std::cout << "(own IU event " << iu_event_type_to_str(event_type) << " " << iu->uid() << ")" << std::endl; + if (event_type == IU_UPDATED) { + std::set<std::string> parent_uids = iu->get_links("GRIN"); + if (parent_uids.size() > 0) { + std::string parent_uid = *(parent_uids.begin()); + std::cout << "updating parent ..." << std::endl; + std::set<std::string> next_uids = iu->get_links("SUCCESSOR"); + if (next_uids.size() > 0) { + std::string next_uid = *(next_uids.begin()); + IUInterface::ptr next_iu = _ob->get(next_uid); + std::set<std::string> next_letter_grin_links = next_iu->get_links("GRIN"); + if (next_letter_grin_links.count(parent_uid) == 0) { + // next letter belongs to new word + IUInterface::ptr parent_iu = _ib->get(parent_uid); + parent_iu->payload()["STATE"] = "REALIZED"; + } else { + IUInterface::ptr parent_iu = _ib->get(parent_uid); + parent_iu->payload()["STATE"] = "STARTED"; + } + } else { + // there are no more letters, this is the end of the final word + IUInterface::ptr parent_iu = _ib->get(parent_uid); + parent_iu->payload()["STATE"] = "REALIZED"; + } + std::cout << " ... done." << std::endl; + } + } else { + } +} + +void TextSender::inbuffer_handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local) +{ + if (event_type == IU_LINKSUPDATED) { + std::cout << "links updated" << std::endl; + } else if (event_type == IU_ADDED) { + std::string word = iu->payload()["WORD"]; + std::cout << "Received new word: " << word << std::endl; + publish_text_to_print(word, iu->uid()); + } else if (event_type == IU_RETRACTED) { + std::string retracted_uid = iu->uid(); + } else { + std::cout << "(IU event " << iu_event_type_to_str(event_type) << " " << iu->uid() << ")" << std::endl; + } +} + +IUInterface::ptr TextSender::find_last_iu() { + std::set<IUInterface::ptr> ius = _ob->get_ius(); + for (std::set<IUInterface::ptr>::iterator it = ius.begin(); it!=ius.end(); ++it) { + if ((*it)->get_links("SUCCESSOR").size() == 0) return *it; + } + return IUInterface::ptr(); +} + +void TextSender::publish_text_to_print(const std::string& text, const std::string& parent_iu_uid) { + IUInterface::ptr previous_iu = find_last_iu(); + if (previous_iu) { + // insert a blank if we already have words in the buffer + IU::ptr iu = IU::create( SEND_CATEGORY ); + iu->payload()["CONTENT"] = " "; + _ob->add(iu); + previous_iu->add_link( "SUCCESSOR", iu->uid() ); + iu->add_link( "PREDECESSOR", previous_iu->uid() ); + if (parent_iu_uid != "") iu->add_link( "GRIN", parent_iu_uid ); + previous_iu = iu; + } + for (int i=0; i<text.size(); ++i) { + IU::ptr iu = IU::create( SEND_CATEGORY ); + iu->payload()["CONTENT"] = std::string(1, text.at(i)); + _ob->add(iu); + if (previous_iu) { + previous_iu->add_link( "SUCCESSOR", iu->uid() ); + iu->add_link( "PREDECESSOR", previous_iu->uid() ); + if (parent_iu_uid != "") iu->add_link( "GRIN", parent_iu_uid ); + } + if (previous_iu) std::cout << "previous IU: " << *previous_iu << std::endl; + previous_iu = iu; + } +} + +int main() { + TextSender sender; + sleep(1); + sender.publish_text_to_print("(INIT)"); + std::cout << "Press Ctrl-C to cancel..." << std::endl; + while (true) sleep(1); +} + + -- GitLab