#include <ipaaca.h>
#include <cstdlib>

namespace ipaaca {

// util and init//{{{
std::string generate_uuid_string()
{
	uuid_t uuidt;
	uuid_string_t uuidstr;
	uuid_generate(uuidt);
	uuid_unparse_lower(uuidt, uuidstr);
	return uuidstr;
}

//const LinkSet EMPTY_LINK_SET = LinkSet();
//const std::set<std::string> EMPTY_LINK_SET();
       
void initialize_ipaaca_rsb()
{
	ParticipantConfig config = ParticipantConfig::fromConfiguration();
	Factory::getInstance().setDefaultParticipantConfig(config);
	
	boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter());
	boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter());
	stringConverterRepository()->registerConverter(payload_update_converter);
	stringConverterRepository()->registerConverter(link_update_converter);
	
	//IPAACA_TODO("initialize all converters")
}
/*
void init_inprocess_too() {
	//ParticipantConfig config = Factory::getInstance().getDefaultParticipantConfig();
	ParticipantConfig config = ParticipantConfig::fromFile("rsb.cfg");
	//ParticipantConfig::Transport inprocess = config.getTransport("inprocess");
	//inprocess.setEnabled(true);
	//config.addTransport(inprocess);
	Factory::getInstance().setDefaultParticipantConfig(config);
}
*/
//}}}

/// Stream a human-readable dump of an IUPayloadUpdate.
///
/// Prints uid, revision, writer name and delta flag, followed by the new
/// items as 'key':'value' pairs and the list of keys to remove.
std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)//{{{
{
	typedef std::map<std::string, std::string>::const_iterator ItemIterator;
	typedef std::vector<std::string>::const_iterator KeyIterator;

	os << "PayloadUpdate(uid=" << obj.uid << ", revision=" << obj.revision
	   << ", writer_name=" << obj.writer_name
	   << ", is_delta=" << (obj.is_delta ? "True" : "False")
	   << ", new_items = {";
	for (ItemIterator item = obj.new_items.begin(); item != obj.new_items.end(); ++item) {
		// the separator precedes every element except the first
		if (item != obj.new_items.begin()) os << ", ";
		os << "'" << item->first << "':'" << item->second << "'";
	}
	os << "}, keys_to_remove = [";
	for (KeyIterator key = obj.keys_to_remove.begin(); key != obj.keys_to_remove.end(); ++key) {
		if (key != obj.keys_to_remove.begin()) os << ", ";
		os << "'" << *key << "'";
	}
	os << "])";
	return os;
}
//}}}
/// Write the entries of a link map as "'type': ['target', ...]" items
/// separated by ", " (the enclosing braces are written by the caller).
static void print_link_map_entries(std::ostream& os, const std::map<std::string, std::set<std::string> >& links)
{
	typedef std::map<std::string, std::set<std::string> >::const_iterator TypeIterator;
	typedef std::set<std::string>::const_iterator TargetIterator;

	for (TypeIterator entry = links.begin(); entry != links.end(); ++entry) {
		if (entry != links.begin()) os << ", ";
		os << "'" << entry->first << "': [";
		const std::set<std::string>& targets = entry->second;
		for (TargetIterator target = targets.begin(); target != targets.end(); ++target) {
			if (target != targets.begin()) os << ", ";
			os << "'" << *target << "'";
		}
		os << "]";
	}
}

/// Stream a human-readable dump of an IULinkUpdate.
///
/// Prints uid, revision, writer name and delta flag, followed by the
/// new_links and links_to_remove maps.
std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj)//{{{
{
	os << "LinkUpdate(uid=" << obj.uid << ", revision=" << obj.revision
	   << ", writer_name=" << obj.writer_name
	   << ", is_delta=" << (obj.is_delta ? "True" : "False")
	   << ", new_links = {";
	print_link_map_entries(os, obj.new_links);
	os << "}, links_to_remove = {";
	print_link_map_entries(os, obj.links_to_remove);
	os << "})";
	return os;
}
//}}}

