Skip to content
Snippets Groups Projects
Commit 730c865d authored by Ramin Yaghoubzadeh Torky's avatar Ramin Yaghoubzadeh Torky
Browse files

C++: retraction fixes / more implemetation

- OutputBuffer retracts all remaining live IUs on destruction
- sends correct event type now
- fixed reference error for calling handlers
- setters properly check for retracted flag
- also added some missing checks for committed flag
parent 7861c74e
No related branches found
No related tags found
No related merge requests found
...@@ -275,17 +275,18 @@ IPAACA_HEADER_EXPORT class OutputBuffer: public Buffer { //, public boost::enabl ...@@ -275,17 +275,18 @@ IPAACA_HEADER_EXPORT class OutputBuffer: public Buffer { //, public boost::enabl
// _remote_update_payload(IUPayloadUpdate) // _remote_update_payload(IUPayloadUpdate)
// _remote_commit(protobuf::IUCommission) // _remote_commit(protobuf::IUCommission)
IPAACA_HEADER_EXPORT void _publish_iu(boost::shared_ptr<IU> iu); IPAACA_HEADER_EXPORT void _publish_iu(boost::shared_ptr<IU> iu);
/// mark and send IU retraction on own IU (removal from buffer is in remove(IU))
IPAACA_HEADER_EXPORT void _retract_iu(boost::shared_ptr<IU> iu); IPAACA_HEADER_EXPORT void _retract_iu(boost::shared_ptr<IU> iu);
/// mark and send retraction for all unretracted IUs (without removal, used in ~OutputBuffer)
IPAACA_HEADER_EXPORT void _retract_all_internal();
protected: protected:
/// \b Note: constructor is protected. Use create() /// \b Note: constructor is protected. Use create()
IPAACA_HEADER_EXPORT OutputBuffer(const std::string& basename, const std::string& channel=""); // empty string auto-replaced with __ipaaca_static_option_default_channel IPAACA_HEADER_EXPORT OutputBuffer(const std::string& basename, const std::string& channel=""); // empty string auto-replaced with __ipaaca_static_option_default_channel
IPAACA_HEADER_EXPORT void _initialize_server(); IPAACA_HEADER_EXPORT void _initialize_server();
public: public:
IPAACA_HEADER_EXPORT static boost::shared_ptr<OutputBuffer> create(const std::string& basename); IPAACA_HEADER_EXPORT static boost::shared_ptr<OutputBuffer> create(const std::string& basename);
IPAACA_HEADER_EXPORT ~OutputBuffer() { /// OutputBuffer destructor will retract all IUs that are still live
IPAACA_IMPLEMENT_ME IPAACA_HEADER_EXPORT ~OutputBuffer();
}
IPAACA_HEADER_EXPORT void add(boost::shared_ptr<IU> iu); IPAACA_HEADER_EXPORT void add(boost::shared_ptr<IU> iu);
IPAACA_HEADER_EXPORT boost::shared_ptr<IU> remove(const std::string& iu_uid); IPAACA_HEADER_EXPORT boost::shared_ptr<IU> remove(const std::string& iu_uid);
IPAACA_HEADER_EXPORT boost::shared_ptr<IU> remove(boost::shared_ptr<IU> iu); IPAACA_HEADER_EXPORT boost::shared_ptr<IU> remove(boost::shared_ptr<IU> iu);
......
...@@ -152,6 +152,15 @@ IPAACA_HEADER_EXPORT class IUCommittedError: public Exception//{{{ ...@@ -152,6 +152,15 @@ IPAACA_HEADER_EXPORT class IUCommittedError: public Exception//{{{
_description = "IUCommittedError"; _description = "IUCommittedError";
} }
};//}}} };//}}}
/// IU had already been retracted
IPAACA_HEADER_EXPORT class IURetractedError: public Exception//{{{
{
public:
IPAACA_HEADER_EXPORT inline ~IURetractedError() throw() { }
IPAACA_HEADER_EXPORT inline IURetractedError() { //boost::shared_ptr<IU> iu) {
_description = "IURetractedError";
}
};//}}}
/// Remote IU update failed because it had been modified in the mean time /// Remote IU update failed because it had been modified in the mean time
IPAACA_HEADER_EXPORT class IUUpdateFailedError: public Exception//{{{ IPAACA_HEADER_EXPORT class IUUpdateFailedError: public Exception//{{{
{ {
......
...@@ -95,6 +95,8 @@ IPAACA_HEADER_EXPORT class IUInterface {//{{{ ...@@ -95,6 +95,8 @@ IPAACA_HEADER_EXPORT class IUInterface {//{{{
IPAACA_HEADER_EXPORT void _add_and_remove_links(const LinkMap& add, const LinkMap& remove) { _links._add_and_remove_links(add, remove); } IPAACA_HEADER_EXPORT void _add_and_remove_links(const LinkMap& add, const LinkMap& remove) { _links._add_and_remove_links(add, remove); }
IPAACA_HEADER_EXPORT void _replace_links(const LinkMap& links) { _links._replace_links(links); } IPAACA_HEADER_EXPORT void _replace_links(const LinkMap& links) { _links._replace_links(links); }
public: public:
/// Return whether IU has been retracted
IPAACA_HEADER_EXPORT inline bool retracted() const { return _retracted; }
/// Return whether IU has already been published (is in a Buffer). /// Return whether IU has already been published (is in a Buffer).
IPAACA_HEADER_EXPORT inline bool is_published() { return (_buffer != 0); } IPAACA_HEADER_EXPORT inline bool is_published() { return (_buffer != 0); }
/// Return auto-generated UID string (set during IU construction) /// Return auto-generated UID string (set during IU construction)
......
...@@ -184,6 +184,12 @@ IPAACA_EXPORT boost::shared_ptr<int> CallbackIUPayloadUpdate::call(const std::st ...@@ -184,6 +184,12 @@ IPAACA_EXPORT boost::shared_ptr<int> CallbackIUPayloadUpdate::call(const std::st
IPAACA_INFO(" Referred-to revision was " << update->revision << " while local one is " << iu->_revision) IPAACA_INFO(" Referred-to revision was " << update->revision << " while local one is " << iu->_revision)
iu->_revision_lock.unlock(); iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0)); return boost::shared_ptr<int>(new int(0));
} else if (iu->committed()) {
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0));
} else if (iu->retracted()) {
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0));
} }
if (update->is_delta) { if (update->is_delta) {
// FIXME FIXME this is an unsolved problem atm: deletes in a delta update are // FIXME FIXME this is an unsolved problem atm: deletes in a delta update are
...@@ -216,6 +222,12 @@ IPAACA_EXPORT boost::shared_ptr<int> CallbackIULinkUpdate::call(const std::strin ...@@ -216,6 +222,12 @@ IPAACA_EXPORT boost::shared_ptr<int> CallbackIULinkUpdate::call(const std::strin
IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid) IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid)
iu->_revision_lock.unlock(); iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0)); return boost::shared_ptr<int>(new int(0));
} else if (iu->committed()) {
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0));
} else if (iu->retracted()) {
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0));
} }
if (update->is_delta) { if (update->is_delta) {
iu->modify_links(update->new_links, update->links_to_remove, update->writer_name); iu->modify_links(update->new_links, update->links_to_remove, update->writer_name);
...@@ -240,8 +252,11 @@ IPAACA_EXPORT boost::shared_ptr<int> CallbackIUCommission::call(const std::strin ...@@ -240,8 +252,11 @@ IPAACA_EXPORT boost::shared_ptr<int> CallbackIUCommission::call(const std::strin
IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid()) IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid())
iu->_revision_lock.unlock(); iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0)); return boost::shared_ptr<int>(new int(0));
} } else if (iu->committed()) {
if (iu->committed()) { iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0));
} else if (iu->retracted()) {
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0)); return boost::shared_ptr<int>(new int(0));
} else { } else {
} }
...@@ -374,6 +389,8 @@ IPAACA_EXPORT void OutputBuffer::add(IU::ptr iu) ...@@ -374,6 +389,8 @@ IPAACA_EXPORT void OutputBuffer::add(IU::ptr iu)
} }
if (iu->is_published()) { if (iu->is_published()) {
throw IUPublishedError(); throw IUPublishedError();
} else if (iu->retracted()) {
throw IURetractedError();
} }
if (iu->access_mode() != IU_ACCESS_MESSAGE) { if (iu->access_mode() != IU_ACCESS_MESSAGE) {
// (for Message-type IUs: do not actually store them) // (for Message-type IUs: do not actually store them)
...@@ -433,6 +450,8 @@ IPAACA_EXPORT boost::shared_ptr<IU> OutputBuffer::remove(IU::ptr iu) ...@@ -433,6 +450,8 @@ IPAACA_EXPORT boost::shared_ptr<IU> OutputBuffer::remove(IU::ptr iu)
IPAACA_EXPORT void OutputBuffer::_retract_iu(IU::ptr iu) IPAACA_EXPORT void OutputBuffer::_retract_iu(IU::ptr iu)
{ {
if (iu->_retracted) return; // ignore subsequent retractions
iu->_retracted = true;
Informer<protobuf::IURetraction>::DataPtr data(new protobuf::IURetraction()); Informer<protobuf::IURetraction>::DataPtr data(new protobuf::IURetraction());
data->set_uid(iu->uid()); data->set_uid(iu->uid());
data->set_revision(iu->revision()); data->set_revision(iu->revision());
...@@ -440,6 +459,20 @@ IPAACA_EXPORT void OutputBuffer::_retract_iu(IU::ptr iu) ...@@ -440,6 +459,20 @@ IPAACA_EXPORT void OutputBuffer::_retract_iu(IU::ptr iu)
informer->publish(data); informer->publish(data);
} }
IPAACA_EXPORT void OutputBuffer::_retract_all_internal()
{
for (IUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) {
if (!(it->second->_retracted)) {
_retract_iu(it->second);
}
}
}
IPAACA_EXPORT OutputBuffer::~OutputBuffer()
{
_retract_all_internal();
}
//}}} //}}}
...@@ -740,10 +773,11 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) ...@@ -740,10 +773,11 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event)
// //
it->second->_revision = update->revision(); it->second->_revision = update->revision();
it->second->_apply_retraction(); it->second->_apply_retraction();
auto final_iu_ref = it->second;
// remove from InputBuffer FIXME: this is a crossover between retracted and deleted behavior // remove from InputBuffer FIXME: this is a crossover between retracted and deleted behavior
_iu_store.erase(it->first); _iu_store.erase(it->first);
// and call the handler. IU reference is still valid for this call, although removed from buffer. // and call the handler. IU reference is still valid for this call, although removed from buffer.
call_iu_event_handlers(it->second, false, IU_COMMITTED, it->second->category() ); call_iu_event_handlers(final_iu_ref, false, IU_RETRACTED, it->second->category() );
// //
} else { } else {
IPAACA_WARNING("(Unhandled Event type " << type << " !)"); IPAACA_WARNING("(Unhandled Event type " << type << " !)");
......
...@@ -62,6 +62,7 @@ IPAACA_EXPORT IU::IU(const std::string& category, IUAccessMode access_mode, bool ...@@ -62,6 +62,7 @@ IPAACA_EXPORT IU::IU(const std::string& category, IUAccessMode access_mode, bool
_read_only = read_only; _read_only = read_only;
_access_mode = access_mode; _access_mode = access_mode;
_committed = false; _committed = false;
_retracted = false;
} }
IPAACA_EXPORT void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) IPAACA_EXPORT void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
...@@ -70,6 +71,9 @@ IPAACA_EXPORT void IU::_modify_links(bool is_delta, const LinkMap& new_links, co ...@@ -70,6 +71,9 @@ IPAACA_EXPORT void IU::_modify_links(bool is_delta, const LinkMap& new_links, co
if (_committed) { if (_committed) {
_revision_lock.unlock(); _revision_lock.unlock();
throw IUCommittedError(); throw IUCommittedError();
} else if (_retracted) {
_revision_lock.unlock();
throw IURetractedError();
} }
_increase_revision_number(); _increase_revision_number();
if (is_published()) { if (is_published()) {
...@@ -106,6 +110,9 @@ IPAACA_EXPORT void IU::_modify_payload(bool is_delta, const std::map<std::string ...@@ -106,6 +110,9 @@ IPAACA_EXPORT void IU::_modify_payload(bool is_delta, const std::map<std::string
if (_committed) { if (_committed) {
_revision_lock.unlock(); _revision_lock.unlock();
throw IUCommittedError(); throw IUCommittedError();
} else if (_retracted) {
_revision_lock.unlock();
throw IURetractedError();
} }
_increase_revision_number(); _increase_revision_number();
if (is_published()) { if (is_published()) {
...@@ -134,6 +141,9 @@ IPAACA_EXPORT void IU::_internal_commit(const std::string& writer_name) ...@@ -134,6 +141,9 @@ IPAACA_EXPORT void IU::_internal_commit(const std::string& writer_name)
if (_committed) { if (_committed) {
_revision_lock.unlock(); _revision_lock.unlock();
throw IUCommittedError(); throw IUCommittedError();
} else if (_retracted) {
_revision_lock.unlock();
throw IURetractedError();
} }
_increase_revision_number(); _increase_revision_number();
_committed = true; _committed = true;
...@@ -198,8 +208,9 @@ IPAACA_EXPORT void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new ...@@ -198,8 +208,9 @@ IPAACA_EXPORT void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new
{ {
if (_committed) { if (_committed) {
throw IUCommittedError(); throw IUCommittedError();
} } else if (_retracted) {
if (_read_only) { throw IURetractedError();
} else if (_read_only) {
throw IUReadOnlyError(); throw IUReadOnlyError();
} }
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
...@@ -222,8 +233,9 @@ IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<s ...@@ -222,8 +233,9 @@ IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<s
//std::cout << "-- Sending a modify_payload with " << new_items.size() << " keys to merge." << std::endl; //std::cout << "-- Sending a modify_payload with " << new_items.size() << " keys to merge." << std::endl;
if (_committed) { if (_committed) {
throw IUCommittedError(); throw IUCommittedError();
} } else if (_retracted) {
if (_read_only) { throw IURetractedError();
} else if (_read_only) {
throw IUReadOnlyError(); throw IUReadOnlyError();
} }
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
...@@ -247,6 +259,8 @@ IPAACA_EXPORT void RemotePushIU::commit() ...@@ -247,6 +259,8 @@ IPAACA_EXPORT void RemotePushIU::commit()
{ {
if (_read_only) { if (_read_only) {
throw IUReadOnlyError(); throw IUReadOnlyError();
} else if (_retracted) {
throw IURetractedError();
} }
if (_committed) { if (_committed) {
// Following python version: ignoring multiple commit // Following python version: ignoring multiple commit
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment