From c22247bd4a4a1c47325942885c4a617bc7360355 Mon Sep 17 00:00:00 2001
From: Ramin Yaghoubzadeh <ryaghoub@techfak.uni-bielefeld.de>
Date: Tue, 10 Apr 2012 17:00:04 +0200
Subject: [PATCH] More work on C++ version

---
 cpp/src/.gitignore          |   3 +
 cpp/src/Makefile            |  18 +-
 cpp/src/ipaaca-test-main.cc |  30 ++--
 cpp/src/ipaaca.cc           | 324 ++++++++++++++++++++++++++++++++----
 cpp/src/ipaaca.h            | 291 +++++++++++++++++++++++++++-----
 5 files changed, 578 insertions(+), 88 deletions(-)
 create mode 100644 cpp/src/.gitignore

diff --git a/cpp/src/.gitignore b/cpp/src/.gitignore
new file mode 100644
index 0000000..05c3e63
--- /dev/null
+++ b/cpp/src/.gitignore
@@ -0,0 +1,3 @@
+ipaaca.pb.cc
+ipaaca.pb.h
+
diff --git a/cpp/src/Makefile b/cpp/src/Makefile
index cb0a88d..ea51976 100644
--- a/cpp/src/Makefile
+++ b/cpp/src/Makefile
@@ -4,10 +4,20 @@ CCFLAGS=-I. -I/usr/local/include -I/opt/local/include ${CONFIG}
 BOOSTLIBS = -L/opt/local/lib -lboost_regex-mt -lboost_date_time-mt -lboost_program_options-mt -lboost_thread-mt -lboost_filesystem-mt -lboost_signals-mt -lboost_system-mt
 PROTOLIBS = -L/opt/local/lib -lprotobuf
 LIBS = ${BOOSTLIBS} ${PROTOLIBS} -L/usr/local/lib -lrsc -lrsbcore
-all: protoc
-	#g++ ${CCFLAGS} -o ipaaca-test ${SOURCES} ${LIBS}
-	g++ ${CCFLAGS} -DMAKE_RECEIVER -o ipaaca-receiver ${SOURCES} ${LIBS}
-	g++ ${CCFLAGS} -DMAKE_SENDER -o ipaaca-sender ${SOURCES} ${LIBS}
+
+COMPILER = gfilt
+
+all: main
+	true
+
+receiver:
+	${COMPILER} ${CCFLAGS} -DMAKE_RECEIVER -o ipaaca-receiver ${SOURCES} ${LIBS}
+
+sender:
+	${COMPILER} ${CCFLAGS} -DMAKE_SENDER -o ipaaca-sender ${SOURCES} ${LIBS}
+
+main:
+	${COMPILER} ${CCFLAGS} -o ipaaca-main ${SOURCES} ${LIBS}
 
 protoc:
 	protoc --proto_path=../../proto ../../proto/ipaaca.proto --cpp_out=.
diff --git a/cpp/src/ipaaca-test-main.cc b/cpp/src/ipaaca-test-main.cc
index 41ddd23..dc2dede 100644
--- a/cpp/src/ipaaca-test-main.cc
+++ b/cpp/src/ipaaca-test-main.cc
@@ -75,7 +75,6 @@ int main() {
 	lup->links_to_remove["grin"].insert("1001");
 	informer->publish(ldata);
 	
-	
 	std::cout << "Done." << std::endl;
 	return EXIT_SUCCESS;
 }
