From a15be382bf0429aec3f93d0d6a249cb11246a047 Mon Sep 17 00:00:00 2001
From: Ramin Yaghoubzadeh <ryaghoub@techfak.uni-bielefeld.de>
Date: Fri, 13 Apr 2012 02:18:01 +0200
Subject: [PATCH] [C++] Remote write access is nearing completion

---
 cpp/src/ipaaca-test-main.cc |  11 +-
 cpp/src/ipaaca.cc           | 225 ++++++++++++++++++++++++++++++------
 cpp/src/ipaaca.h            |  70 +++++++++--
 3 files changed, 264 insertions(+), 42 deletions(-)

diff --git a/cpp/src/ipaaca-test-main.cc b/cpp/src/ipaaca-test-main.cc
index c6acfcf..d2c38bb 100644
--- a/cpp/src/ipaaca-test-main.cc
+++ b/cpp/src/ipaaca-test-main.cc
@@ -15,6 +15,10 @@ using namespace ipaaca;
 void my_first_iu_handler(IUInterface::ptr iu, IUEventType type, bool local)
 {
 	std::cout << "" << iu_event_type_to_str(type) << "" << std::endl;
+	if (type == IU_LINKSUPDATED) {
+		std::cout << "  setting something in the remote payload" << std::endl;
+		iu->payload()["new_field"] = "remotely_set";
+	}
 }
 int main() {
 	try{
@@ -41,7 +45,7 @@ int main() {
 		OutputBuffer::ptr ob = OutputBuffer::create("Tester");
 		std::cout << "Buffer: " << ob->unique_name() << std::endl;
 		
-		IU::ref iu = IU::create("testcategory");
+		IU::ptr iu = IU::create("testcategory");
 		ob->add(iu);
 		
 		std::cout << "_payload.get(\"TEST\") = \"" << iu->_payload.get("TEST") << "\"" << std::endl;
@@ -52,10 +56,13 @@ int main() {
 		std::string s = "The string \"" + iu->_payload["TEST"].to_str() + "\" is the new value.";
 		std::cout << "Concatenation test: " << s << std::endl;
 		
-		iu->add_link("grin", "DUMMY_IU_UID");
+		iu->add_link("grin", "DUMMY_IU_UID_1234efef1234");
 		
 		std::cout << "Interpreted as  long  value: " << iu->_payload["TEST"].to_int() << std::endl;
 		std::cout << "Interpreted as double value: " << iu->_payload["TEST"].to_float() << std::endl;
+		
+		std::cout << "Committing and quitting in 1 sec" << std::endl;
+		sleep(1);
 		iu->commit();
 	} catch (ipaaca::Exception& e) {
 		std::cout << "== IPAACA EXCEPTION == " << e.what() << std::endl;
diff --git a/cpp/src/ipaaca.cc b/cpp/src/ipaaca.cc
index ffc0ddf..71765bd 100644
--- a/cpp/src/ipaaca.cc
+++ b/cpp/src/ipaaca.cc
@@ -211,7 +211,7 @@ void Buffer::_allocate_unique_name(const std::string& basename, const std::strin
 	std::string uuid = ipaaca::generate_uuid_string();
 	_basename = basename;
 	_uuid = uuid.substr(0,8);
-	_unique_name = basename + "ID" + _uuid + "/" + function;
+	_unique_name = "/ipaaca/component/" + _basename + "ID" + _uuid + "/" + function;
 }
 void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories)
 {
@@ -223,6 +223,100 @@ void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event
 	IUEventHandler::ptr handler = IUEventHandler::ptr(new IUEventHandler(function, event_mask, category));
 	_event_handlers.push_back(handler);
 }
+void Buffer::call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category)
+{
+	IPAACA_INFO("handling an event " << ipaaca::iu_event_type_to_str(event_type) << " for IU " << iu->uid())
+	//IUInterface::ptr iu = buffer->get(uid);
+	//if (iu) {
+		for (std::vector<IUEventHandler::ptr>::iterator it = _event_handlers.begin(); it != _event_handlers.end(); ++it) {
+			(*it)->call(this, iu, local, event_type, category);
+		}
+	//}
+}
+//}}}
+
+// Callbacks for OutputBuffer//{{{
+CallbackIUPayloadUpdate::CallbackIUPayloadUpdate(Buffer* buffer): _buffer(buffer) { }
+CallbackIULinkUpdate::CallbackIULinkUpdate(Buffer* buffer): _buffer(buffer) { }
+CallbackIUCommission::CallbackIUCommission(Buffer* buffer): _buffer(buffer) { }
+
+boost::shared_ptr<int> CallbackIUPayloadUpdate::call(const std::string& methodName, boost::shared_ptr<IUPayloadUpdate> update)
+{
+	IUInterface::ptr iui = _buffer->get(update->uid);
+	if (! iui) {
+		IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid)
+		return boost::shared_ptr<int>(new int(0));
+	}
+	IU::ptr iu = boost::static_pointer_cast<IU>(iui);
+	iu->_revision_lock.lock();
+	if ((update->revision != 0) && (update->revision != iu->_revision)) {
+		IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid)
+		iu->_revision_lock.unlock();
+		return boost::shared_ptr<int>(new int(0));
+	}
+	if (update->is_delta) {
+		for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) {
+			iu->payload()._internal_remove(*it, _buffer->unique_name());
+		}
+		for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) {
+			iu->payload()._internal_set(it->first, it->second, _buffer->unique_name());
+		}
+	} else {
+		iu->payload()._internal_replace_all(update->new_items, _buffer->unique_name());
+	}
+	_buffer->call_iu_event_handlers(iu, true, IU_UPDATED, iu->category());
+	revision_t revision = iu->revision();
+	iu->_revision_lock.unlock();
+	return boost::shared_ptr<int>(new int(revision));
+}
+
+boost::shared_ptr<int> CallbackIULinkUpdate::call(const std::string& methodName, boost::shared_ptr<IULinkUpdate> update)
+{
+	IPAACA_IMPLEMENT_ME
+	return boost::shared_ptr<int>(new int(0));
+	/*
+	'''Apply a remotely requested update to one of the stored IU's links.'''
+	if update.uid not in self._iu_store:
+		logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
+		return 0
+	iu = self._iu_store[update.uid]
+	with iu.revision_lock:
+		if not OMIT_REVISION_CHECKS and (update.revision != 0) and (update.revision != iu.revision):
+			# (0 means "do not pay attention to the revision number" -> "force update")
+			logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
+			return 0
+		if update.is_delta:
+			iu.modify_links(add=update.new_links, remove=update.links_to_remove, writer_name=update.writer_name)
+		else:
+			iu.set_links(links=update.new_links, writer_name=update.writer_name)
+		self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.LINKSUPDATED, category=iu.category)
+		return iu.revision
+	*/
+}
+boost::shared_ptr<int> CallbackIUCommission::call(const std::string& methodName, boost::shared_ptr<protobuf::IUCommission> update)
+{
+	IPAACA_IMPLEMENT_ME
+	return boost::shared_ptr<int>(new int(0));
+	/*
+	'''Apply a remotely requested commit to one of the stored IUs.'''
+	if iu_commission.uid not in self._iu_store:
+		logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
+		return 0
+	iu = self._iu_store[iu_commission.uid]
+	with iu.revision_lock:
+		if not OMIT_REVISION_CHECKS and (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
+			# (0 means "do not pay attention to the revision number" -> "force update")
+			logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
+			return 0
+		if iu.committed:
+			return 0
+		else:
+			iu._internal_commit(writer_name=iu_commission.writer_name)
+			self.call_iu_event_handlers(iu_commission.uid, local=True, event_type=IUEventType.COMMITTED, category=iu.category)
+			return iu.revision
+	*/
+}
+
 //}}}
 
 // OutputBuffer//{{{