// SmartLinkMap//{{{
void SmartLinkMap::_add_and_remove_links(const LinkMap& add, const LinkMap& remove)
{
	// remove specified links
	for (LinkMap::const_iterator it = remove.begin(); it != remove.end(); ++it ) {
		// if link type exists
		if (_links.count(it->first) > 0) {
			// remove one by one
			for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
				_links[it->first].erase(*it2);
			}
			// wipe the type key if no more links are left
			if (_links[it->first].size() == 0) {
				_links.erase(it->first);
			}
		}
	}
	// add specified links
	for (LinkMap::const_iterator it = add.begin(); it != add.end(); ++it ) {
		for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
			_links[it->first].insert(*it2);
		}
	}
}
void SmartLinkMap::_replace_links(const LinkMap& links)
{
	//_links.clear();
	_links=links;
}
//}}}

// Buffer//{{{
/// Build this buffer's unique name as "<basename>-<suffix>", where the
/// suffix is the first eight characters of a newly generated UUID string.
void Buffer::_allocate_unique_name(const std::string& basename) {
	const std::string suffix = ipaaca::generate_uuid_string().substr(0, 8);
	_unique_name = basename + "-" + suffix;
}
//}}}

// OutputBuffer//{{{

/// Construct an output buffer; `basename` is forwarded unchanged to the
/// Buffer base-class constructor.
OutputBuffer::OutputBuffer(const std::string& basename)
	: Buffer(basename)
{
	// nothing else to set up
}
/// Placeholder for sending an IU link update.
///
/// The body currently consists only of the IPAACA_IMPLEMENT_ME marker;
/// none of the parameters is used yet.
void OutputBuffer::_send_iu_link_update(
	IUInterface* iu,
	bool is_delta,
	revision_t revision,
	const LinkMap& new_links,
	const LinkMap& links_to_remove,
	const std::string& writer_name)
{
	IPAACA_IMPLEMENT_ME
}
/// Placeholder for sending an IU payload update.
///
/// The body currently consists only of the IPAACA_IMPLEMENT_ME marker;
/// none of the parameters is used yet.
void OutputBuffer::_send_iu_payload_update(
	IUInterface* iu,
	bool is_delta,
	revision_t revision,
	const std::map<std::string, std::string>& new_items,
	const std::vector<std::string>& keys_to_remove,
	const std::string& writer_name)
{
	IPAACA_IMPLEMENT_ME
}
/// Placeholder for announcing that an IU has been committed.
///
/// The body currently consists only of the IPAACA_IMPLEMENT_ME marker;
/// none of the parameters is used yet.
void OutputBuffer::_send_iu_commission(
	IUInterface* iu,
	revision_t revision,
	const std::string& writer_name)
{
	IPAACA_IMPLEMENT_ME
}
/// Add an IU to this output buffer (incomplete).
///
/// After the IPAACA_IMPLEMENT_ME marker, the IU is associated with this
/// buffer through _associate_with_buffer(). Storing the IU is still TODO.
void OutputBuffer::add(IU::ref iu)
{
	IPAACA_IMPLEMENT_ME
	// TODO: place the IU in the IU store
	Buffer* const owner = this; // a raw pointer is passed, not shared_from_this()
	iu->_associate_with_buffer(owner);
	// TODO
}

/*
	def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"):
		'''Send an IU link update.
		
		Keyword arguments:
		iu -- the IU being updated
		is_delta -- whether this is an incremental update or a replacement of
			the whole link dictionary
		revision -- the new revision number
		new_links -- a dictionary of new link sets
		links_to_remove -- a dict of the link sets that shall be removed
		writer_name -- name of the Buffer that initiated this update, necessary
			to enable remote components to filter out updates that originated
			from their own operations
		'''
		if new_links is None:
			new_links = {}
		if links_to_remove is None:
			links_to_remove = {}
		link_update = IULinkUpdate(iu._uid, is_delta=is_delta, revision=revision)
		link_update.new_links = new_links
		if is_delta:
			link_update.links_to_remove = links_to_remove
		link_update.writer_name = writer_name
		informer = self._get_informer(iu._category)
		informer.publishData(link_update)
		# FIXME send the notification to the target, if the target is not the writer_name
*/
/*
	def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
		'''Send an IU payload update.
		
		Keyword arguments:
		iu -- the IU being updated
		is_delta -- whether this is an incremental update or a replacement
		revision -- the new revision number
		new_items -- a dictionary of new payload items
		keys_to_remove -- a list of the keys that shall be removed from the
		 	payload
		writer_name -- name of the Buffer that initiated this update, necessary
			to enable remote components to filter out updates that originated
			from their own operations
		'''
		if new_items is None:
			new_items = {}
		if keys_to_remove is None:
			keys_to_remove = []
		payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision)
		payload_update.new_items = new_items
		if is_delta:
			payload_update.keys_to_remove = keys_to_remove
		payload_update.writer_name = writer_name
		informer = self._get_informer(iu._category)
		informer.publishData(payload_update)
*/
//}}}