@@ -88,22 +87,29 @@ int main() {
 using namespace ipaaca;
 
 int main() {
-	//try{
+	try{
 		initialize_ipaaca_rsb();
+		
+		
+		OutputBuffer ob;
+		
 		IU::ref iu = IU::create();
-		std::cout << "payload.get(\"TEST\") = \"" << iu->payload.get("TEST") << "\"" << std::endl;
-		std::cout << "payload[\"TEST\"] = \"" << (std::string) iu->payload["TEST"] << "\"" << std::endl;
-		iu->payload["TEST"] = "123.5-WAS-SET";
-		std::cout << "payload[\"TEST\"] = \"" << (std::string) iu->payload["TEST"] << "\"" << std::endl;
+		ob.add(iu);
+		
+		std::cout << "_payload.get(\"TEST\") = \"" << iu->_payload.get("TEST") << "\"" << std::endl;
+		std::cout << "_payload[\"TEST\"] = \"" << (std::string) iu->_payload["TEST"] << "\"" << std::endl;
+		iu->_payload["TEST"] = "123.5-WAS-SET";
+		std::cout << "_payload[\"TEST\"] = \"" << (std::string) iu->_payload["TEST"] << "\"" << std::endl;
 		
-		std::string s = "The string \"" + iu->payload["TEST"].to_str() + "\" is the new value.";
+		std::string s = "The string \"" + iu->_payload["TEST"].to_str() + "\" is the new value.";
 		std::cout << "Concatenation test: " << s << std::endl;
 	
-		std::cout << "Interpreted as  long  value: " << iu->payload["TEST"].to_int() << std::endl;
-		std::cout << "Interpreted as double value: " << iu->payload["TEST"].to_float() << std::endl;
-	//} catch (std::exception& e) {
-	//	std::cout << e.what() << std::endl;
-	//}
+		std::cout << "Interpreted as  long  value: " << iu->_payload["TEST"].to_int() << std::endl;
+		std::cout << "Interpreted as double value: " << iu->_payload["TEST"].to_float() << std::endl;
+		iu->commit();
+	} catch (ipaaca::Exception& e) {
+		std::cout << "== IPAACA EXCEPTION == " << e.what() << std::endl;
+	}
 }
 
 #endif
diff --git a/cpp/src/ipaaca.cc b/cpp/src/ipaaca.cc
index 243e0fa..f4525af 100644
--- a/cpp/src/ipaaca.cc
+++ b/cpp/src/ipaaca.cc
@@ -2,9 +2,20 @@
 #include <cstdlib>
 
 namespace ipaaca {
-/*
-*/
 
+// util and init//{{{
+std::string generate_uuid_string()
+{
+	uuid_t uuidt;
+	uuid_string_t uuidstr;
+	uuid_generate(uuidt);
+	uuid_unparse_lower(uuidt, uuidstr);
+	return uuidstr;
+}
+
+//const LinkSet EMPTY_LINK_SET = LinkSet();
+//const std::set<std::string> EMPTY_LINK_SET();
+       
 void initialize_ipaaca_rsb()
 {
 	ParticipantConfig config = ParticipantConfig::fromConfiguration();
@@ -17,8 +28,19 @@ void initialize_ipaaca_rsb()
 	
 	//IPAACA_TODO("initialize all converters")
 }
+/*
+void init_inprocess_too() {
+	//ParticipantConfig config = Factory::getInstance().getDefaultParticipantConfig();
+	ParticipantConfig config = ParticipantConfig::fromFile("rsb.cfg");
+	//ParticipantConfig::Transport inprocess = config.getTransport("inprocess");
+	//inprocess.setEnabled(true);
+	//config.addTransport(inprocess);
+	Factory::getInstance().setDefaultParticipantConfig(config);
+}
+*/
+//}}}
 
-std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)
+std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)//{{{
 {
 	os << "PayloadUpdate(uid=" << obj.uid << ", revision=" << obj.revision;
 	os << ", writer_name=" << obj.writer_name << ", is_delta=" << (obj.is_delta?"True":"False");
@@ -37,8 +59,8 @@ std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)
 	os << "])";
 	return os;
 }
-
-std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj)
+//}}}
+std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj)//{{{
 {
 	os << "LinkUpdate(uid=" << obj.uid << ", revision=" << obj.revision;
 	os << ", writer_name=" << obj.writer_name << ", is_delta=" << (obj.is_delta?"True":"False");
@@ -69,26 +91,271 @@ std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj)
 	os << "})";
 	return os;
 }
+//}}}
 
