diff --git a/cpp/src/Makefile b/cpp/src/Makefile index f553bd1a17c634c6f5896a560923e4b416be2394..00c2459e60478819473f00cb151c0a3321ac1b23 100644 --- a/cpp/src/Makefile +++ b/cpp/src/Makefile @@ -7,7 +7,7 @@ LIBS = ${BOOSTLIBS} ${PROTOLIBS} -L/usr/local/lib -lrsc -lrsbcore COMPILER = gfilt -all: main +all: receiver sender receiver: @@ -23,6 +23,6 @@ protoc: protoc --proto_path=../../proto ../../proto/ipaaca.proto --cpp_out=. clean: - rm -f ipaaca-test ipaaca-sender ipaaca-receiver ipaaca.pb.h ipaaca.pb.cc + rm -f ipaaca-main ipaaca-sender ipaaca-receiver ipaaca.pb.h ipaaca.pb.cc diff --git a/cpp/src/ipaaca-test-main.cc b/cpp/src/ipaaca-test-main.cc index c10927926f1828b409cad7b072ba828946edd6b5..86c732db214e73a0bae118f63642774bf3af0626 100644 --- a/cpp/src/ipaaca-test-main.cc +++ b/cpp/src/ipaaca-test-main.cc @@ -5,7 +5,7 @@ //#include <rsc/logging/LoggerFactory.h> // //rsc::logging::LoggerFactory::getInstance().reconfigure(rsc::logging::Logger::LEVEL_ALL); -#ifdef MAKE_RECEIVER +#if 0 //boost::mutex mtx; using namespace ipaaca; @@ -42,8 +42,11 @@ int main() { } return EXIT_SUCCESS; } -#else -#ifdef MAKE_SENDER +// +// +// +// + using namespace ipaaca; int main() { initialize_ipaaca_rsb(); @@ -78,7 +81,7 @@ int main() { std::cout << "Done." << std::endl; return EXIT_SUCCESS; } -#else +#endif // // TESTS @@ -86,6 +89,24 @@ int main() { using namespace ipaaca; +#ifdef MAKE_RECEIVER +int main() { + try{ + initialize_ipaaca_rsb(); + + InputBuffer ib("TestIB", "testcategory"); + + + while (true) { + sleep(1); + } + + } catch (ipaaca::Exception& e) { + std::cout << "== IPAACA EXCEPTION == " << e.what() << std::endl; + } +} +#else +#ifdef MAKE_SENDER int main() { try{ initialize_ipaaca_rsb(); diff --git a/cpp/src/ipaaca.cc b/cpp/src/ipaaca.cc index 68691e233c50ae7f75da6959d1cb2e34884bf7f9..00df2d0a17c11784c54009b927fe32700a636ece 100644 --- a/cpp/src/ipaaca.cc +++ b/cpp/src/ipaaca.cc @@ -47,6 +47,29 @@ void init_inprocess_too() { */ //}}} +std::ostream& operator<<(std::ostream& os, const Payload& obj)//{{{ +{ + os << "{"; + bool first = true; + for (std::map<std::string, std::string>::const_iterator it=obj._store.begin(); it!=obj._store.end(); ++it) { + if (first) { first=false; } else { os << ", "; } + os << "'" << it->first << "':'" << it->second << "'"; + } + os << "}"; + return os; +} +//}}} +std::ostream& operator<<(std::ostream& os, const IUInterface& obj)//{{{ +{ + os << "IUInterface(uid=" << obj.uid() << ", revision=" << obj.revision(); + os << ", owner_name=" << obj.owner_name(); + os << ", payload = "; + bool first = true; + os << obj.const_payload(); + os << ")"; + return os; +} +//}}} std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)//{{{ { os << "PayloadUpdate(uid=" << obj.uid << ", revision=" << obj.revision; @@ -284,10 +307,179 @@ boost::shared_ptr<IU> OutputBuffer::remove(IU::ref iu) //}}} // InputBuffer//{{{ -InputBuffer::InputBuffer(const std::string& basename) -:Buffer(basename) +InputBuffer::InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests) +:Buffer(basename, "IB") +{ + for (std::vector<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) { + _create_category_listener_if_needed(*it); + } +} +InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1) +:Buffer(basename, "IB") { + _create_category_listener_if_needed(category_interest1); } +InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) +:Buffer(basename, "IB") +{ + _create_category_listener_if_needed(category_interest1); + _create_category_listener_if_needed(category_interest2); +} +InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) +:Buffer(basename, "IB") +{ + _create_category_listener_if_needed(category_interest1); + _create_category_listener_if_needed(category_interest2); + _create_category_listener_if_needed(category_interest3); +} +InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) +:Buffer(basename, "IB") +{ + _create_category_listener_if_needed(category_interest1); + _create_category_listener_if_needed(category_interest2); + _create_category_listener_if_needed(category_interest3); + _create_category_listener_if_needed(category_interest4); +} + +RemoteServerPtr InputBuffer::_get_remote_server(boost::shared_ptr<IU> iu) +{ + IPAACA_IMPLEMENT_ME + return RemoteServerPtr(); +} + +ListenerPtr InputBuffer::_create_category_listener_if_needed(const std::string& category) +{ + IPAACA_INFO("entering") + std::map<std::string, ListenerPtr>::iterator it = _listener_store.find(category); + if (it!=_listener_store.end()) return it->second; + IPAACA_INFO("creating a new listener") + std::string scope_string = "/ipaaca/category/" + category; + ListenerPtr listener = Factory::getInstance().createListener( Scope(scope_string) ); + HandlerPtr event_handler = HandlerPtr( + new EventFunctionHandler( + boost::bind(&InputBuffer::_handle_iu_events, this, _1) + ) + ); + listener->addHandler(event_handler); + _listener_store[category] = listener; + IPAACA_INFO("done") + return listener; + /* + '''Return (or create, store and return) a category listener.''' + if iu_category in self._listener_store: return self._informer_store[iu_category] + cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config) + cat_listener.addHandler(self._handle_iu_events) + self._listener_store[iu_category] = cat_listener + self._category_interests.append(iu_category) + logger.info("Added listener in scope "+"/ipaaca/category/"+iu_category) + return cat_listener + */ +} +void InputBuffer::call_iu_event_handlers(const std::string& uid, bool local, IUEventType event_type, const std::string& category) +{ + IPAACA_INFO("handling an event " << ipaaca::iu_event_type_to_str(event_type) << " for IU " << uid) +} +void InputBuffer::_handle_iu_events(EventPtr event) +{ + std::string type = event->getType(); + if (type == "ipaaca::RemotePushIU") { + boost::shared_ptr<RemotePushIU> iu = boost::static_pointer_cast<RemotePushIU>(event->getData()); + if (_iu_store.count(iu->category()) > 0) { + // already got the IU... ignore + } else { + _iu_store[iu->uid()] = iu; + iu->_set_buffer(this); + call_iu_event_handlers(iu->uid(), false, IU_ADDED, iu->category() ); + } + IPAACA_INFO( "New RemotePushIU state: " << (*iu) ) + } else { + RemotePushIUStore::iterator it; + if (type == "ipaaca::IUPayloadUpdate") { + boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData()); + if (update->writer_name == _unique_name) { + //IPAACA_INFO("Ignoring locally-written IU update") + return; + } + it = _iu_store.find(update->uid); + if (it == _iu_store.end()) { + IPAACA_INFO("Ignoring UPDATED message for an IU that we did not fully receive before") + return; + } + // + it->second->_apply_update(update); + call_iu_event_handlers(it->second->uid(), false, IU_UPDATED, it->second->category() ); + // + // + } else if (type == "ipaaca::IULinkUpdate") { + boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData()); + if (update->writer_name == _unique_name) { + //IPAACA_INFO("Ignoring locally-written IU update") + return; + } + it = _iu_store.find(update->uid); + if (it == _iu_store.end()) { + IPAACA_INFO("Ignoring LINKSUPDATED message for an IU that we did not fully receive before") + return; + } + // + it->second->_apply_link_update(update); + call_iu_event_handlers(it->second->uid(), false, IU_LINKSUPDATED, it->second->category() ); + // + // + } else if (type == "ipaaca::protobuf::IUCommission") { + boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData()); + if (update->writer_name() == _unique_name) { + //IPAACA_INFO("Ignoring locally-written IU commit") + return; + } + it = _iu_store.find(update->uid()); + if (it == _iu_store.end()) { + IPAACA_INFO("Ignoring COMMITTED message for an IU that we did not fully receive before") + return; + } + // + it->second->_apply_commission(); + it->second->_revision = update->revision(); + call_iu_event_handlers(it->second->uid(), false, IU_COMMITTED, it->second->category() ); + // + // + } else { + std::cout << "(Unhandled Event type " << type << " !)" << std::endl; + return; + } + IPAACA_INFO( "New RemotePushIU state: " << *(it->second) ) + } + /* + else: + # an update to an existing IU + if event.data.writer_name == self.unique_name: + # Discard updates that originate from this buffer + return + if event.data.uid not in self._iu_store: + # TODO: we should request the IU's owner to send us the IU + logger.warning("Update message for IU which we did not fully receive before.") + return + if type_ is ipaaca_pb2.IUCommission: + # IU commit + iu = self._iu_store[event.data.uid] + iu._apply_commission() + iu._revision = event.data.revision + self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category) + elif type_ is IUPayloadUpdate: + # IU payload update + iu = self._iu_store[event.data.uid] + iu._apply_update(event.data) + self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category) + elif type_ is IULinkUpdate: + # IU link update + iu = self._iu_store[event.data.uid] + iu._apply_link_update(event.data) + self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category) + else: + logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_)) + */ +} + //}}} @@ -454,6 +646,48 @@ void RemotePushIU::commit() { IPAACA_IMPLEMENT_ME } +void RemotePushIU::_apply_link_update(IULinkUpdate::ptr update) +{ + _revision = update->revision; + if (update->is_delta) { + _add_and_remove_links(update->new_links, update->links_to_remove); + } else { + _replace_links(update->new_links); + } +} +void RemotePushIU::_apply_update(IUPayloadUpdate::ptr update) +{ + _revision = update->revision; + if (update->is_delta) { + for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) { + _payload._remotely_enforced_delitem(*it); + } + for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) { + _payload._remotely_enforced_setitem(it->first, it->second); + } + } else { + _payload._remotely_enforced_wipe(); + for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) { + _payload._remotely_enforced_setitem(it->first, it->second); + } + } +} +void RemotePushIU::_apply_commission() +{ + IPAACA_IMPLEMENT_ME +} +void Payload::_remotely_enforced_wipe() +{ + _store.clear(); +} +void Payload::_remotely_enforced_delitem(const std::string& k) +{ + _store.erase(k); +} +void Payload::_remotely_enforced_setitem(const std::string& k, const std::string& v) +{ + _store[k] = v; +} //}}} @@ -591,7 +825,8 @@ AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std: ls.insert(pls.targets(j)); } } - return std::make_pair(getDataType(), obj); + //return std::make_pair(getDataType(), obj); + return std::make_pair("ipaaca::RemotePushIU", obj); break; } default: diff --git a/cpp/src/ipaaca.h b/cpp/src/ipaaca.h index 4413952a325c68bc36925ea3ffc1e73cd9abb663..ca4a4f08d405974255b6731009542f7e68471dc2 100644 --- a/cpp/src/ipaaca.h +++ b/cpp/src/ipaaca.h @@ -38,7 +38,9 @@ //using namespace boost; using namespace rsb; +using namespace rsb::filter; using namespace rsb::converter; +using namespace rsb::patterns; namespace ipaaca { @@ -52,6 +54,18 @@ enum IUEventType { IU_UPDATED, IU_LINKSUPDATED }; +inline std::string iu_event_type_to_str(IUEventType type) +{ + switch(type) { + case IU_ADDED: return "ADDED"; + case IU_COMMITTED: return "COMMITTED"; + case IU_DELETED: return "DELETED"; + case IU_RETRACTED: return "RETRACTED"; + case IU_UPDATED: return "UPDATED"; + case IU_LINKSUPDATED: return "LINKSUPDATED"; + default: return "(IU_EVENT_TYPE_UNKNOWN)"; + } +} enum IUAccessMode { IU_ACCESS_PUSH, @@ -140,11 +154,12 @@ class Buffer { //: public boost::enable_shared_from_this<Buffer> { std::string _uuid; std::string _basename; std::string _unique_name; + std::string _id_prefix; protected: _IPAACA_ABSTRACT_ virtual void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") = 0; _IPAACA_ABSTRACT_ virtual void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef") = 0; _IPAACA_ABSTRACT_ virtual void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") = 0; - void _allocate_unique_name(const std::string& basename); + void _allocate_unique_name(const std::string& basename, const std::string& function); inline Buffer(const std::string& basename, const std::string& function) { _allocate_unique_name(basename, function); } @@ -187,6 +202,9 @@ class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<Ou class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<InputBuffer> { friend class IU; friend class RemotePushIU; + protected: + std::map<std::string, ListenerPtr> _listener_store; + RemotePushIUStore _iu_store; // TODO genericize protected: inline void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") { @@ -200,15 +218,24 @@ class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<Inp { IPAACA_INFO("(ERROR) InputBuffer::_send_iu_commission() should never be invoked") } + protected: + RemoteServerPtr _get_remote_server(boost::shared_ptr<IU> iu); + ListenerPtr _create_category_listener_if_needed(const std::string& category); + void _handle_iu_events(EventPtr event); + void call_iu_event_handlers(const std::string& uid, bool local, IUEventType event_type, const std::string& category); public: - InputBuffer(const std::string& basename); + InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests); + InputBuffer(const std::string& basename, const std::string& category_interest1); + InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2); + InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3); + InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4); ~InputBuffer() { IPAACA_IMPLEMENT_ME } - inline void add(boost::shared_ptr<IU> iu) - { - IPAACA_IMPLEMENT_ME - } + //inline void add(boost::shared_ptr<IU> iu) + //{ + // IPAACA_IMPLEMENT_ME + //} }; /* @@ -237,6 +264,7 @@ class IUPayloadUpdate {//{{{ std::map<std::string, std::string> new_items; std::vector<std::string> keys_to_remove; friend std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj); + typedef boost::shared_ptr<IUPayloadUpdate> ptr; };//}}} class IUPayloadUpdateConverter: public rsb::converter::Converter<std::string> {//{{{ public: @@ -254,6 +282,7 @@ class IULinkUpdate {//{{{ std::map<std::string, std::set<std::string> > new_links; std::map<std::string, std::set<std::string> > links_to_remove; friend std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj); + typedef boost::shared_ptr<IULinkUpdate> ptr; };//}}} class IULinkUpdateConverter: public rsb::converter::Converter<std::string> {//{{{ public: @@ -282,6 +311,7 @@ class PayloadEntryProxy//{{{ class Payload//{{{ { + friend std::ostream& operator<<(std::ostream& os, const Payload& obj); friend class IUInterface; friend class IU; friend class RemotePushIU; @@ -293,6 +323,9 @@ class Payload//{{{ protected: void initialize(boost::shared_ptr<IUInterface> iu); inline void _set_owner_name(const std::string& name) { _owner_name = name; } + void _remotely_enforced_wipe(); + void _remotely_enforced_delitem(const std::string& k); + void _remotely_enforced_setitem(const std::string& k, const std::string& v); public: inline const std::string& owner_name() { return _owner_name; } // access @@ -305,6 +338,7 @@ class Payload//{{{ class IUInterface {//{{{ friend class IUConverter; + friend std::ostream& operator<<(std::ostream& os, const IUInterface& obj); protected: IUInterface(); public: @@ -352,6 +386,7 @@ class IUInterface {//{{{ inline const LinkMap& get_all_links() { return _links.get_all_links(); } // Payload _IPAACA_ABSTRACT_ virtual Payload& payload() = 0; + _IPAACA_ABSTRACT_ virtual const Payload& const_payload() const = 0; // setters _IPAACA_ABSTRACT_ virtual void commit() = 0; // functions to modify and update links: @@ -381,6 +416,7 @@ class IU: public IUInterface {//{{{ } static boost::shared_ptr<IU> create(const std::string& category, IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" ); inline Payload& payload() { return _payload; } + inline const Payload& const_payload() const { return _payload; } void commit(); protected: void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = ""); @@ -406,10 +442,15 @@ class RemotePushIU: public IUInterface {//{{{ IPAACA_IMPLEMENT_ME } inline Payload& payload() { return _payload; } + inline const Payload& const_payload() const { return _payload; } void commit(); protected: void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = ""); void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = ""); + protected: + void _apply_update(IUPayloadUpdate::ptr update); + void _apply_link_update(IULinkUpdate::ptr update); + void _apply_commission(); typedef boost::shared_ptr<RemotePushIU> ref; };//}}}