@@ -232,6 +326,13 @@ OutputBuffer::OutputBuffer(const std::string& basename)
 {
 	_id_prefix = _basename + "-" + _uuid + "-IU-";
 }
+void OutputBuffer::_initialize_server()
+{
+	_server = Factory::getInstance().createServer( Scope( _unique_name ) );
+	_server->registerMethod("updatePayload", Server::CallbackPtr(new CallbackIUPayloadUpdate(this)));
+	_server->registerMethod("updateLinks", Server::CallbackPtr(new CallbackIULinkUpdate(this)));
+	_server->registerMethod("commit", Server::CallbackPtr(new CallbackIUCommission(this)));
+}
 OutputBuffer::ptr OutputBuffer::create(const std::string& basename)
 {
 	return OutputBuffer::ptr(new OutputBuffer(basename));
@@ -253,7 +354,8 @@ void OutputBuffer::_send_iu_link_update(IUInterface* iu, bool is_delta, revision
 	lup->is_delta = true;
 	lup->new_links = new_links;
 	if (is_delta) lup->links_to_remove = links_to_remove;
-	lup->writer_name = writer_name;
+	if (writer_name=="") lup->writer_name = _unique_name;
+	else lup->writer_name = writer_name;
 	Informer<AnyType>::Ptr informer = _get_informer(iu->category());
 	informer->publish(ldata);
 }
@@ -267,7 +369,8 @@ void OutputBuffer::_send_iu_payload_update(IUInterface* iu, bool is_delta, revis
 	pup->revision = revision;
 	pup->new_items = new_items;
 	if (is_delta) pup->keys_to_remove = keys_to_remove;
-	pup->writer_name = writer_name;
+	if (writer_name=="") pup->writer_name = _unique_name;
+	else pup->writer_name = writer_name;
 	Informer<AnyType>::Ptr informer = _get_informer(iu->category());
 	informer->publish(pdata);
 }
@@ -277,14 +380,15 @@ void OutputBuffer::_send_iu_commission(IUInterface* iu, revision_t revision, con
 	Informer<protobuf::IUCommission>::DataPtr data(new protobuf::IUCommission());
 	data->set_uid(iu->uid());
 	data->set_revision(revision);
-	data->set_writer_name(writer_name);
+	if (writer_name=="") data->set_writer_name(_unique_name);
+	else data->set_writer_name(writer_name);
+	
 	Informer<AnyType>::Ptr informer = _get_informer(iu->category());
 	informer->publish(data);
 }
 
-void OutputBuffer::add(IU::ref iu)
+void OutputBuffer::add(IU::ptr iu)
 {
-	//IPAACA_IMPLEMENT_ME
 	if (_iu_store.count(iu->uid()) > 0) {
 		throw IUPublishedError();
 	}
@@ -293,7 +397,7 @@ void OutputBuffer::add(IU::ref iu)
 	_publish_iu(iu);
 }
 
-void OutputBuffer::_publish_iu(IU::ref iu)
+void OutputBuffer::_publish_iu(IU::ptr iu)
 {
 	Informer<AnyType>::Ptr informer = _get_informer(iu->_category);
 	Informer<ipaaca::IU>::DataPtr iu_data(iu);
@@ -316,7 +420,7 @@ boost::shared_ptr<IU> OutputBuffer::remove(const std::string& iu_uid)
 {
 	IPAACA_IMPLEMENT_ME
 }
-boost::shared_ptr<IU> OutputBuffer::remove(IU::ref iu)
+boost::shared_ptr<IU> OutputBuffer::remove(IU::ptr iu)
 {
 	IPAACA_IMPLEMENT_ME
 }
@@ -443,10 +547,13 @@ IUInterface::ptr InputBuffer::get(const std::string& iu_uid)
 }
 
 
-RemoteServerPtr InputBuffer::_get_remote_server(boost::shared_ptr<IU> iu)
+RemoteServerPtr InputBuffer::_get_remote_server(const std::string& unique_server_name)
 {
-	IPAACA_IMPLEMENT_ME
-	return RemoteServerPtr();
+	std::map<std::string, RemoteServerPtr>::iterator it = _remote_server_store.find(unique_server_name);
+	if (it!=_remote_server_store.end()) return it->second;
+	RemoteServerPtr remote_server = Factory::getInstance().createRemoteServer(Scope(unique_server_name));
+	_remote_server_store[unique_server_name] = remote_server;
+	return remote_server;
 }
 
 ListenerPtr InputBuffer::_create_category_listener_if_needed(const std::string& category)
@@ -477,16 +584,6 @@ ListenerPtr InputBuffer::_create_category_listener_if_needed(const std::string&
 		return cat_listener
 	*/
 }
-void InputBuffer::call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category)
-{
-	IPAACA_INFO("handling an event " << ipaaca::iu_event_type_to_str(event_type) << " for IU " << iu->uid())
-	//IUInterface::ptr iu = buffer->get(uid);
-	//if (iu) {
-		for (std::vector<IUEventHandler::ptr>::iterator it = _event_handlers.begin(); it != _event_handlers.end(); ++it) {
-			(*it)->call(this, iu, local, event_type, category);
-		}
-	//}
-}
 void InputBuffer::_handle_iu_events(EventPtr event)
 {
 	std::string type = event->getType();
@@ -681,9 +778,9 @@ void IUInterface::set_links(const LinkMap& links, const std::string& writer_name
 //}}}
 
 // IU//{{{
-IU::ref IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
+IU::ptr IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
 {
-	IU::ref iu = IU::ref(new IU(category, access_mode, read_only, payload_type)); /* params */ //));
+	IU::ptr iu = IU::ptr(new IU(category, access_mode, read_only, payload_type)); /* params */ //));
 	iu->_payload.initialize(iu);
 	return iu;
 }
@@ -750,9 +847,9 @@ void IU::_internal_commit(const std::string& writer_name)
 
 // RemotePushIU//{{{
 
-RemotePushIU::ref RemotePushIU::create()
+RemotePushIU::ptr RemotePushIU::create()
 {
-	RemotePushIU::ref iu = RemotePushIU::ref(new RemotePushIU(/* params */));
+	RemotePushIU::ptr iu = RemotePushIU::ptr(new RemotePushIU(/* params */));
 	iu->_payload.initialize(iu);
 	return iu;
 }
@@ -762,17 +859,73 @@ RemotePushIU::RemotePushIU()
 }
 void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
 {
-	IPAACA_IMPLEMENT_ME
+	if (_committed) {
+		throw IUCommittedError();
+	}
+	if (_read_only) {
+		throw IUReadOnlyError();
+	}
+	RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
+	IULinkUpdate::ptr update = IULinkUpdate::ptr(new IULinkUpdate());
+	update->uid = _uid;
+	update->revision = _revision;
+	update->is_delta = is_delta;
+	update->writer_name = _buffer->unique_name();
+	update->new_links = new_links;
+	update->links_to_remove = links_to_remove;
+	boost::shared_ptr<int> result = server->call<int>("updateLinks", update, 1); // TODO 1 sec
+	if (*result == 0) {
+		throw IUUpdateFailedError();
+	} else {
+		_revision = *result;
+	}
 }
 void RemotePushIU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
 {
-	IPAACA_IMPLEMENT_ME
+	if (_committed) {
+		throw IUCommittedError();
+	}
+	if (_read_only) {
+		throw IUReadOnlyError();
+	}
+	RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
+	IUPayloadUpdate::ptr update = IUPayloadUpdate::ptr(new IUPayloadUpdate());
+	update->uid = _uid;
+	update->revision = _revision;
+	update->is_delta = is_delta;
+	update->writer_name = _buffer->unique_name();
+	update->new_items = new_items;
+	update->keys_to_remove = keys_to_remove;
+	boost::shared_ptr<int> result = server->call<int>("updatePayload", update, 1); // TODO 1 sec
+	if (*result == 0) {
+		throw IUUpdateFailedError();
+	} else {
+		_revision = *result;
+	}
 }
 
 void RemotePushIU::commit()
 {
-	IPAACA_IMPLEMENT_ME
+	if (_read_only) {
+		throw IUReadOnlyError();
+	}
+	if (_committed) {
+		// Following python version: ignoring multiple commit
+		return;
+	}
+	RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
+	boost::shared_ptr<protobuf::IUCommission> update = boost::shared_ptr<protobuf::IUCommission>(new protobuf::IUCommission());
+	update->set_uid(_uid);
+	update->set_revision(_revision);
+	update->set_writer_name(_buffer->unique_name());
+	boost::shared_ptr<int> result = server->call<int>("commit", update, 1); // TODO 1 sec
+	if (*result == 0) {
+		throw IUUpdateFailedError();
+	} else {
+		_revision = *result;
+	}
 }
+
 void RemotePushIU::_apply_link_update(IULinkUpdate::ptr update)
 {
 	_revision = update->revision;
@@ -859,20 +1012,26 @@ PayloadEntryProxy Payload::operator[](const std::string& key)
 	return PayloadEntryProxy(this, key);
 }
 
-inline void Payload::set(const std::string& k, const std::string& v) {
+inline void Payload::_internal_set(const std::string& k, const std::string& v, const std::string& writer_name) {
 	std::map<std::string, std::string> _new;
 	std::vector<std::string> _remove;
 	_new[k]=v;
-	_iu->_modify_payload(true, _new, _remove, "" );
+	_iu->_modify_payload(true, _new, _remove, writer_name );
 	_store[k] = v;
 }
-inline void Payload::remove(const std::string& k) {
+inline void Payload::_internal_remove(const std::string& k, const std::string& writer_name) {
 	std::map<std::string, std::string> _new;
 	std::vector<std::string> _remove;
 	_remove.push_back(k);
-	_iu->_modify_payload(true, _new, _remove, "" );
+	_iu->_modify_payload(true, _new, _remove, writer_name );
 	_store.erase(k);
 }
+void Payload::_internal_replace_all(const std::map<std::string, std::string>& new_contents, const std::string& writer_name)
+{
+	std::vector<std::string> _remove;
+	_iu->_modify_payload(false, new_contents, _remove, writer_name );
+	_store = new_contents;
+}
 inline std::string Payload::get(const std::string& k) {
 	if (_store.count(k)>0) return _store[k];
 	else return IPAACA_PAYLOAD_DEFAULT_STRING_VALUE;
@@ -931,7 +1090,7 @@ AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std:
 		case IU_ACCESS_PUSH:
 			{
 			// Create a "remote push IU"
-			boost::shared_ptr<RemotePushIU> obj(new RemotePushIU());
+			boost::shared_ptr<RemotePushIU> obj = RemotePushIU::create();
 			// transfer pbo data to obj
 			obj->_uid = pbo->uid();
 			obj->_revision = pbo->revision();
diff --git a/cpp/src/ipaaca.h b/cpp/src/ipaaca.h
index e49385d..360cc5e 100644
--- a/cpp/src/ipaaca.h
+++ b/cpp/src/ipaaca.h
@@ -3,10 +3,12 @@
 
 #ifdef IPAACA_DEBUG_MESSAGES
 #define IPAACA_INFO(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- " << i << std::endl;
+#define IPAACA_WARNING(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- WARNING: " << i << std::endl;
 #define IPAACA_IMPLEMENT_ME std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- IMPLEMENT ME" << std::endl;
 #define IPAACA_TODO(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- TODO: " << i << std::endl;
 #else
 #define IPAACA_INFO(i) ;
+#define IPAACA_WARNING(i) ;
 #define IPAACA_IMPLEMENT_ME(i) ;
 #define IPAACA_TODO(i) ;
 #endif
@@ -186,6 +188,9 @@ class IUEventHandler {
 class Buffer { //: public boost::enable_shared_from_this<Buffer> {//{{{
 	friend class IU;
 	friend class RemotePushIU;
+	friend class CallbackIUPayloadUpdate;
+	friend class CallbackIULinkUpdate;
+	friend class CallbackIUCommission;
 	protected:
 		std::string _uuid;
 		std::string _basename;
@@ -200,6 +205,7 @@ class Buffer { //: public boost::enable_shared_from_this<Buffer> {//{{{
 		inline Buffer(const std::string& basename, const std::string& function) {
 			_allocate_unique_name(basename, function);
 		}
+		void call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category);
 	public:
 		virtual inline ~Buffer() { }
 		inline const std::string& unique_name() { return _unique_name; }
@@ -210,13 +216,39 @@ class Buffer { //: public boost::enable_shared_from_this<Buffer> {//{{{
 };
 //}}}
 
+class CallbackIUPayloadUpdate: public Server::Callback<IUPayloadUpdate, int> {
+	protected:
+		Buffer* _buffer;
+	public:
+		CallbackIUPayloadUpdate(Buffer* buffer);
+		boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<IUPayloadUpdate> update);
+};
+class CallbackIULinkUpdate: public Server::Callback<IULinkUpdate, int> {
+	protected:
+		Buffer* _buffer;
+	public:
+		CallbackIULinkUpdate(Buffer* buffer);
+	public:
+		boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<IULinkUpdate> update);
+};
+class CallbackIUCommission: public Server::Callback<protobuf::IUCommission, int> {
+	protected:
+		Buffer* _buffer;
+	public:
+		CallbackIUCommission(Buffer* buffer);
+	public:
+		boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<protobuf::IUCommission> update);
+};
+
 class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<OutputBuffer>  {//{{{
 	friend class IU;
 	friend class RemotePushIU;
+	protected:
 	protected:
 		std::map<std::string, Informer<AnyType>::Ptr> _informer_store;
 		IUStore _iu_store;
 		Lock _iu_id_counter_lock;
+		ServerPtr _server;
 	protected:
 		// informing functions
 		void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef");
@@ -232,6 +264,7 @@ class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<Ou
 		Informer<AnyType>::Ptr _get_informer(const std::string& category);
 	protected:
 		OutputBuffer(const std::string& basename);
+		void _initialize_server();
 	public:
 		static boost::shared_ptr<OutputBuffer> create(const std::string& basename);
 		~OutputBuffer() {
@@ -250,6 +283,7 @@ class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<Inp
 	friend class RemotePushIU;
 	protected:
 		std::map<std::string, ListenerPtr> _listener_store;
+		std::map<std::string, RemoteServerPtr> _remote_server_store;
 		RemotePushIUStore _iu_store;  // TODO genericize
 	protected:
 		inline void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef")
@@ -265,10 +299,9 @@ class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<Inp
 			IPAACA_INFO("(ERROR) InputBuffer::_send_iu_commission() should never be invoked")
 		}
 	protected:
-		RemoteServerPtr _get_remote_server(boost::shared_ptr<IU> iu);
+		RemoteServerPtr _get_remote_server(const std::string& unique_server_name);
 		ListenerPtr _create_category_listener_if_needed(const std::string& category);
 		void _handle_iu_events(EventPtr event);
-		void call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category);
 	protected:
 		InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests);
 		InputBuffer(const std::string& basename, const std::string& category_interest1);
@@ -361,6 +394,7 @@ class Payload//{{{
 	friend class IU;
 	friend class RemotePushIU;
 	friend class IUConverter;
+	friend class CallbackIUPayloadUpdate;
 	protected:
 		std::string _owner_name;
 		std::map<std::string, std::string> _store;
@@ -371,14 +405,17 @@ class Payload//{{{
 		void _remotely_enforced_wipe();
 		void _remotely_enforced_delitem(const std::string& k);
 		void _remotely_enforced_setitem(const std::string& k, const std::string& v);
+		void _internal_replace_all(const std::map<std::string, std::string>& new_contents, const std::string& writer_name="");
+		void _internal_set(const std::string& k, const std::string& v, const std::string& writer_name="");
+		void _internal_remove(const std::string& k, const std::string& writer_name="");
 	public:
 		inline const std::string& owner_name() { return _owner_name; }
 		// access
 		PayloadEntryProxy operator[](const std::string& key);
-		void set(const std::string& k, const std::string& v);
-		void remove(const std::string& k);
+		inline void set(const std::string& k, const std::string& v) { _internal_set(k, v); }
+		inline void remove(const std::string& k) { _internal_remove(k); }
 		std::string get(const std::string& k);
-	typedef boost::shared_ptr<Payload> ref;
+	typedef boost::shared_ptr<Payload> ptr;
 };//}}}
 
 class IUInterface {//{{{
@@ -449,6 +486,9 @@ class IU: public IUInterface {//{{{
 	friend class Buffer;
 	friend class InputBuffer;
 	friend class OutputBuffer;
+	friend class CallbackIUPayloadUpdate;
+	friend class CallbackIULinkUpdate;
+	friend class CallbackIUCommission;
 	public:
 		Payload _payload;
 	protected:
@@ -470,7 +510,7 @@ class IU: public IUInterface {//{{{
 	protected:
 		void _internal_commit(const std::string& writer_name = "");
 	public:
-	typedef boost::shared_ptr<IU> ref;
+	typedef boost::shared_ptr<IU> ptr;
 };//}}}
 
 class RemotePushIU: public IUInterface {//{{{
@@ -497,7 +537,7 @@ class RemotePushIU: public IUInterface {//{{{
 		void _apply_update(IUPayloadUpdate::ptr update);
 		void _apply_link_update(IULinkUpdate::ptr update);
 		void _apply_commission();
-	typedef boost::shared_ptr<RemotePushIU> ref;
+	typedef boost::shared_ptr<RemotePushIU> ptr;
 };//}}}
 
 class Exception: public std::exception//{{{
@@ -527,6 +567,22 @@ class IUCommittedError: public Exception//{{{
 			_description = "IUCommittedError";
 		}
 };//}}}
+class IUUpdateFailedError: public Exception//{{{
+{
+	public:
+		inline ~IUUpdateFailedError() throw() { }
+		inline IUUpdateFailedError() { //boost::shared_ptr<IU> iu) {
+			_description = "IUUpdateFailedError";
+		}
+};//}}}
+class IUReadOnlyError: public Exception//{{{
+{
+	public:
+		inline ~IUReadOnlyError() throw() { }
+		inline IUReadOnlyError() { //boost::shared_ptr<IU> iu) {
+			_description = "IUReadOnlyError";
+		}
+};//}}}
 class IUAlreadyInABufferError: public Exception//{{{
 {
 	public:
-- 
GitLab