+// SmartLinkMap//{{{
+void SmartLinkMap::_add_and_remove_links(const LinkMap& add, const LinkMap& remove)
+{
+	// remove specified links
+	for (LinkMap::const_iterator it = remove.begin(); it != remove.end(); ++it ) {
+		// if link type exists
+		if (_links.count(it->first) > 0) {
+			// remove one by one
+			for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
+				_links[it->first].erase(*it2);
+			}
+			// wipe the type key if no more links are left
+			if (_links[it->first].size() == 0) {
+				_links.erase(it->first);
+			}
+		}
+	}
+	// add specified links
+	for (LinkMap::const_iterator it = add.begin(); it != add.end(); ++it ) {
+		for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
+			_links[it->first].insert(*it2);
+		}
+	}
+}
+void SmartLinkMap::_replace_links(const LinkMap& links)
+{
+	//_links.clear();
+	_links=links;
+}
+//}}}
 
-/*
-void init_inprocess_too() {
-	//ParticipantConfig config = Factory::getInstance().getDefaultParticipantConfig();
-	ParticipantConfig config = ParticipantConfig::fromFile("rsb.cfg");
-	//ParticipantConfig::Transport inprocess = config.getTransport("inprocess");
-	//inprocess.setEnabled(true);
-	//config.addTransport(inprocess);
-	Factory::getInstance().setDefaultParticipantConfig(config);
+
+
+// OutputBuffer//{{{
+void OutputBuffer::_send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
+{
+	IPAACA_IMPLEMENT_ME
+}
+void OutputBuffer::_send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
+{
+	IPAACA_IMPLEMENT_ME
 }
+void OutputBuffer::_send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name)
+{
+	IPAACA_IMPLEMENT_ME
+}
+void OutputBuffer::add(IU::ref iu)
+{
+	IPAACA_IMPLEMENT_ME
+	// TODO place in iu store 
+	iu->_set_buffer(this); //shared_from_this());
+	// TODO
+}
+
+/*
+	def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"):
+		'''Send an IU link update.
+		
+		Keyword arguments:
+		iu -- the IU being updated
+		is_delta -- whether this is an incremental update or a replacement
+			the whole link dictionary
+		revision -- the new revision number
+		new_links -- a dictionary of new link sets
+		links_to_remove -- a dict of the link sets that shall be removed
+		writer_name -- name of the Buffer that initiated this update, necessary
+			to enable remote components to filter out updates that originate d
+			from their own operations
+		'''
+		if new_links is None:
+			new_links = {}
+		if links_to_remove is None:
+			links_to_remove = {}
+		link_update = IULinkUpdate(iu._uid, is_delta=is_delta, revision=revision)
+		link_update.new_links = new_links
+		if is_delta:
+			link_update.links_to_remove = links_to_remove
+		link_update.writer_name = writer_name
+		informer = self._get_informer(iu._category)
+		informer.publishData(link_update)
+		# FIXME send the notification to the target, if the target is not the writer_name
+*/
+/*
+	def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
+		'''Send an IU payload update.
+		
+		Keyword arguments:
+		iu -- the IU being updated
+		is_delta -- whether this is an incremental update or a replacement
+		revision -- the new revision number
+		new_items -- a dictionary of new payload items
+		keys_to_remove -- a list of the keys that shall be removed from the
+		 	payload
+		writer_name -- name of the Buffer that initiated this update, necessary
+			to enable remote components to filter out updates that originate d
+			from their own operations
+		'''
+		if new_items is None:
+			new_items = {}
+		if keys_to_remove is None:
+			keys_to_remove = []
+		payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision)
+		payload_update.new_items = new_items
+		if is_delta:
+			payload_update.keys_to_remove = keys_to_remove
+		payload_update.writer_name = writer_name
+		informer = self._get_informer(iu._category)
+		informer.publishData(payload_update)
 */