// InputBuffer//{{{
/// Construct an input buffer; `basename` is forwarded unchanged to the
/// Buffer base-class constructor.
InputBuffer::InputBuffer(const std::string& basename)
	: Buffer(basename)
{
	// nothing else to set up
}
//}}}



// IUInterface//{{{

/// Create an IU interface that has no buffer (null pointer) and is not
/// committed.
IUInterface::IUInterface()
	: _buffer(0)
	, _committed(false)
{
}

/// Assign the UID of this IU.
///
/// Throws IUAlreadyHasAnUIDError if a non-empty UID is already stored.
void IUInterface::_set_uid(const std::string& uid) {
	const bool already_assigned = !_uid.empty();
	if (already_assigned) {
		throw IUAlreadyHasAnUIDError();
	}
	_uid = uid;
}

/// Store the buffer this IU belongs to.
///
/// Throws IUAlreadyInABufferError if a buffer pointer is already set.
/// A raw pointer is stored (not a boost::shared_ptr<Buffer>).
void IUInterface::_set_buffer(Buffer* buffer) {
	if (_buffer != 0) {
		throw IUAlreadyInABufferError();
	}
	_buffer = buffer;
}

/// Assign the owner name of this IU.
///
/// Throws IUAlreadyHasAnOwnerNameError if a non-empty owner name is
/// already stored.
void IUInterface::_set_owner_name(const std::string& owner_name) {
	const bool already_assigned = !_owner_name.empty();
	if (already_assigned) {
		throw IUAlreadyHasAnOwnerNameError();
	}
	_owner_name = owner_name;
}

/// Attach this IU to `buffer`: stores the buffer pointer and sets the
/// owner name of both the IU and its payload to the buffer's unique name.
///
/// `buffer` is dereferenced without a null check. The exceptions of
/// _set_buffer() and _set_owner_name() propagate to the caller.
void IUInterface::_associate_with_buffer(Buffer* buffer) {
	// throws if this IU already has a buffer
	_set_buffer(buffer);
	const std::string& owner = buffer->unique_name();
	_set_owner_name(owner);
	payload()._set_owner_name(owner);
}

void IUInterface::add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
{
	LinkMap none;
	LinkMap add;
	add[type] = targets;
	_modify_links(true, add, none, writer_name);
	_add_and_remove_links(add, none);
}

void IUInterface::remove_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
{
	LinkMap none;
	LinkMap remove;
	remove[type] = targets;
	_modify_links(true, none, remove, writer_name);
	_add_and_remove_links(none, remove);
}

/// Add and remove several links in one step.
///
/// The change is first passed to _modify_links() as a delta update and
/// then applied to the local link map.
void IUInterface::modify_links(
	const LinkMap& add,
	const LinkMap& remove,
	const std::string& writer_name)
{
	const bool is_delta = true;
	_modify_links(is_delta, add, remove, writer_name);
	_add_and_remove_links(add, remove);
}

void IUInterface::set_links(const LinkMap& links, const std::string& writer_name)
{
	LinkMap none;
	_modify_links(false, links, none, writer_name);
	_replace_links(links);
}

//}}}

// IU//{{{
/// Factory for IUs: constructs the IU and then hands the new reference to
/// its payload via Payload::initialize(), which the constructor defers.
IU::ref IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
	IU::ref created(new IU(category, access_mode, read_only, payload_type));
	created->_payload.initialize(created);
	return created;
}

/// Construct an uncommitted IU at revision 1 with a newly generated UID.
///
/// The payload is not initialized here; IU::create() does that once a
/// reference to the IU exists.
IU::IU(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
	// values taken from the arguments
	_category = category;
	_access_mode = access_mode;
	_read_only = read_only;
	_payload_type = payload_type;
	// initial state
	_uid = ipaaca::generate_uuid_string();
	_revision = 1;
	_committed = false;
}

