From f352fa9ea3704db1537ddeb35dfa1b00493734ca Mon Sep 17 00:00:00 2001 From: Ramin Yaghoubzadeh <ryaghoubzadeh@uni-bielefeld.de> Date: Thu, 3 Dec 2015 16:25:22 +0100 Subject: [PATCH] C++: retraction fixes / more implemetation - OutputBuffer retracts all remaining live IUs on destruction - sends correct event type now - fixed reference error for calling handlers - setters properly check for retracted flag - also added some missing checks for committed flag --- ipaacalib/cpp/include/ipaaca/ipaaca-buffers.h | 9 +++-- .../cpp/include/ipaaca/ipaaca-definitions.h | 9 +++++ ipaacalib/cpp/include/ipaaca/ipaaca-ius.h | 2 + ipaacalib/cpp/src/ipaaca-buffers.cc | 40 +++++++++++++++++-- ipaacalib/cpp/src/ipaaca-ius.cc | 22 ++++++++-- 5 files changed, 71 insertions(+), 11 deletions(-) diff --git a/ipaacalib/cpp/include/ipaaca/ipaaca-buffers.h b/ipaacalib/cpp/include/ipaaca/ipaaca-buffers.h index e0a26c1..ec57c3b 100644 --- a/ipaacalib/cpp/include/ipaaca/ipaaca-buffers.h +++ b/ipaacalib/cpp/include/ipaaca/ipaaca-buffers.h @@ -275,17 +275,18 @@ IPAACA_HEADER_EXPORT class OutputBuffer: public Buffer { //, public boost::enabl // _remote_update_payload(IUPayloadUpdate) // _remote_commit(protobuf::IUCommission) IPAACA_HEADER_EXPORT void _publish_iu(boost::shared_ptr<IU> iu); - + /// mark and send IU retraction on own IU (removal from buffer is in remove(IU)) IPAACA_HEADER_EXPORT void _retract_iu(boost::shared_ptr<IU> iu); + /// mark and send retraction for all unretracted IUs (without removal, used in ~OutputBuffer) + IPAACA_HEADER_EXPORT void _retract_all_internal(); protected: /// \b Note: constructor is protected. Use create() IPAACA_HEADER_EXPORT OutputBuffer(const std::string& basename, const std::string& channel=""); // empty string auto-replaced with __ipaaca_static_option_default_channel IPAACA_HEADER_EXPORT void _initialize_server(); public: IPAACA_HEADER_EXPORT static boost::shared_ptr<OutputBuffer> create(const std::string& basename); - IPAACA_HEADER_EXPORT ~OutputBuffer() { - IPAACA_IMPLEMENT_ME - } + /// OutputBuffer destructor will retract all IUs that are still live + IPAACA_HEADER_EXPORT ~OutputBuffer(); IPAACA_HEADER_EXPORT void add(boost::shared_ptr<IU> iu); IPAACA_HEADER_EXPORT boost::shared_ptr<IU> remove(const std::string& iu_uid); IPAACA_HEADER_EXPORT boost::shared_ptr<IU> remove(boost::shared_ptr<IU> iu); diff --git a/ipaacalib/cpp/include/ipaaca/ipaaca-definitions.h b/ipaacalib/cpp/include/ipaaca/ipaaca-definitions.h index 42e4f10..6d9104e 100644 --- a/ipaacalib/cpp/include/ipaaca/ipaaca-definitions.h +++ b/ipaacalib/cpp/include/ipaaca/ipaaca-definitions.h @@ -152,6 +152,15 @@ IPAACA_HEADER_EXPORT class IUCommittedError: public Exception//{{{ _description = "IUCommittedError"; } };//}}} +/// IU had already been retracted +IPAACA_HEADER_EXPORT class IURetractedError: public Exception//{{{ +{ + public: + IPAACA_HEADER_EXPORT inline ~IURetractedError() throw() { } + IPAACA_HEADER_EXPORT inline IURetractedError() { //boost::shared_ptr<IU> iu) { + _description = "IURetractedError"; + } +};//}}} /// Remote IU update failed because it had been modified in the mean time IPAACA_HEADER_EXPORT class IUUpdateFailedError: public Exception//{{{ { diff --git a/ipaacalib/cpp/include/ipaaca/ipaaca-ius.h b/ipaacalib/cpp/include/ipaaca/ipaaca-ius.h index f4fc8fc..d19d7d0 100644 --- a/ipaacalib/cpp/include/ipaaca/ipaaca-ius.h +++ b/ipaacalib/cpp/include/ipaaca/ipaaca-ius.h @@ -95,6 +95,8 @@ IPAACA_HEADER_EXPORT class IUInterface {//{{{ IPAACA_HEADER_EXPORT void _add_and_remove_links(const LinkMap& add, const LinkMap& remove) { _links._add_and_remove_links(add, remove); } IPAACA_HEADER_EXPORT void _replace_links(const LinkMap& links) { _links._replace_links(links); } public: + /// Return whether IU has been retracted + IPAACA_HEADER_EXPORT inline bool retracted() const { return _retracted; } /// Return whether IU has already been published (is in a Buffer). IPAACA_HEADER_EXPORT inline bool is_published() { return (_buffer != 0); } /// Return auto-generated UID string (set during IU construction) diff --git a/ipaacalib/cpp/src/ipaaca-buffers.cc b/ipaacalib/cpp/src/ipaaca-buffers.cc index ad0462a..cec49b8 100644 --- a/ipaacalib/cpp/src/ipaaca-buffers.cc +++ b/ipaacalib/cpp/src/ipaaca-buffers.cc @@ -184,6 +184,12 @@ IPAACA_EXPORT boost::shared_ptr<int> CallbackIUPayloadUpdate::call(const std::st IPAACA_INFO(" Referred-to revision was " << update->revision << " while local one is " << iu->_revision) iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); + } else if (iu->committed()) { + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(0)); + } else if (iu->retracted()) { + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(0)); } if (update->is_delta) { // FIXME FIXME this is an unsolved problem atm: deletes in a delta update are @@ -216,6 +222,12 @@ IPAACA_EXPORT boost::shared_ptr<int> CallbackIULinkUpdate::call(const std::strin IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid) iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); + } else if (iu->committed()) { + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(0)); + } else if (iu->retracted()) { + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(0)); } if (update->is_delta) { iu->modify_links(update->new_links, update->links_to_remove, update->writer_name); @@ -240,8 +252,11 @@ IPAACA_EXPORT boost::shared_ptr<int> CallbackIUCommission::call(const std::strin IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid()) iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); - } - if (iu->committed()) { + } else if (iu->committed()) { + iu->_revision_lock.unlock(); + return boost::shared_ptr<int>(new int(0)); + } else if (iu->retracted()) { + iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(0)); } else { } @@ -374,6 +389,8 @@ IPAACA_EXPORT void OutputBuffer::add(IU::ptr iu) } if (iu->is_published()) { throw IUPublishedError(); + } else if (iu->retracted()) { + throw IURetractedError(); } if (iu->access_mode() != IU_ACCESS_MESSAGE) { // (for Message-type IUs: do not actually store them) @@ -433,6 +450,8 @@ IPAACA_EXPORT boost::shared_ptr<IU> OutputBuffer::remove(IU::ptr iu) IPAACA_EXPORT void OutputBuffer::_retract_iu(IU::ptr iu) { + if (iu->_retracted) return; // ignore subsequent retractions + iu->_retracted = true; Informer<protobuf::IURetraction>::DataPtr data(new protobuf::IURetraction()); data->set_uid(iu->uid()); data->set_revision(iu->revision()); @@ -440,6 +459,20 @@ IPAACA_EXPORT void OutputBuffer::_retract_iu(IU::ptr iu) informer->publish(data); } +IPAACA_EXPORT void OutputBuffer::_retract_all_internal() +{ + for (IUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) { + if (!(it->second->_retracted)) { + _retract_iu(it->second); + } + } +} + +IPAACA_EXPORT OutputBuffer::~OutputBuffer() +{ + _retract_all_internal(); +} + //}}} @@ -740,10 +773,11 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) // it->second->_revision = update->revision(); it->second->_apply_retraction(); + auto final_iu_ref = it->second; // remove from InputBuffer FIXME: this is a crossover between retracted and deleted behavior _iu_store.erase(it->first); // and call the handler. IU reference is still valid for this call, although removed from buffer. - call_iu_event_handlers(it->second, false, IU_COMMITTED, it->second->category() ); + call_iu_event_handlers(final_iu_ref, false, IU_RETRACTED, it->second->category() ); // } else { IPAACA_WARNING("(Unhandled Event type " << type << " !)"); diff --git a/ipaacalib/cpp/src/ipaaca-ius.cc b/ipaacalib/cpp/src/ipaaca-ius.cc index 07caeb6..16e3216 100644 --- a/ipaacalib/cpp/src/ipaaca-ius.cc +++ b/ipaacalib/cpp/src/ipaaca-ius.cc @@ -62,6 +62,7 @@ IPAACA_EXPORT IU::IU(const std::string& category, IUAccessMode access_mode, bool _read_only = read_only; _access_mode = access_mode; _committed = false; + _retracted = false; } IPAACA_EXPORT void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) @@ -70,6 +71,9 @@ IPAACA_EXPORT void IU::_modify_links(bool is_delta, const LinkMap& new_links, co if (_committed) { _revision_lock.unlock(); throw IUCommittedError(); + } else if (_retracted) { + _revision_lock.unlock(); + throw IURetractedError(); } _increase_revision_number(); if (is_published()) { @@ -106,6 +110,9 @@ IPAACA_EXPORT void IU::_modify_payload(bool is_delta, const std::map<std::string if (_committed) { _revision_lock.unlock(); throw IUCommittedError(); + } else if (_retracted) { + _revision_lock.unlock(); + throw IURetractedError(); } _increase_revision_number(); if (is_published()) { @@ -134,6 +141,9 @@ IPAACA_EXPORT void IU::_internal_commit(const std::string& writer_name) if (_committed) { _revision_lock.unlock(); throw IUCommittedError(); + } else if (_retracted) { + _revision_lock.unlock(); + throw IURetractedError(); } _increase_revision_number(); _committed = true; @@ -198,8 +208,9 @@ IPAACA_EXPORT void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new { if (_committed) { throw IUCommittedError(); - } - if (_read_only) { + } else if (_retracted) { + throw IURetractedError(); + } else if (_read_only) { throw IUReadOnlyError(); } RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); @@ -222,8 +233,9 @@ IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<s //std::cout << "-- Sending a modify_payload with " << new_items.size() << " keys to merge." << std::endl; if (_committed) { throw IUCommittedError(); - } - if (_read_only) { + } else if (_retracted) { + throw IURetractedError(); + } else if (_read_only) { throw IUReadOnlyError(); } RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); @@ -247,6 +259,8 @@ IPAACA_EXPORT void RemotePushIU::commit() { if (_read_only) { throw IUReadOnlyError(); + } else if (_retracted) { + throw IURetractedError(); } if (_committed) { // Following python version: ignoring multiple commit -- GitLab