+//}}}
+
+
+
+
+// IUInterface//{{{
+
+IUInterface::IUInterface()
+: _buffer(NULL), _committed(false)
+{
+}
+
+void IUInterface::_set_uid(const std::string& uid) {
+	if (_uid != "") {
+		throw IUAlreadyHasAnUIDError();
+	}
+	_uid = uid;
+}
+
+void IUInterface::_set_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buffer) {
+	if (_buffer) {
+		throw IUAlreadyInABufferError();
+	}
+	_buffer = buffer;
+}
+
+void IUInterface::_set_owner_name(const std::string& owner_name) {
+	if (_owner_name != "") {
+		throw IUAlreadyHasAnOwnerNameError();
+	}
+	_owner_name = owner_name;
+}
 
-IU::ref IU::create(/* params */)
+void IUInterface::add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
+{
+	LinkMap none;
+	LinkMap add;
+	add[type] = targets;
+	_modify_links(true, add, none, writer_name);
+	_add_and_remove_links(add, none);
+}
+
+void IUInterface::remove_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
+{
+	LinkMap none;
+	LinkMap remove;
+	remove[type] = targets;
+	_modify_links(true, none, remove, writer_name);
+	_add_and_remove_links(none, remove);
+}
+
+void IUInterface::modify_links(const LinkMap& add, const LinkMap& remove, const std::string& writer_name)
+{
+	_modify_links(true, add, remove, writer_name);
+	_add_and_remove_links(add, remove);
+}
+
+void IUInterface::set_links(const LinkMap& links, const std::string& writer_name)
+{
+	LinkMap none;
+	_modify_links(false, links, none, writer_name);
+	_replace_links(links);
+}
+
+//}}}
+
+// IU//{{{
+IU::ref IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
 {
 	IU::ref iu = IU::ref(new IU(/* params */));
-	iu->payload.initialize(iu);
+	iu->_payload.initialize(iu);
 	return iu;
 }
 
