diff --git a/ipaacalib/cpp/include/ipaaca/ipaaca-forwards.h b/ipaacalib/cpp/include/ipaaca/ipaaca-forwards.h index 98f3d4e96ddea6f4f79ae5235d0fdabc296abe1f..0d91d66fcb604b9bea353a8af46ba55b8eec49a3 100644 --- a/ipaacalib/cpp/include/ipaaca/ipaaca-forwards.h +++ b/ipaacalib/cpp/include/ipaaca/ipaaca-forwards.h @@ -55,6 +55,7 @@ class PayloadDocumentEntry; //class PayloadDocumentStore; +class PayloadBatchUpdateLock; class PayloadEntryProxy; class Payload; class PayloadIterator; diff --git a/ipaacalib/cpp/include/ipaaca/ipaaca-locking.h b/ipaacalib/cpp/include/ipaaca/ipaaca-locking.h index 5d74ac9d0ed2a8973ae0512be70efb6f7dd0b01b..c4519ed77176466eab03f7a03ebebcf4c573fbe0 100644 --- a/ipaacalib/cpp/include/ipaaca/ipaaca-locking.h +++ b/ipaacalib/cpp/include/ipaaca/ipaaca-locking.h @@ -65,10 +65,16 @@ IPAACA_HEADER_EXPORT class Lock } IPAACA_HEADER_EXPORT inline void lock() { _mutex.lock(); + on_lock(); } IPAACA_HEADER_EXPORT inline void unlock() { + on_unlock(); _mutex.unlock(); } + IPAACA_HEADER_EXPORT virtual inline void on_lock() { + } + IPAACA_HEADER_EXPORT virtual inline void on_unlock() { + } }; #else #include <pthread.h> @@ -89,10 +95,16 @@ IPAACA_HEADER_EXPORT class Lock } IPAACA_HEADER_EXPORT inline void lock() { pthread_mutex_lock(&_mutex); + on_lock(); } IPAACA_HEADER_EXPORT inline void unlock() { + on_unlock(); pthread_mutex_unlock(&_mutex); } + IPAACA_HEADER_EXPORT virtual inline void on_lock() { + } + IPAACA_HEADER_EXPORT virtual inline void on_unlock() { + } }; #endif diff --git a/ipaacalib/cpp/include/ipaaca/ipaaca-payload.h b/ipaacalib/cpp/include/ipaaca/ipaaca-payload.h index a741f372b7078957fbb9c1eb9c5d641f6850687f..dcf33e81d3d7ecebff94ff721e1e669b1ca0e17d 100644 --- a/ipaacalib/cpp/include/ipaaca/ipaaca-payload.h +++ b/ipaacalib/cpp/include/ipaaca/ipaaca-payload.h @@ -143,11 +143,28 @@ IPAACA_HEADER_EXPORT class PayloadDocumentEntry//{{{ typedef std::map<std::string, PayloadDocumentEntry::ptr> PayloadDocumentStore; + +#if 0 +/** \brief Lock to accumulate payload changes into one single update transaction + * + */ +IPAACA_HEADER_EXPORT class PayloadBatchUpdateLock: public ipaaca::Lock +{ + friend class Payload; + protected: + Payload* _payload; + public: + IPAACA_HEADER_EXPORT inline PayloadBatchUpdateLock(): Lock() { } + IPAACA_HEADER_EXPORT void on_lock() override; + IPAACA_HEADER_EXPORT void on_unlock() override; +}; +#endif + /** \brief Central class containing the user-set payload of any IUInterface class (IU, Message, RemotePushIU or RemoteMessage) * * Obtained by calling payload() on any IUInterface derived object. Created during IU creation. */ -IPAACA_HEADER_EXPORT class Payload//{{{ +IPAACA_HEADER_EXPORT class Payload: public Lock //{{{ { friend std::ostream& operator<<(std::ostream& os, const Payload& obj); friend class IUInterface; @@ -161,13 +178,28 @@ IPAACA_HEADER_EXPORT class Payload//{{{ friend class PayloadEntryProxy; friend class PayloadIterator; friend class FakeIU; + //friend class PayloadBatchUpdateLock; protected: IPAACA_MEMBER_VAR_EXPORT std::string _owner_name; //IPAACA_MEMBER_VAR_EXPORT rapidjson::Document _json_document; //IPAACA_MEMBER_VAR_EXPORT std::map<std::string, rapidjson::Document> _json_store; IPAACA_MEMBER_VAR_EXPORT PayloadDocumentStore _document_store; IPAACA_MEMBER_VAR_EXPORT boost::weak_ptr<IUInterface> _iu; + //IPAACA_MEMBER_VAR_EXPORT PayloadBatchUpdateLock _batch_update_lock; + // + IPAACA_MEMBER_VAR_EXPORT Lock _payload_operation_mode_lock; //< enforcing atomicity wrt the bool flag below + // + IPAACA_MEMBER_VAR_EXPORT bool _update_on_every_change; //< true: batch update not active; false: collecting updates (payload locked) + IPAACA_MEMBER_VAR_EXPORT std::map<std::string, PayloadDocumentEntry::ptr> _collected_modifications; + IPAACA_MEMBER_VAR_EXPORT std::vector<std::string> _collected_removals; + IPAACA_MEMBER_VAR_EXPORT std::string _batch_update_writer_name; + protected: + /// inherited from ipaaca::Lock, starting batch update collection mode + IPAACA_HEADER_EXPORT void on_lock() override; + /// inherited from ipaaca::Lock, finishing batch update collection mode + IPAACA_HEADER_EXPORT void on_unlock() override; protected: + //IPAACA_HEADER_EXPORT ipaaca::Locker&& batch_update() { return std::move(ipaaca::Locker(*this); } IPAACA_HEADER_EXPORT void initialize(boost::shared_ptr<IUInterface> iu); IPAACA_HEADER_EXPORT inline void _set_owner_name(const std::string& name) { _owner_name = name; } IPAACA_HEADER_EXPORT void _remotely_enforced_wipe(); @@ -178,7 +210,9 @@ IPAACA_HEADER_EXPORT class Payload//{{{ IPAACA_HEADER_EXPORT void _internal_merge(const std::map<std::string, PayloadDocumentEntry::ptr>& contents_to_merge, const std::string& writer_name=""); IPAACA_HEADER_EXPORT void _internal_set(const std::string& k, PayloadDocumentEntry::ptr v, const std::string& writer_name=""); IPAACA_HEADER_EXPORT void _internal_remove(const std::string& k, const std::string& writer_name=""); + IPAACA_HEADER_EXPORT void _internal_merge_and_remove(const std::map<std::string, PayloadDocumentEntry::ptr>& contents_to_merge, const std::vector<std::string>& keys_to_remove, const std::string& writer_name=""); public: + IPAACA_HEADER_EXPORT inline Payload(): _batch_update_writer_name(""), _update_on_every_change(true) { } IPAACA_HEADER_EXPORT inline const std::string& owner_name() { return _owner_name; } // access /// Obtain a payload item by name as a PayloadEntryProxy (returning null-type proxy if undefined) diff --git a/ipaacalib/cpp/src/ipaaca-buffers.cc b/ipaacalib/cpp/src/ipaaca-buffers.cc index 1c564b84f9fd21bd1ed0f1f0c6f449aae28a9c43..38520ad5acb06b4e488c9d4605866a12f01871fc 100644 --- a/ipaacalib/cpp/src/ipaaca-buffers.cc +++ b/ipaacalib/cpp/src/ipaaca-buffers.cc @@ -338,6 +338,7 @@ IPAACA_EXPORT void OutputBuffer::_send_iu_payload_update(IUInterface* iu, bool i { IUPayloadUpdate* pup = new ipaaca::IUPayloadUpdate(); Informer<ipaaca::IUPayloadUpdate>::DataPtr pdata(pup); + pup->payload_type = iu->payload_type(); pup->uid = iu->uid(); pup->is_delta = is_delta; pup->revision = revision; diff --git a/ipaacalib/cpp/src/ipaaca-internal.cc b/ipaacalib/cpp/src/ipaaca-internal.cc index b4e0c0f0e15174fddd2d85f2b481a1442544fc48..1ce8f5f467648919b4728b2ce2e694447c5ba54c 100644 --- a/ipaacalib/cpp/src/ipaaca-internal.cc +++ b/ipaacalib/cpp/src/ipaaca-internal.cc @@ -481,10 +481,15 @@ IPAACA_EXPORT std::string IUPayloadUpdateConverter::serialize(const AnnotatedDat // legacy mode item->set_value( json_value_cast<std::string>(kv.second->document)); item->set_type("STR"); + } else { + IPAACA_ERROR("Uninitialized payload update type!") + throw NotImplementedError(); } + IPAACA_DEBUG("Adding updated item (type " << item->type() << "): " << item->key() << " -> " << item->value() ) } for (auto& key: obj->keys_to_remove) { pbo->add_keys_to_remove(key); + IPAACA_DEBUG("Adding removed key: " << key) } pbo->SerializeToString(&wire); return getWireSchema(); diff --git a/ipaacalib/cpp/src/ipaaca-ius.cc b/ipaacalib/cpp/src/ipaaca-ius.cc index fa1efb4e92365dfb5c7e0910987a0e38a307d6eb..07caeb65b2ed42aa132b265f75113c3e8bb6a87a 100644 --- a/ipaacalib/cpp/src/ipaaca-ius.cc +++ b/ipaacalib/cpp/src/ipaaca-ius.cc @@ -109,8 +109,16 @@ IPAACA_EXPORT void IU::_modify_payload(bool is_delta, const std::map<std::string } _increase_revision_number(); if (is_published()) { - //std::cout << "Sending a payload update with " << new_items.size() << " entries to merge." << std::endl; + IPAACA_DEBUG("Sending a payload update, new entries:") + for (auto& kv: new_items) { + IPAACA_DEBUG(" " << kv.first << " -> " << kv.second) + } + IPAACA_DEBUG("and with removed keys:") + for (auto& k: keys_to_remove) { + IPAACA_DEBUG(" " << k) + } _buffer->_send_iu_payload_update(this, is_delta, _revision, new_items, keys_to_remove, writer_name); + IPAACA_DEBUG("... sent.") } _revision_lock.unlock(); } diff --git a/ipaacalib/cpp/src/ipaaca-json.cc b/ipaacalib/cpp/src/ipaaca-json.cc index fb0aa135185833983b9abf751f92beb434abbe10..dfdd83ee63adc0603c0f9340cd017d33bd14f9e7 100644 --- a/ipaacalib/cpp/src/ipaaca-json.cc +++ b/ipaacalib/cpp/src/ipaaca-json.cc @@ -39,6 +39,74 @@ using namespace rapidjson; using namespace std; +int batch_update_main(int argc, char** argv)//{{{ +{ + std::string json_source("[\n\ + \"old\",\n\ + [\n\ + \"str\",\n\ + null\n\ + ],\n\ + 3,\n\ + {\n\ + \"key1\": \"value1\",\n\ + \"key2\": \"value2\"\n\ + }\n\ +]"); + + ipaaca::OutputBuffer::ptr ob = ipaaca::OutputBuffer::create("myprog"); + std::cout << std::endl << "Setting up an IU with initial contents" << std::endl; + ipaaca::IU::ptr iu = ipaaca::IU::create("testcategory"); + iu->payload()["a"] = "OLD: initial contents of payload"; + iu->payload()["b"] = "OLD: initial value for b"; + std::cout << std::endl << "Initial contents of payload:" << std::endl; + for (auto it: iu->payload()) { + std::cout << " " << it.first << " -> " << it.second << std::endl; + } + + std::cout << std::endl << "Publishing IU (sniffer should receive one ADDED)" << std::endl; + ob->add(iu); + + std::cout << std::endl << "Batch-writing some stuff (sniffer should receive a single UPDATED)" << std::endl; + { + ipaaca::Locker locker(iu->payload()); + iu->payload().set(std::map<std::string, std::string>{{"b", "VALUE"}, {"bPrime", "VALUE"}}); + iu->payload()["a"] = std::map<std::string, long>{{"a", 1},{"b", 2},{"c", 3}}; + iu->payload()["remove_me"] = "WARNING: this should not be in the payload where an update is received!"; + iu->payload()["c"] = "WARNING: this should read abc123, not this warning message!"; + iu->payload()["d"] = 100; + iu->payload().remove("d"); + iu->payload()["d"] = 200; + iu->payload()["d"] = 300; + iu->payload().remove("d"); + iu->payload()["d"] = 400; + iu->payload()["e"] = "Note: a key 'd' should exist with value 400, and 'b' and 'bPrime' should be equal"; + iu->payload()["f"] = "12.5000"; + iu->payload()["g"] = std::vector<std::string>{"g1", "g2"}; + iu->payload().remove("remove_me"); + iu->payload()["c"] = "abc123"; + } + + std::cout << std::endl << "Adding another key 'XYZ' outside batch mode (sniffer -> UPDATED)" << std::endl; + iu->payload()["XYZ"] = "blabla"; + + std::cout << std::endl << "Final batch update, wiping most (sniffer should receive a third UPDATED, with 3 keys remaining in the payload)" << std::endl; + { + ipaaca::Locker locker(iu->payload()); + iu->payload()["SHOULD_NOT_EXIST"] = "WARNING: this key should never be visible"; + iu->payload().set(std::map<std::string, std::string>{{"A", "Final contents (3 entries)"}, {"B", "Final stuff (3 entries)"}}); + iu->payload()["C"] = std::vector<std::string>{"payload ", "should ", "have ", "three ", "entries, ", "A ", "B ", "and ", "C"}; + } + + std::cout << std::endl << "Final contents of payload:" << std::endl; + for (auto it: iu->payload()) { + std::cout << " " << it.first << " -> " << it.second << std::endl; + } + + return 0; +} +//}}} + int iterators_main(int argc, char** argv)//{{{ { std::string json_source("[\n\ @@ -423,7 +491,8 @@ int main(int argc, char** argv) ipaaca::CommandLineParser::ptr parser = ipaaca::CommandLineParser::create(); ipaaca::CommandLineOptions::ptr options = parser->parse(argc, argv); - return iterators_main(argc, argv); + return batch_update_main(argc, argv); + //return iterators_main(argc, argv); //return json_testbed_main(argc, argv); //return legacy_iu_main(argc, argv); //return fakeiu_main(argc, argv); diff --git a/ipaacalib/cpp/src/ipaaca-payload.cc b/ipaacalib/cpp/src/ipaaca-payload.cc index 02da62124b26456288b4ce75cbd4f9a41adc50a5..4047162a7630dc363318f2b4d62fc008e860fdc3 100644 --- a/ipaacalib/cpp/src/ipaaca-payload.cc +++ b/ipaacalib/cpp/src/ipaaca-payload.cc @@ -671,6 +671,24 @@ IPAACA_EXPORT template<> std::map<std::string, std::string> PayloadEntryProxy::g // Payload//{{{ +IPAACA_EXPORT void Payload::on_lock() +{ + Locker locker(_payload_operation_mode_lock); + IPAACA_DEBUG("Starting batch update mode ...") + _update_on_every_change = false; +} +IPAACA_EXPORT void Payload::on_unlock() +{ + Locker locker(_payload_operation_mode_lock); + IPAACA_DEBUG("... applying batch update with " << _collected_modifications.size() << " modifications and " << _collected_removals.size() << " removals ...") + _internal_merge_and_remove(_collected_modifications, _collected_removals, _batch_update_writer_name); + _update_on_every_change = true; + _batch_update_writer_name = ""; + _collected_modifications.clear(); + _collected_removals.clear(); + IPAACA_DEBUG("... exiting batch update mode.") +} + IPAACA_EXPORT void Payload::initialize(boost::shared_ptr<IUInterface> iu) { _iu = boost::weak_ptr<IUInterface>(iu); @@ -693,41 +711,105 @@ IPAACA_EXPORT Payload::operator std::map<std::string, std::string>() } IPAACA_EXPORT void Payload::_internal_set(const std::string& k, PayloadDocumentEntry::ptr v, const std::string& writer_name) { - std::map<std::string, PayloadDocumentEntry::ptr> _new; - std::vector<std::string> _remove; - _new[k] = v; - _iu.lock()->_modify_payload(true, _new, _remove, writer_name ); - IPAACA_DEBUG(" Setting local payload item \"" << k << "\" to " << v) - _document_store[k] = v; - mark_revision_change(); + Locker locker(_payload_operation_mode_lock); + if (_update_on_every_change) { + std::map<std::string, PayloadDocumentEntry::ptr> _new; + std::vector<std::string> _remove; + _new[k] = v; + _iu.lock()->_modify_payload(true, _new, _remove, writer_name ); + IPAACA_DEBUG(" Setting local payload item \"" << k << "\" to " << v) + _document_store[k] = v; + mark_revision_change(); + } else { + IPAACA_DEBUG("queueing a payload set operation") + _batch_update_writer_name = writer_name; + _collected_modifications[k] = v; + // revoke deletions of this updated key + std::vector<std::string> new_removals; + for (auto& rk: _collected_removals) { + if (rk!=k) new_removals.push_back(rk); + } + _collected_removals = new_removals; + } } IPAACA_EXPORT void Payload::_internal_remove(const std::string& k, const std::string& writer_name) { - std::map<std::string, PayloadDocumentEntry::ptr> _new; - std::vector<std::string> _remove; - _remove.push_back(k); - _iu.lock()->_modify_payload(true, _new, _remove, writer_name ); - _document_store.erase(k); - mark_revision_change(); + Locker locker(_payload_operation_mode_lock); + if (_update_on_every_change) { + std::map<std::string, PayloadDocumentEntry::ptr> _new; + std::vector<std::string> _remove; + _remove.push_back(k); + _iu.lock()->_modify_payload(true, _new, _remove, writer_name ); + _document_store.erase(k); + mark_revision_change(); + } else { + IPAACA_DEBUG("queueing a payload remove operation") + _batch_update_writer_name = writer_name; + _collected_removals.push_back(k); + // revoke updates of this deleted key + _collected_modifications.erase(k); + } } IPAACA_EXPORT void Payload::_internal_replace_all(const std::map<std::string, PayloadDocumentEntry::ptr>& new_contents, const std::string& writer_name) { - std::vector<std::string> _remove; - _iu.lock()->_modify_payload(false, new_contents, _remove, writer_name ); - _document_store = new_contents; - mark_revision_change(); + Locker locker(_payload_operation_mode_lock); + if (_update_on_every_change) { + std::vector<std::string> _remove; + _iu.lock()->_modify_payload(false, new_contents, _remove, writer_name ); + _document_store = new_contents; + mark_revision_change(); + } else { + IPAACA_DEBUG("queueing a payload replace_all operation") + _batch_update_writer_name = writer_name; + _collected_modifications.clear(); + for (auto& kv: new_contents) { + _collected_modifications[kv.first] = kv.second; + } + // take all existing keys and flag to remove them, unless overridden in current update + for (auto& kv: _document_store) { + if (! new_contents.count(kv.first)) { + _collected_removals.push_back(kv.first); + _collected_modifications.erase(kv.first); + } + } + } } IPAACA_EXPORT void Payload::_internal_merge(const std::map<std::string, PayloadDocumentEntry::ptr>& contents_to_merge, const std::string& writer_name) { - std::vector<std::string> _remove; - _iu.lock()->_modify_payload(true, contents_to_merge, _remove, writer_name ); + Locker locker(_payload_operation_mode_lock); + if (_update_on_every_change) { + std::vector<std::string> _remove; + _iu.lock()->_modify_payload(true, contents_to_merge, _remove, writer_name ); + for (auto& kv: contents_to_merge) { + _document_store[kv.first] = kv.second; + } + mark_revision_change(); + } else { + IPAACA_DEBUG("queueing a payload merge operation") + std::set<std::string> updated_keys; + _batch_update_writer_name = writer_name; + for (auto& kv: contents_to_merge) { + _collected_modifications[kv.first] = kv.second; + updated_keys.insert(kv.first); + } + // revoke deletions of updated keys + std::vector<std::string> new_removals; + for (auto& rk: _collected_removals) { + if (! updated_keys.count(rk)) new_removals.push_back(rk); + } + _collected_removals = new_removals; + } +} +IPAACA_EXPORT void Payload::_internal_merge_and_remove(const std::map<std::string, PayloadDocumentEntry::ptr>& contents_to_merge, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) +{ + // this function is called by exiting the batch update mode only, so no extra locking here + _iu.lock()->_modify_payload(true, contents_to_merge, keys_to_remove, writer_name ); + for (auto& k: keys_to_remove) { + _document_store.erase(k); + } for (auto& kv: contents_to_merge) { _document_store[kv.first] = kv.second; } mark_revision_change(); - //_document_store.insert(contents_to_merge.begin(), contents_to_merge.end()); - //for (std::map<std::string, std::string>::iterator it = contents_to_merge.begin(); it!=contents_to_merge.end(); i++) { - // _store[it->first] = it->second; - //} } IPAACA_EXPORT PayloadDocumentEntry::ptr Payload::get_entry(const std::string& k) { if (_document_store.count(k)>0) return _document_store[k];