From 554ee60efa7ffcc1f71c3d05765c1298e6e94841 Mon Sep 17 00:00:00 2001 From: Dennis Leroy Wigand <dwigand@TechFak.Uni-Bielefeld.DE> Date: Tue, 30 Sep 2014 15:19:17 +0200 Subject: [PATCH] added resend trigger to c++ --- ipaacalib/cpp/include/ipaaca/ipaaca.h | 4 ++ ipaacalib/cpp/src/ipaaca.cc | 60 ++++++++++++--------------- 2 files changed, 31 insertions(+), 33 deletions(-) diff --git a/ipaacalib/cpp/include/ipaaca/ipaaca.h b/ipaacalib/cpp/include/ipaaca/ipaaca.h index b60b0ba..f2fb763 100644 --- a/ipaacalib/cpp/include/ipaaca/ipaaca.h +++ b/ipaacalib/cpp/include/ipaaca/ipaaca.h @@ -486,6 +486,10 @@ IPAACA_HEADER_EXPORT class InputBuffer: public Buffer { //, public boost::enable IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_resendrequest() should never be invoked") } protected: + IPAACA_MEMBER_VAR_EXPORT bool triggerResend; + IPAACA_HEADER_EXPORT void SetResend(bool resendActive); + IPAACA_HEADER_EXPORT bool GetResend(); + IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::set<std::string>& category_interests); IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests); IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::string& category_interest1); diff --git a/ipaacalib/cpp/src/ipaaca.cc b/ipaacalib/cpp/src/ipaaca.cc index 4001e1c..84189d4 100644 --- a/ipaacalib/cpp/src/ipaaca.cc +++ b/ipaacalib/cpp/src/ipaaca.cc @@ -502,14 +502,7 @@ IPAACA_EXPORT boost::shared_ptr<int> CallbackIUResendRequest::call(const std::st return boost::shared_ptr<int>(new int(0)); } IU::ptr iu = boost::static_pointer_cast<IU>(iui); - if ((update->has_hidden_scope_name() == true)&&(update->hidden_scope_name().compare("") != 0)){// dlw TODO - //Informer<AnyType>::Ptr informer = _buffer->_get_informer(update->hidden_scope_name()); - //Informer<ipaaca::IU>::DataPtr iu_data(iu); - //informer->publish(iu_data); - //_buffer->call_iu_event_handlers(iu, true, IU_RESENDREQUEST, update->getHiddenScopeName()); - //_buffer->_publish_resend_update(iui, update->hidden_scope_name()); - - std::cout << "dlw call_iu_event_handlers " << update->hidden_scope_name() << std::endl; + if ((update->has_hidden_scope_name() == true)&&(update->hidden_scope_name().compare("") != 0)){ //_buffer->call_iu_event_handlers(iu, true, IU_UPDATED, update->hidden_scope_name()); revision_t revision = iu->revision(); @@ -695,6 +688,7 @@ IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::s _create_category_listener_if_needed(*it); } _create_category_listener_if_needed(_uuid); + triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests) :Buffer(basename, "IB") @@ -703,12 +697,14 @@ IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::v _create_category_listener_if_needed(*it); } _create_category_listener_if_needed(_uuid); + triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1) :Buffer(basename, "IB") { _create_category_listener_if_needed(category_interest1); _create_category_listener_if_needed(_uuid); + triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) :Buffer(basename, "IB") @@ -716,6 +712,7 @@ IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::s _create_category_listener_if_needed(category_interest1); _create_category_listener_if_needed(category_interest2); _create_category_listener_if_needed(_uuid); + triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) :Buffer(basename, "IB") @@ -724,6 +721,7 @@ IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::s _create_category_listener_if_needed(category_interest2); _create_category_listener_if_needed(category_interest3); _create_category_listener_if_needed(_uuid); + triggerResend = false; } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) :Buffer(basename, "IB") @@ -733,6 +731,7 @@ IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::s _create_category_listener_if_needed(category_interest3); _create_category_listener_if_needed(category_interest4); _create_category_listener_if_needed(_uuid); + triggerResend = false; } @@ -767,6 +766,18 @@ IPAACA_EXPORT InputBuffer::ptr InputBuffer::create(const std::string& basename, return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3, category_interest4)); } + +IPAACA_EXPORT void InputBuffer::SetResend(bool resendActive) +{ + triggerResend = resendActive; +} + +IPAACA_EXPORT bool InputBuffer::GetResend() +{ + return triggerResend; +} + + IPAACA_EXPORT IUInterface::ptr InputBuffer::get(const std::string& iu_uid) { RemotePushIUStore::iterator it = _iu_store.find(iu_uid); // TODO genericize @@ -799,7 +810,6 @@ IPAACA_EXPORT ListenerPtr InputBuffer::_create_category_listener_if_needed(const return it->second; } IPAACA_INFO("Creating a new listener for category " << category) - std::cout << "Creating a new listener for category " << category << std::endl; std::string scope_string = "/ipaaca/category/" + category; ListenerPtr listener = getFactory().createListener( Scope(scope_string) ); IPAACA_INFO("Adding handler") @@ -814,21 +824,20 @@ IPAACA_EXPORT ListenerPtr InputBuffer::_create_category_listener_if_needed(const return listener; } IPAACA_EXPORT void InputBuffer::_trigger_resend_request(EventPtr event) { + if (!triggerResend) + return; std::string type = event->getType(); std::string uid = ""; std::string writerName = ""; if (type == "ipaaca::IUPayloadUpdate") { - std::cout << "trigger ipaaca::IUPayloadUpdate" << std::endl; boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData()); uid = update->uid; writerName = update->writer_name; } else if (type == "ipaaca::IULinkUpdate") { - std::cout << "trigger ipaaca::IULinkUpdate" << std::endl; boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData()); uid = update->uid; writerName = update->writer_name; } else if (type == "ipaaca::protobuf::IUCommission") { - std::cout << "trigger ipaaca::protobuf::IUCommission" << std::endl; boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData()); uid = update->uid(); writerName = update->writer_name(); @@ -837,11 +846,8 @@ IPAACA_EXPORT void InputBuffer::_trigger_resend_request(EventPtr event) { } if (!writerName.empty()) { - std::cout << "writer name not empty " << writerName << std::endl; - //RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); RemoteServerPtr server = _get_remote_server(writerName); if (!uid.empty()) { - std::cout << "uid not empty " << uid << std::endl; boost::shared_ptr<protobuf::IUResendRequest> update = boost::shared_ptr<protobuf::IUResendRequest>(new protobuf::IUResendRequest()); update->set_uid(uid); update->set_hidden_scope_name(_uuid); @@ -849,7 +855,7 @@ IPAACA_EXPORT void InputBuffer::_trigger_resend_request(EventPtr event) { if (*result == 0) { throw IUResendRequestFailedError(); } else { - std::cout << "revision " << *result << std::endl; //TODO return dlw TODO + std::cout << "revision " << *result << std::endl; } } } @@ -859,7 +865,6 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) std::cout << "handle iu events" << std::endl; std::string type = event->getType(); if (type == "ipaaca::RemotePushIU") { - std::cout << "ipaaca::RemotePushIU" << std::endl; boost::shared_ptr<RemotePushIU> iu = boost::static_pointer_cast<RemotePushIU>(event->getData()); if (_iu_store.count(iu->category()) > 0) { // already got the IU... ignore @@ -870,7 +875,6 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) } //IPAACA_INFO( "New RemotePushIU state: " << (*iu) ) } else if (type == "ipaaca::RemoteMessage") { - std::cout << "ipaaca::RemoteMessage" << std::endl; boost::shared_ptr<RemoteMessage> iu = boost::static_pointer_cast<RemoteMessage>(event->getData()); //_iu_store[iu->uid()] = iu; //iu->_set_buffer(this); @@ -878,10 +882,8 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) call_iu_event_handlers(iu, false, IU_MESSAGE, iu->category() ); //_iu_store.erase(iu->uid()); } else { - std::cout << "elseeeeeee" << std::endl; RemotePushIUStore::iterator it; if (type == "ipaaca::IUPayloadUpdate") { - std::cout << "ipaaca::IUPayloadUpdate" << std::endl; boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData()); //IPAACA_INFO("** writer name: " << update->writer_name) std::cout << "writer name " << update->writer_name << std::endl; @@ -890,16 +892,14 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) } it = _iu_store.find(update->uid); if (it == _iu_store.end()) { - std::cout << "Using UPDATED message for an IU that we did not fully receive before" << update->writer_name << std::endl; _trigger_resend_request(event); IPAACA_INFO("Using UPDATED message for an IU that we did not fully receive before") return; } - // + it->second->_apply_update(update); call_iu_event_handlers(it->second, false, IU_UPDATED, it->second->category() ); - // - // + } else if (type == "ipaaca::IULinkUpdate") { boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData()); if (update->writer_name == _unique_name) { @@ -911,11 +911,10 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) IPAACA_INFO("Ignoring LINKSUPDATED message for an IU that we did not fully receive before") return; } - // + it->second->_apply_link_update(update); call_iu_event_handlers(it->second, false, IU_LINKSUPDATED, it->second->category() ); - // - // + } else if (type == "ipaaca::protobuf::IUCommission") { boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData()); if (update->writer_name() == _unique_name) { @@ -1092,13 +1091,8 @@ IPAACA_EXPORT void IU::_publish_resend(IU::ptr iu, const std::string& hidden_sco //} //_increase_revision_number(); //if (is_published()) { -//IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name -//dlw TODO -// IU::ptr iu = boost::static_pointer_cast<IU>(this); - //IU::ptr iu = boost::make_static(this); - + //IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name _buffer->_publish_iu_resend(iu, hidden_scope_name); - //} //_revision_lock.unlock(); } -- GitLab