+IU::IU(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
+{
+	_revision = 1;
+	_uid = ipaaca::generate_uuid_string();
+	_category = category;
+	_payload_type = payload_type;
+	// payload initialization deferred to IU::create(), above
+	_read_only = read_only;
+	_access_mode = access_mode;
+	_committed = false;
+}
+
+void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
+{
+	_revision_lock.lock();
+	if (_committed) {
+		_revision_lock.unlock();
+		throw IUCommittedError();
+	}
+	_increase_revision_number();
+	if (is_published()) {
+		_buffer->_send_iu_link_update(this, is_delta, _revision, new_links, links_to_remove, writer_name);
+	}
+	_revision_lock.unlock();
+}
+void IU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
+{
+	_revision_lock.lock();
+	if (_committed) {
+		_revision_lock.unlock();
+		throw IUCommittedError();
+	}
+	_increase_revision_number();
+	if (is_published()) {
+		_buffer->_send_iu_payload_update(this, is_delta, _revision, new_items, keys_to_remove, writer_name);
+	}
+	_revision_lock.unlock();
+}
+
+void IU::commit()
+{
+	_internal_commit();
+}
+
+void IU::_internal_commit(const std::string& writer_name)
+{
+	_revision_lock.lock();
+	if (_committed) {
+		_revision_lock.unlock();
+		throw IUCommittedError();
+	}
+	_increase_revision_number();
+	_committed = true;
+	if (is_published()) {
+		_buffer->_send_iu_commission(this, _revision, writer_name);
+	}
+	_revision_lock.unlock();
+}
+//}}}
+
+// RemotePushIU//{{{
+void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
+{
+	IPAACA_IMPLEMENT_ME
+}
+void RemotePushIU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
+{
+	IPAACA_IMPLEMENT_ME
+}
+
+void RemotePushIU::commit()
+{
+	IPAACA_IMPLEMENT_ME
+}
+
+//}}}
+
+
+
+
 // PayloadEntryProxy//{{{
 
 PayloadEntryProxy::PayloadEntryProxy(Payload* payload, const std::string& key)
@@ -116,9 +383,6 @@ PayloadEntryProxy::operator double()
 
 // Payload//{{{
 
-Payload::Payload()
-{
-}
 void Payload::initialize(boost::shared_ptr<IUInterface> iu)
 {
 	_iu = iu;
@@ -131,11 +395,17 @@ PayloadEntryProxy Payload::operator[](const std::string& key)
 }
 
 inline void Payload::set(const std::string& k, const std::string& v) {
-	//self._iu._modify_payload(self, isdelta=true, newitm={k:v}, keystorm=[], writer_name=None );
+	std::map<std::string, std::string> _new;
+	std::vector<std::string> _remove;
+	_new[k]=v;
+	_iu->_modify_payload(true, _new, _remove, "" );
 	_store[k] = v;
 }
 inline void Payload::remove(const std::string& k) {
-	//self._iu._modify_payload(self, isdelta=true, newitm={}, keystorm=[k], writer_name=None );
+	std::map<std::string, std::string> _new;
+	std::vector<std::string> _remove;
+	_remove.push_back(k);
+	_iu->_modify_payload(true, _new, _remove, "" );
 	_store.erase(k);
 }
 inline std::string Payload::get(const std::string& k) {
@@ -145,7 +415,6 @@ inline std::string Payload::get(const std::string& k) {
 //}}}
 
 /*
-
 // IUConverter//{{{
 
 IUConverter::IUConverter()
@@ -199,10 +468,8 @@ AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std:
 }
 
 //}}}
-
 */
 
-
 // IUPayloadUpdateConverter//{{{
 
 IUPayloadUpdateConverter::IUPayloadUpdateConverter()
@@ -320,17 +587,6 @@ AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema,
 //}}}
 
 
-
-
-
-
-
-
-
-
-
-
-
 } // of namespace ipaaca
 
 
diff --git a/cpp/src/ipaaca.h b/cpp/src/ipaaca.h
index b585605..ec55337 100644
--- a/cpp/src/ipaaca.h
+++ b/cpp/src/ipaaca.h
@@ -3,7 +3,7 @@
 
 #ifdef IPAACA_DEBUG_MESSAGES
 #define IPAACA_INFO(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- " << i << std::endl;
-#define IPAACA_IMPLEMENT_ME(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- IMPLEMENT ME" << std::endl;
+#define IPAACA_IMPLEMENT_ME std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- IMPLEMENT ME" << std::endl;
 #define IPAACA_TODO(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- TODO: " << i << std::endl;
 #else
 #define IPAACA_INFO(i) ;
@@ -11,6 +11,9 @@
 #define IPAACA_TODO(i) ;
 #endif
 
+// just for marking pure virtual functions for readability
+#define _IPAACA_ABSTRACT_
+
 #define IPAACA_PAYLOAD_DEFAULT_STRING_VALUE ""
 
 #include <iostream>
@@ -30,12 +33,17 @@
 
 #include <ipaaca.pb.h>
 
+#include <pthread.h>
+#include <uuid/uuid.h>
+
 //using namespace boost;
 using namespace rsb;
 using namespace rsb::converter;
 
 namespace ipaaca {
 
+typedef uint32_t revision_t;
+
 enum IUEventType {
 	ADDED,
 	COMMITTED,
@@ -51,23 +59,113 @@ enum IUAccessMode {
 	MESSAGE
 };
 
+//class {
+//public:
+//    template<typename T>
+//    operator shared_ptr<T>() { return shared_ptr<T>(); }
+//} NullPointer;
+
+class PayloadEntryProxy;
 class Payload;
 class IUInterface;
 class IU;
 class RemotePushIU;
-// class IULinkUpdate
-// class IULinkUpdateConverter;
+class IULinkUpdate;
+class IULinkUpdateConverter;
+class IUPayloadUpdate;
+class IUPayloadUpdateConverter;
 class IUStore;
 class FrozenIUStore;
 class IUEventHandler;
+class Buffer;
+class InputBuffer;
+class OutputBuffer;
+
+std::string generate_uuid_string();
 
-class Buffer {
+class Lock
+{
+	protected:
+		pthread_mutexattr_t _attrs;
+		pthread_mutex_t _mutex;
+	public:
+		inline Lock() {
+			pthread_mutexattr_init(&_attrs);
+			pthread_mutexattr_settype(&_attrs, PTHREAD_MUTEX_RECURSIVE);
+			pthread_mutex_init(&_mutex, &_attrs);
+		}
+		inline ~Lock() {
+			pthread_mutex_destroy(&_mutex);
+			pthread_mutexattr_destroy(&_attrs);
+		}
+		inline void lock() {
+			pthread_mutex_lock(&_mutex);
+		}
+		inline void unlock() {
+			pthread_mutex_unlock(&_mutex);
+		}
 };
 
-class InputBuffer: public Buffer {
+typedef std::set<std::string> LinkSet;
+typedef std::map<std::string, LinkSet> LinkMap;
+class SmartLinkMap {
+	public:
+		const LinkSet& get_links(const std::string& key);
+		const LinkMap& get_all_links();
+	
+	friend class IUInterface;
+	protected:
+		LinkMap _links;
+		void _add_and_remove_links(const LinkMap& add, const LinkMap& remove);
+		void _replace_links(const LinkMap& links);
 };
 
-class OutputBuffer: public Buffer {
+const LinkSet EMPTY_LINK_SET;
+//const std::set<std::string> EMPTY_LINK_SET;
+
+class Buffer { //: public boost::enable_shared_from_this<Buffer> {
+	friend class IU;
+	friend class RemotePushIU;
+	protected:
+		_IPAACA_ABSTRACT_ virtual void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") = 0;
+		_IPAACA_ABSTRACT_ virtual void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef") = 0;
+		_IPAACA_ABSTRACT_ virtual void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") = 0;
+	public:
+		_IPAACA_ABSTRACT_ virtual void add(boost::shared_ptr<IU> iu) = 0;
+};
+
+class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<InputBuffer>  {
+	friend class IU;
+	friend class RemotePushIU;
+	protected:
+		inline void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef")
+		{
+			IPAACA_INFO("(ERROR) InputBuffer::_send_iu_link_update() should never be invoked")
+		}
+		inline void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef")
+		{
+			IPAACA_INFO("(ERROR) InputBuffer::_send_iu_payload_update() should never be invoked")
+		}
+		inline void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef")
+		{
+			IPAACA_INFO("(ERROR) InputBuffer::_send_iu_commission() should never be invoked")
+		}
+	public:
+		inline void add(boost::shared_ptr<IU> iu)
+		{
+			IPAACA_IMPLEMENT_ME
+		}
+};
+
+class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<OutputBuffer>  {
+	friend class IU;
+	friend class RemotePushIU;
+	protected:
+		void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef");
+		void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef");
+		void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name);
+	public:
+		void add(boost::shared_ptr<IU> iu);
 };
 
 /*
@@ -80,44 +178,43 @@ class IUEventFunctionHandler: public rsb::EventFunctionHandler {
 };
 */
 
-class IUPayloadUpdate {
+class IUPayloadUpdate {//{{{
 	public:
 		std::string uid;
-		uint32_t revision;
+		revision_t revision;
 		std::string writer_name;
 		bool is_delta;
 		std::map<std::string, std::string> new_items;
 		std::vector<std::string> keys_to_remove;
 	friend std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj);
-};
-class IUPayloadUpdateConverter: public rsb::converter::Converter<std::string> {
+};//}}}
+class IUPayloadUpdateConverter: public rsb::converter::Converter<std::string> {//{{{
 	public:
 		IUPayloadUpdateConverter();
 		std::string serialize(const rsb::AnnotatedData& data, std::string& wire);
 		rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire);
-};
+};//}}}
 
-class IULinkUpdate {
+class IULinkUpdate {//{{{
 	public:
 		std::string uid;
-		uint32_t revision;
+		revision_t revision;
 		std::string writer_name;
 		bool is_delta;
 		std::map<std::string, std::set<std::string> > new_links;
 		std::map<std::string, std::set<std::string> > links_to_remove;
 	friend std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj);
-};
-class IULinkUpdateConverter: public rsb::converter::Converter<std::string> {
+};//}}}
+class IULinkUpdateConverter: public rsb::converter::Converter<std::string> {//{{{
 	public:
 		IULinkUpdateConverter();
 		std::string serialize(const rsb::AnnotatedData& data, std::string& wire);
 		rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire);
-};
-
+};//}}}
 
 void initialize_ipaaca_rsb();
 
-class PayloadEntryProxy
+class PayloadEntryProxy//{{{
 {
 	protected:
 		Payload* _payload;
@@ -131,9 +228,9 @@ class PayloadEntryProxy
 		inline std::string to_str() { return operator std::string(); }
 		inline long to_int() { return operator long(); }
 		inline double to_float() { return operator double(); }
-};
+};//}}}
 
-class Payload
+class Payload//{{{
 {
 	protected:
 		std::map<std::string, std::string> _store;
@@ -141,50 +238,168 @@ class Payload
 	protected:
 		friend class IU;
 		friend class RemotePushIU;
-		Payload();
 		void initialize(boost::shared_ptr<IUInterface> iu);
 	public:
 		PayloadEntryProxy operator[](const std::string& key);
 		void set(const std::string& k, const std::string& v);
 		void remove(const std::string& k);
 		std::string get(const std::string& k);
-};
+	typedef boost::shared_ptr<Payload> ref;
+};//}}}
 