/// Register a link modification on this IU.
///
/// While holding _revision_lock, the revision number is increased and,
/// if the IU is published, the update is sent through the buffer.
///
/// Throws IUCommittedError if the IU is already committed. Exceptions
/// raised while sending propagate to the caller; the lock is released on
/// every exit path.
void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
	_revision_lock.lock();
	try {
		if (_committed) {
			throw IUCommittedError();
		}
		_increase_revision_number();
		if (is_published()) {
			_buffer->_send_iu_link_update(this, is_delta, _revision, new_links, links_to_remove, writer_name);
		}
	} catch (...) {
		// never leave the lock held when an exception escapes
		_revision_lock.unlock();
		throw;
	}
	_revision_lock.unlock();
}
/// Register a payload modification on this IU.
///
/// While holding _revision_lock, the revision number is increased and,
/// if the IU is published, the update is sent through the buffer.
///
/// Throws IUCommittedError if the IU is already committed. Exceptions
/// raised while sending propagate to the caller; the lock is released on
/// every exit path.
void IU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
	_revision_lock.lock();
	try {
		if (_committed) {
			throw IUCommittedError();
		}
		_increase_revision_number();
		if (is_published()) {
			_buffer->_send_iu_payload_update(this, is_delta, _revision, new_items, keys_to_remove, writer_name);
		}
	} catch (...) {
		// never leave the lock held when an exception escapes
		_revision_lock.unlock();
		throw;
	}
	_revision_lock.unlock();
}

void IU::commit()
{
	_internal_commit();
}

/// Mark this IU as committed.
///
/// While holding _revision_lock, the revision number is increased, the
/// committed flag is set and, if the IU is published, the commission is
/// sent through the buffer.
///
/// Throws IUCommittedError if the IU is already committed. Exceptions
/// raised while sending propagate to the caller; the lock is released on
/// every exit path. The committed flag is set before sending and is not
/// rolled back if sending fails.
void IU::_internal_commit(const std::string& writer_name)
{
	_revision_lock.lock();
	try {
		if (_committed) {
			throw IUCommittedError();
		}
		_increase_revision_number();
		_committed = true;
		if (is_published()) {
			_buffer->_send_iu_commission(this, _revision, writer_name);
		}
	} catch (...) {
		// never leave the lock held when an exception escapes
		_revision_lock.unlock();
		throw;
	}
	_revision_lock.unlock();
}
//}}}

// RemotePushIU//{{{

/// Factory for remote push IUs: constructs the object and then hands the
/// new reference to its payload via Payload::initialize().
RemotePushIU::ref RemotePushIU::create()
{
	RemotePushIU::ref created(new RemotePushIU());
	created->_payload.initialize(created);
	return created;
}
/// Default constructor; performs no work of its own.
RemotePushIU::RemotePushIU()
{
}
/// Placeholder for link modification on a remote push IU.
///
/// The body currently consists only of the IPAACA_IMPLEMENT_ME marker;
/// none of the parameters is used yet.
void RemotePushIU::_modify_links(
	bool is_delta,
	const LinkMap& new_links,
	const LinkMap& links_to_remove,
	const std::string& writer_name)
{
	IPAACA_IMPLEMENT_ME
}
/// Placeholder for payload modification on a remote push IU.
///
/// The body currently consists only of the IPAACA_IMPLEMENT_ME marker;
/// none of the parameters is used yet.
void RemotePushIU::_modify_payload(
	bool is_delta,
	const std::map<std::string, std::string>& new_items,
	const std::vector<std::string>& keys_to_remove,
	const std::string& writer_name)
{
	IPAACA_IMPLEMENT_ME
}

/// Placeholder for committing a remote push IU; the body currently
/// consists only of the IPAACA_IMPLEMENT_ME marker.
void RemotePushIU::commit()
{
	IPAACA_IMPLEMENT_ME
}

//}}}




// PayloadEntryProxy//{{{

/// Create a proxy for the entry `key` of `payload`.
///
/// Only the pointer is stored; the proxy does not take ownership of the
/// payload.
PayloadEntryProxy::PayloadEntryProxy(Payload* payload, const std::string& key)
	: _payload(payload)
	, _key(key)
{
}
/// Write `value` to the proxied payload entry via Payload::set() and
/// return this proxy.
PayloadEntryProxy& PayloadEntryProxy::operator=(const std::string& value)
{
	Payload& target = *_payload;
	target.set(_key, value);
	return *this;
}
/// Read the proxied payload entry as a string via Payload::get().
PayloadEntryProxy::operator std::string()
{
	std::string current = _payload->get(_key);
	return current;
}
/// Read the proxied payload entry and convert it to a long (base 10).
///
/// strtol() is used instead of atol(): atol() has undefined behaviour
/// when the converted value does not fit into a long, whereas strtol()
/// returns LONG_MIN / LONG_MAX in that case. Text that does not start
/// with a number yields 0.
PayloadEntryProxy::operator long()
{
	const std::string text = operator std::string();
	return std::strtol(text.c_str(), NULL, 10);
}
/// Read the proxied payload entry and convert it to a double.
///
/// strtod() is used instead of atof(): atof() has undefined behaviour
/// when the converted value is out of range, whereas strtod() returns
/// +/-HUGE_VAL in that case. Text that does not start with a number
/// yields 0.
PayloadEntryProxy::operator double()
{
	const std::string text = operator std::string();
	return std::strtod(text.c_str(), NULL);
}
//}}}

// Payload//{{{

/// Remember the IU this payload belongs to; set() and remove() report
/// their modifications to that IU.
///
/// NOTE(review): IU::create() passes the IU that contains this payload.
/// If _iu is an owning boost::shared_ptr, the IU and its payload keep each
/// other alive (reference cycle) -- confirm against the class declaration.
void Payload::initialize(boost::shared_ptr<IUInterface> iu)
{
	_iu = iu;
}

/// Return a proxy object for the entry `key`; reads and writes through the
/// proxy are forwarded to get() and set() of this payload.
PayloadEntryProxy Payload::operator[](const std::string& key)
{
	PayloadEntryProxy proxy(this, key);
	return proxy;
}

/// Set the entry `k` to `v`.
///
/// The change is first reported to the owning IU as a delta update with an
/// empty writer name; the local store is only updated if that call
/// returns normally.
///
/// Not declared `inline`: the definition lives only in this source file,
/// so an out-of-line definition must be available to other translation
/// units.
void Payload::set(const std::string& k, const std::string& v) {
	std::map<std::string, std::string> new_items;
	const std::vector<std::string> keys_to_remove;
	new_items[k] = v;
	_iu->_modify_payload(true, new_items, keys_to_remove, "");
	_store[k] = v;
}
/// Remove the entry `k`.
///
/// The change is first reported to the owning IU as a delta update with an
/// empty writer name; the local store is only updated if that call
/// returns normally.
///
/// Not declared `inline`: the definition lives only in this source file
/// and the function is not called from within it, so an out-of-line
/// definition must be available to other translation units.
void Payload::remove(const std::string& k) {
	const std::map<std::string, std::string> new_items;
	std::vector<std::string> keys_to_remove;
	keys_to_remove.push_back(k);
	_iu->_modify_payload(true, new_items, keys_to_remove, "");
	_store.erase(k);
}
/// Return the value stored under `k`, or
/// IPAACA_PAYLOAD_DEFAULT_STRING_VALUE if the key is not present.
///
/// Not declared `inline`: the definition lives only in this source file,
/// so an out-of-line definition must be available to other translation
/// units. A single find() replaces the former count() + operator[] pair.
std::string Payload::get(const std::string& k) {
	std::map<std::string, std::string>::const_iterator found = _store.find(k);
	if (found != _store.end()) return found->second;
	return IPAACA_PAYLOAD_DEFAULT_STRING_VALUE;
}
//}}}

// IUConverter//{{{

/// Register this converter for data type "ipaaca::IU" with wire schema
/// "ipaaca-iu"; the third base-class argument is passed as true.
IUConverter::IUConverter()
	: Converter<std::string>("ipaaca::IU", "ipaaca-iu", true)
{
}

std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire)
{
	// Ensure that DATA actually holds a datum of the data-type we expect.
	assert(data.first == getDataType()); // "ipaaca::IU"
	// NOTE: a dynamic_pointer_cast cannot be used from void*
	boost::shared_ptr<const IU> obj = boost::static_pointer_cast<const IU> (data.second);
	boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
	// transfer obj data to pbo
	pbo->set_uid(obj->uid());
	pbo->set_revision(obj->revision());
	pbo->set_category(obj->category());
	pbo->set_payload_type(obj->payload_type());
	pbo->set_owner_name(obj->owner_name());
	pbo->set_committed(obj->committed());
	pbo->set_access_mode(ipaaca::protobuf::IU::PUSH); // TODO
	pbo->set_read_only(obj->read_only());
	for (std::map<std::string, std::string>::const_iterator it=obj->_payload._store.begin(); it!=obj->_payload._store.end(); ++it) {
		protobuf::PayloadItem* item = pbo->add_payload();
		item->set_key(it->first);
		item->set_value(it->second);
		item->set_type("str"); // FIXME other types than str (later)
	}
	for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) {
		protobuf::LinkSet* links = pbo->add_links();
		links->set_type(it->first);
		for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
			links->add_targets(*it2);
		}
	}
	pbo->SerializeToString(&wire);
	return getWireSchema();

}

/// Reconstruct an IU from its protobuf wire representation.
///
/// For access mode IU_ACCESS_PUSH a RemotePushIU is created and filled
/// with the attributes, payload entries and links of the message. The IU
/// is obtained from RemotePushIU::create() so that its payload is
/// initialized with the owning IU, as for every other RemotePushIU.
///
/// Throws NotImplementedError for any other access mode.
///
/// NOTE(review): the boolean result of ParseFromString() is not checked,
/// so malformed input is not reported here.
AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
	assert(wireSchema == getWireSchema()); // "ipaaca-iu"
	protobuf::IU pbo;
	pbo.ParseFromString(wire);
	IUAccessMode mode = static_cast<IUAccessMode>(pbo.access_mode());
	switch(mode) {
		case IU_ACCESS_PUSH:
			{
			// Create a "remote push IU" through the factory
			RemotePushIU::ref obj = RemotePushIU::create();
			// transfer pbo data to obj
			obj->_uid = pbo.uid();
			obj->_revision = pbo.revision();
			obj->_category = pbo.category();
			obj->_payload_type = pbo.payload_type();
			obj->_owner_name = pbo.owner_name();
			obj->_committed = pbo.committed();
			obj->_read_only = pbo.read_only();
			obj->_access_mode = IU_ACCESS_PUSH;
			// the store is filled directly, bypassing Payload::set()
			for (int i=0; i<pbo.payload_size(); i++) {
				const protobuf::PayloadItem& it = pbo.payload(i);
				obj->_payload._store[it.key()] = it.value();
			}
			for (int i=0; i<pbo.links_size(); i++) {
				const protobuf::LinkSet& pls = pbo.links(i);
				// creates the entry even if the link set has no targets
				LinkSet& ls = obj->_links._links[pls.type()];
				for (int j=0; j<pls.targets_size(); j++) {
					ls.insert(pls.targets(j));
				}
			}
			return std::make_pair(getDataType(), obj);
			}
		default:
			// other cases not handled yet! ( TODO )
			throw NotImplementedError();
	}
}

//}}}

// IUPayloadUpdateConverter//{{{

/// Register this converter for data type "ipaaca::IUPayloadUpdate" with
/// wire schema "ipaaca-iu-payload-update"; the third base-class argument
/// is passed as true.
IUPayloadUpdateConverter::IUPayloadUpdateConverter()
	: Converter<std::string>("ipaaca::IUPayloadUpdate", "ipaaca-iu-payload-update", true)
{
}

std::string IUPayloadUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
{
	assert(data.first == getDataType()); // "ipaaca::IUPayloadUpdate"
	boost::shared_ptr<const IUPayloadUpdate> obj = boost::static_pointer_cast<const IUPayloadUpdate> (data.second);
	boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
	// transfer obj data to pbo
	pbo->set_uid(obj->uid);
	pbo->set_revision(obj->revision);
	pbo->set_writer_name(obj->writer_name);
	pbo->set_is_delta(obj->is_delta);
	for (std::map<std::string, std::string>::const_iterator it=obj->new_items.begin(); it!=obj->new_items.end(); ++it) {
		protobuf::PayloadItem* item = pbo->add_new_items();
		item->set_key(it->first);
		item->set_value(it->second);
		item->set_type("str"); // FIXME other types than str (later)
	}
	for (std::vector<std::string>::const_iterator it=obj->keys_to_remove.begin(); it!=obj->keys_to_remove.end(); ++it) {
		pbo->add_keys_to_remove(*it);
	}
	pbo->SerializeToString(&wire);
	return getWireSchema();

}

/// Reconstruct an IUPayloadUpdate from its protobuf wire representation.
///
/// Returns the data type of this converter paired with the new object.
/// The result of ParseFromString() is not checked.
AnnotatedData IUPayloadUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
	// only the wire schema of this converter is accepted ("ipaaca-iu-payload-update")
	assert(wireSchema == getWireSchema());

	protobuf::IUPayloadUpdate message;
	message.ParseFromString(wire);

	boost::shared_ptr<IUPayloadUpdate> update(new IUPayloadUpdate());
	update->uid = message.uid();
	update->revision = message.revision();
	update->writer_name = message.writer_name();
	update->is_delta = message.is_delta();

	const int item_count = message.new_items_size();
	for (int index = 0; index < item_count; ++index) {
		const protobuf::PayloadItem& encoded = message.new_items(index);
		update->new_items[encoded.key()] = encoded.value();
	}

	const int key_count = message.keys_to_remove_size();
	for (int index = 0; index < key_count; ++index) {
		update->keys_to_remove.push_back(message.keys_to_remove(index));
	}

	return std::make_pair(getDataType(), update);
}

//}}}

// IULinkUpdateConverter//{{{

/// Register this converter for data type "ipaaca::IULinkUpdate" with wire
/// schema "ipaaca-iu-link-update"; the third base-class argument is
/// passed as true.
IULinkUpdateConverter::IULinkUpdateConverter()
	: Converter<std::string>("ipaaca::IULinkUpdate", "ipaaca-iu-link-update", true)
{
}

std::string IULinkUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
{
	assert(data.first == getDataType()); // "ipaaca::IULinkUpdate"
	boost::shared_ptr<const IULinkUpdate> obj = boost::static_pointer_cast<const IULinkUpdate> (data.second);
	boost::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate());
	// transfer obj data to pbo
	pbo->set_uid(obj->uid);
	pbo->set_revision(obj->revision);
	pbo->set_writer_name(obj->writer_name);
	pbo->set_is_delta(obj->is_delta);
	for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->new_links.begin(); it!=obj->new_links.end(); ++it) {
		protobuf::LinkSet* links = pbo->add_new_links();
		links->set_type(it->first);
		for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
			links->add_targets(*it2);
		}
	}
	for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->links_to_remove.begin(); it!=obj->links_to_remove.end(); ++it) {
		protobuf::LinkSet* links = pbo->add_links_to_remove();
		links->set_type(it->first);
		for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
			links->add_targets(*it2);
		}
	}
	pbo->SerializeToString(&wire);
	return getWireSchema();

}

/// Reconstruct an IULinkUpdate from its protobuf wire representation.
///
/// Returns the data type of this converter paired with the new object.
/// Link types that arrive without any target create no map entry. The
/// result of ParseFromString() is not checked.
AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
	// only the wire schema of this converter is accepted ("ipaaca-iu-link-update")
	assert(wireSchema == getWireSchema());

	protobuf::IULinkUpdate message;
	message.ParseFromString(wire);

	boost::shared_ptr<IULinkUpdate> update(new IULinkUpdate());
	update->uid = message.uid();
	update->revision = message.revision();
	update->writer_name = message.writer_name();
	update->is_delta = message.is_delta();

	const int added_count = message.new_links_size();
	for (int index = 0; index < added_count; ++index) {
		const protobuf::LinkSet& encoded = message.new_links(index);
		const int target_count = encoded.targets_size();
		if (target_count == 0) continue; // no targets: leave the map untouched
		std::set<std::string>& targets = update->new_links[encoded.type()];
		for (int t = 0; t < target_count; ++t) {
			targets.insert(encoded.targets(t));
		}
	}

	const int removed_count = message.links_to_remove_size();
	for (int index = 0; index < removed_count; ++index) {
		const protobuf::LinkSet& encoded = message.links_to_remove(index);
		const int target_count = encoded.targets_size();
		if (target_count == 0) continue; // no targets: leave the map untouched
		std::set<std::string>& targets = update->links_to_remove[encoded.type()];
		for (int t = 0; t < target_count; ++t) {
			targets.insert(encoded.targets(t));
		}
	}

	return std::make_pair(getDataType(), update);
}

//}}}


} // of namespace ipaaca