-class IUInterface {
+class IUInterface {//{{{
+	protected:
+		IUInterface();
 	public:
 		inline virtual ~IUInterface() { }
-};
+	protected:
+		std::string _uid;
+		revision_t _revision;
+		std::string _category;
+		std::string _payload_type; // default is "MAP"
+		std::string _owner_name;
+		bool _committed;
+		IUAccessMode _access_mode;
+		bool _read_only;
+		//boost::shared_ptr<Buffer> _buffer;
+		Buffer* _buffer;
+		SmartLinkMap _links;
+	protected:
+		friend class Payload;
+		// Internal functions that perform the update logic,
+		//  e.g. sending a notification across the network
+		_IPAACA_ABSTRACT_ virtual void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) = 0;
+		_IPAACA_ABSTRACT_ virtual void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) = 0;
+		//void _set_buffer(boost::shared_ptr<Buffer> buffer);
+		void _set_buffer(Buffer* buffer);
+		void _set_uid(const std::string& uid);
+		void _set_owner_name(const std::string& owner_name);
+	protected:
+		// internal functions that do not emit update events
+		inline void _add_and_remove_links(const LinkMap& add, const LinkMap& remove) { _links._add_and_remove_links(add, remove); }
+		inline void _replace_links(const LinkMap& links) { _links._replace_links(links); }
+	public:
+		inline bool is_published() { return (_buffer != 0); }
+		inline const std::string& uid() { return _uid; }
+		inline revision_t revision() { return _revision; }
+		inline const std::string& category() { return _category; }
+		inline const std::string& payload_type() { return _payload_type; }
+		inline const std::string& owner_name() { return _owner_name; }
+		inline bool committed() { return _committed; }
+		inline IUAccessMode access_mode() { return _access_mode; }
+		inline bool read_only() { return _read_only; }
+		//inline boost::shared_ptr<Buffer> buffer() { return _buffer; }
+		inline Buffer* buffer() { return _buffer; }
+		inline const LinkSet& get_links(std::string type) { return _links.get_links(type); }
+		inline const LinkMap& get_all_links() { return _links.get_all_links(); }
+		// Payload
+		_IPAACA_ABSTRACT_ virtual Payload& payload() = 0;
+		// setters
+		_IPAACA_ABSTRACT_ virtual void commit() = 0;
+		// functions to modify and update links:
+		void add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name = "");
+		void remove_links(const std::string& type, const LinkSet& targets, const std::string& writer_name = "");
+		void modify_links(const LinkMap& add, const LinkMap& remove, const std::string& writer_name = "");
+		void set_links(const LinkMap& links, const std::string& writer_name = "");
+		//    (with cpp specific convenience functions:)
+		void add_link(const std::string& type, const std::string& target, const std::string& writer_name = "");
+		void remove_link(const std::string& type, const std::string& target, const std::string& writer_name = "");
+};//}}}
 
-class IU: public IUInterface {
+class IU: public IUInterface {//{{{
+	friend class Buffer;
+	friend class InputBuffer;
+	friend class OutputBuffer;
+	public:
+		Payload _payload;
+	protected:
+		Lock _revision_lock;
+	protected:
+		inline void _increase_revision_number() { _revision++; }
+		IU(const std::string& category="undef", IUAccessMode access_mode=PUSH, bool read_only=false, const std::string& payload_type="MAP" );
 	public:
-		Payload payload;
+		inline ~IU() {
+			IPAACA_IMPLEMENT_ME
+		}
+		static boost::shared_ptr<IU> create(const std::string& category="undef", IUAccessMode access_mode=PUSH, bool read_only=false, const std::string& payload_type="MAP" );
+		inline Payload& payload() { return _payload; }
+		void commit();
+	protected:
+		void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = "");
+		void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = "");
 	protected:
-		inline IU() { }
+		void _internal_commit(const std::string& writer_name = "");
 	public:
-		inline ~IU() { }
-		static boost::shared_ptr<IU> create();
 	typedef boost::shared_ptr<IU> ref;
-};
+};//}}}
 
-class RemotePushIU: public IUInterface {
+class RemotePushIU: public IUInterface {//{{{
+	friend class Buffer;
+	friend class InputBuffer;
+	friend class OutputBuffer;
+	protected:
+		//RemotePushIU();
 	public:
-		inline ~RemotePushIU() { }
-};
+		inline ~RemotePushIU() {
+			IPAACA_IMPLEMENT_ME
+		}
+		void commit();
+	protected:
+		void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = "");
+		void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = "");
+};//}}}
 
-class IUPublishedError: public std::exception
+class Exception: public std::exception//{{{
 {
 	protected:
 		std::string _description;
+		inline Exception(const std::string& description=""): _description(description) { }
+	public:
+		inline ~Exception() throw() { }
+		const char* what() const throw() {
+			return _description.c_str();
+		}
+};//}}}
+class IUPublishedError: public Exception//{{{
+{
 	public:
 		inline ~IUPublishedError() throw() { }
 		inline IUPublishedError() { //boost::shared_ptr<IU> iu) {
 			_description = "IUPublishedError";
 		}
-		const char* what() const throw() {
-			return _description.c_str();
+};//}}}
+class IUCommittedError: public Exception//{{{
+{
+	public:
+		inline ~IUCommittedError() throw() { }
+		inline IUCommittedError() { //boost::shared_ptr<IU> iu) {
+			_description = "IUCommittedError";
 		}
-};
-
+};//}}}
+class IUAlreadyInABufferError: public Exception//{{{
+{
+	public:
+		inline ~IUAlreadyInABufferError() throw() { }
+		inline IUAlreadyInABufferError() { //boost::shared_ptr<IU> iu) {
+			_description = "IUAlreadyInABufferError";
+		}
+};//}}}
+class IUAlreadyHasAnUIDError: public Exception//{{{
+{
+	public:
+		inline ~IUAlreadyHasAnUIDError() throw() { }
+		inline IUAlreadyHasAnUIDError() { //boost::shared_ptr<IU> iu) {
+			_description = "IUAlreadyHasAnUIDError";
+		}
+};//}}}
+class IUAlreadyHasAnOwnerNameError: public Exception//{{{
+{
+	public:
+		inline ~IUAlreadyHasAnOwnerNameError() throw() { }
+		inline IUAlreadyHasAnOwnerNameError() { //boost::shared_ptr<IU> iu) {
+			_description = "IUAlreadyHasAnOwnerNameError";
+		}
+};//}}}
 
 } // of namespace ipaaca
 
-- 
GitLab