    // ipaaca.cc
    #include <ipaaca.h>
    #include <cstdlib>
    
    namespace ipaaca {
    
    
    // util and init//{{{
    std::string generate_uuid_string()
    {
    	uuid_t uuidt;
    	uuid_string_t uuidstr;
    	uuid_generate(uuidt);
    	uuid_unparse_lower(uuidt, uuidstr);
    	return uuidstr;
    }
    
    //const LinkSet EMPTY_LINK_SET = LinkSet();
    //const std::set<std::string> EMPTY_LINK_SET();
           
    
    void initialize_ipaaca_rsb()
    {
    	ParticipantConfig config = ParticipantConfig::fromConfiguration();
    	Factory::getInstance().setDefaultParticipantConfig(config);
    	
    	boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter());
    	boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter());
    	stringConverterRepository()->registerConverter(payload_update_converter);
    	stringConverterRepository()->registerConverter(link_update_converter);
    	
    	//IPAACA_TODO("initialize all converters")
    }
    
    /*
    void init_inprocess_too() {
    	//ParticipantConfig config = Factory::getInstance().getDefaultParticipantConfig();
    	ParticipantConfig config = ParticipantConfig::fromFile("rsb.cfg");
    	//ParticipantConfig::Transport inprocess = config.getTransport("inprocess");
    	//inprocess.setEnabled(true);
    	//config.addTransport(inprocess);
    	Factory::getInstance().setDefaultParticipantConfig(config);
    }
    */
    //}}}
    
    /// Write a human-readable dump of an IUPayloadUpdate to `os`.
    ///
    /// @return `os`, to allow chaining
    std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)//{{{
    {
    	typedef std::map<std::string, std::string>::const_iterator ItemIter;
    	typedef std::vector<std::string>::const_iterator KeyIter;

    	os << "PayloadUpdate(uid=" << obj.uid << ", revision=" << obj.revision
    	   << ", writer_name=" << obj.writer_name
    	   << ", is_delta=" << (obj.is_delta ? "True" : "False")
    	   << ", new_items = {";
    	// the separator is empty before the first element and ", " afterwards
    	const char* separator = "";
    	for (ItemIter item = obj.new_items.begin(); item != obj.new_items.end(); ++item) {
    		os << separator << "'" << item->first << "':'" << item->second << "'";
    		separator = ", ";
    	}
    	os << "}, keys_to_remove = [";
    	separator = "";
    	for (KeyIter key = obj.keys_to_remove.begin(); key != obj.keys_to_remove.end(); ++key) {
    		os << separator << "'" << *key << "'";
    		separator = ", ";
    	}
    	os << "])";
    	return os;
    }
    
    //}}}
    /// Write a human-readable dump of an IULinkUpdate to `os`.
    ///
    /// Both link maps are printed as {'type': ['target', ...], ...}.
    ///
    /// @return `os`, to allow chaining
    std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj)//{{{
    {
    	typedef std::map<std::string, std::set<std::string> >::const_iterator TypeIter;
    	typedef std::set<std::string>::const_iterator TargetIter;

    	os << "LinkUpdate(uid=" << obj.uid << ", revision=" << obj.revision
    	   << ", writer_name=" << obj.writer_name
    	   << ", is_delta=" << (obj.is_delta ? "True" : "False")
    	   << ", new_links = {";
    	// separators are empty before the first element and ", " afterwards
    	const char* type_sep = "";
    	for (TypeIter entry = obj.new_links.begin(); entry != obj.new_links.end(); ++entry) {
    		os << type_sep << "'" << entry->first << "': [";
    		type_sep = ", ";
    		const char* target_sep = "";
    		for (TargetIter target = entry->second.begin(); target != entry->second.end(); ++target) {
    			os << target_sep << "'" << *target << "'";
    			target_sep = ", ";
    		}
    		os << "]";
    	}
    	os << "}, links_to_remove = {";
    	type_sep = "";
    	for (TypeIter entry = obj.links_to_remove.begin(); entry != obj.links_to_remove.end(); ++entry) {
    		os << type_sep << "'" << entry->first << "': [";
    		type_sep = ", ";
    		const char* target_sep = "";
    		for (TargetIter target = entry->second.begin(); target != entry->second.end(); ++target) {
    			os << target_sep << "'" << *target << "'";
    			target_sep = ", ";
    		}
    		os << "]";
    	}
    	os << "})";
    	return os;
    }
    
    //}}}
    
    // SmartLinkMap//{{{
    void SmartLinkMap::_add_and_remove_links(const LinkMap& add, const LinkMap& remove)
    {
    	// remove specified links
    	for (LinkMap::const_iterator it = remove.begin(); it != remove.end(); ++it ) {
    		// if link type exists
    		if (_links.count(it->first) > 0) {
    			// remove one by one
    			for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
    				_links[it->first].erase(*it2);
    			}
    			// wipe the type key if no more links are left
    			if (_links[it->first].size() == 0) {
    				_links.erase(it->first);
    			}
    		}
    	}
    	// add specified links
    	for (LinkMap::const_iterator it = add.begin(); it != add.end(); ++it ) {
    		for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
    			_links[it->first].insert(*it2);
    		}
    	}
    }
    void SmartLinkMap::_replace_links(const LinkMap& links)
    {
    	//_links.clear();
    	_links=links;
    }
    //}}}
    
    // Buffer//{{{
    /// Set the buffer's unique name to "<basename>-<first 8 characters of a new UUID>".
    void Buffer::_allocate_unique_name(const std::string& basename) {
    	const std::string suffix = ipaaca::generate_uuid_string().substr(0, 8);
    	_unique_name = basename + "-" + suffix;
    }
    //}}}
    
    
    // OutputBuffer//{{{
    
    
    /// Construct an output buffer; `basename` is forwarded to the Buffer base class.
    OutputBuffer::OutputBuffer(const std::string& basename)
    	: Buffer(basename)
    {
    }
    
    /// Send a link update for `iu`.
    ///
    /// Not implemented yet: the body consists solely of the IPAACA_IMPLEMENT_ME
    /// placeholder macro (defined elsewhere).
    void OutputBuffer::_send_iu_link_update(
    		IUInterface* iu,
    		bool is_delta,
    		revision_t revision,
    		const LinkMap& new_links,
    		const LinkMap& links_to_remove,
    		const std::string& writer_name)
    {
    	IPAACA_IMPLEMENT_ME
    }
    /// Send a payload update for `iu`.
    ///
    /// Not implemented yet: the body consists solely of the IPAACA_IMPLEMENT_ME
    /// placeholder macro (defined elsewhere).
    void OutputBuffer::_send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
    {
    	IPAACA_IMPLEMENT_ME
    }
    /// Send a commit notification for `iu`.
    ///
    /// Not implemented yet: the body consists solely of the IPAACA_IMPLEMENT_ME
    /// placeholder macro (defined elsewhere).
    void OutputBuffer::_send_iu_commission(
    		IUInterface* iu,
    		revision_t revision,
    		const std::string& writer_name)
    {
    	IPAACA_IMPLEMENT_ME
    }
    /// Add an IU to this output buffer.
    ///
    /// Unfinished: apart from the IPAACA_IMPLEMENT_ME placeholder, the only step
    /// performed is associating the IU with this buffer.
    void OutputBuffer::add(IU::ref iu)
    {
    	IPAACA_IMPLEMENT_ME
    	// TODO place in iu store

    	// the IU receives a raw pointer to this buffer (not shared_from_this())
    	iu->_associate_with_buffer(this);

    	// TODO
    }
    
    /*
    	def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"):
    		'''Send an IU link update.
    		
    		Keyword arguments:
    		iu -- the IU being updated
    		is_delta -- whether this is an incremental update or a replacement of
    			the whole link dictionary
    		revision -- the new revision number
    		new_links -- a dictionary of new link sets
    		links_to_remove -- a dict of the link sets that shall be removed
    		writer_name -- name of the Buffer that initiated this update, necessary
    			to enable remote components to filter out updates that originated
    			from their own operations
    		'''
    		if new_links is None:
    			new_links = {}
    		if links_to_remove is None:
    			links_to_remove = {}
    		link_update = IULinkUpdate(iu._uid, is_delta=is_delta, revision=revision)
    		link_update.new_links = new_links
    		if is_delta:
    			link_update.links_to_remove = links_to_remove
    		link_update.writer_name = writer_name
    		informer = self._get_informer(iu._category)
    		informer.publishData(link_update)
    		# FIXME send the notification to the target, if the target is not the writer_name
    */
    /*
    	def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
    		'''Send an IU payload update.
    		
    		Keyword arguments:
    		iu -- the IU being updated
    		is_delta -- whether this is an incremental update or a replacement
    		revision -- the new revision number
    		new_items -- a dictionary of new payload items
    		keys_to_remove -- a list of the keys that shall be removed from the
    		 	payload
    		writer_name -- name of the Buffer that initiated this update, necessary
    			to enable remote components to filter out updates that originated
    			from their own operations
    		'''
    		if new_items is None:
    			new_items = {}
    		if keys_to_remove is None:
    			keys_to_remove = []
    		payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision)
    		payload_update.new_items = new_items
    		if is_delta:
    			payload_update.keys_to_remove = keys_to_remove
    		payload_update.writer_name = writer_name
    		informer = self._get_informer(iu._category)
    		informer.publishData(payload_update)
    */
    //}}}

    // InputBuffer//{{{
    /// Construct an input buffer; `basename` is forwarded to the Buffer base class.
    InputBuffer::InputBuffer(const std::string& basename)
    	: Buffer(basename)
    {
    }
    //}}}
    
    
    
    
    // IUInterface//{{{
    
    /// Create an IU interface object that belongs to no buffer and is not committed.
    IUInterface::IUInterface()
    	: _buffer(NULL)
    	, _committed(false)
    {
    }
    
    /// Assign the UID once.
    ///
    /// @throws IUAlreadyHasAnUIDError if a non-empty UID is already stored
    void IUInterface::_set_uid(const std::string& uid) {
    	const bool already_assigned = !(_uid == "");
    	if (already_assigned) {
    		throw IUAlreadyHasAnUIDError();
    	}
    	_uid = uid;
    }
    
    /// Store the (raw, non-owning) pointer to the buffer this IU lives in.
    ///
    /// @throws IUAlreadyInABufferError if a buffer pointer is already set
    void IUInterface::_set_buffer(Buffer* buffer) {
    	if (_buffer != NULL) {
    		throw IUAlreadyInABufferError();
    	}
    	_buffer = buffer;
    }
    
    /// Assign the owner name once.
    ///
    /// @throws IUAlreadyHasAnOwnerNameError if a non-empty owner name is already stored
    void IUInterface::_set_owner_name(const std::string& owner_name) {
    	const bool already_owned = !(_owner_name == "");
    	if (already_owned) {
    		throw IUAlreadyHasAnOwnerNameError();
    	}
    	_owner_name = owner_name;
    }
    
    /// Bind this IU to `buffer`: store the buffer pointer, then use the buffer's
    /// unique name as owner name of both the IU and its payload.
    void IUInterface::_associate_with_buffer(Buffer* buffer) {
    	// must come first: throws if the IU already sits in a buffer
    	_set_buffer(buffer);
    	_set_owner_name(buffer->unique_name());
    	payload()._set_owner_name(buffer->unique_name());
    }
    
    
    void IUInterface::add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
    {
    	LinkMap none;
    	LinkMap add;
    	add[type] = targets;
    	_modify_links(true, add, none, writer_name);
    	_add_and_remove_links(add, none);
    }
    
    void IUInterface::remove_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
    {
    	LinkMap none;
    	LinkMap remove;
    	remove[type] = targets;
    	_modify_links(true, none, remove, writer_name);
    	_add_and_remove_links(none, remove);
    }
    
    /// Apply a combined link delta: `add` is inserted, `remove` is erased.
    ///
    /// _modify_links() is called first with is_delta=true; only afterwards is the
    /// local link map updated.
    void IUInterface::modify_links(const LinkMap& add, const LinkMap& remove, const std::string& writer_name)
    {
    	const bool is_delta = true;
    	_modify_links(is_delta, add, remove, writer_name);
    	_add_and_remove_links(add, remove);
    }
    
    void IUInterface::set_links(const LinkMap& links, const std::string& writer_name)
    {
    	LinkMap none;
    	_modify_links(false, links, none, writer_name);
    	_replace_links(links);
    }
    
    //}}}
    
    // IU//{{{
    /// Factory for local IUs.
    ///
    /// Constructs the IU and then initializes its payload with the new IU handle.
    ///
    /// @return handle to the newly created IU
    IU::ref IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
    {
    	IU::ref iu = IU::ref(new IU(category, access_mode, read_only, payload_type));
    	// the payload can only be given its IU handle once that handle exists
    	iu->_payload.initialize(iu);
    	return iu;
    }

    /// Construct a local IU with a freshly generated UID, revision 1 and the
    /// uncommitted state.
    IU::IU(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
    {
    	// identity
    	_uid = ipaaca::generate_uuid_string();
    	_revision = 1;
    	// classification
    	_category = category;
    	_payload_type = payload_type;
    	// access rights
    	_access_mode = access_mode;
    	_read_only = read_only;
    	// state
    	_committed = false;
    	// NOTE: payload initialization is deferred to IU::create()
    }
    
    /// Register a link modification: bump the revision and, if the IU is
    /// published, forward the update to the buffer.
    ///
    /// The revision lock is held for the whole operation and released on every
    /// exit path, including exceptions raised while bumping or sending.
    ///
    /// @throws IUCommittedError if the IU has already been committed
    void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
    {
    	_revision_lock.lock();
    	try {
    		if (_committed) {
    			throw IUCommittedError();
    		}
    		_increase_revision_number();
    		if (is_published()) {
    			_buffer->_send_iu_link_update(this, is_delta, _revision, new_links, links_to_remove, writer_name);
    		}
    	} catch (...) {
    		// never leave the lock held when an exception escapes
    		_revision_lock.unlock();
    		throw;
    	}
    	_revision_lock.unlock();
    }
    /// Register a payload modification: bump the revision and, if the IU is
    /// published, forward the update to the buffer.
    ///
    /// The revision lock is held for the whole operation and released on every
    /// exit path, including exceptions raised while bumping or sending.
    ///
    /// @throws IUCommittedError if the IU has already been committed
    void IU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
    {
    	_revision_lock.lock();
    	try {
    		if (_committed) {
    			throw IUCommittedError();
    		}
    		_increase_revision_number();
    		if (is_published()) {
    			_buffer->_send_iu_payload_update(this, is_delta, _revision, new_items, keys_to_remove, writer_name);
    		}
    	} catch (...) {
    		// never leave the lock held when an exception escapes
    		_revision_lock.unlock();
    		throw;
    	}
    	_revision_lock.unlock();
    }
    
    void IU::commit()
    {
    	_internal_commit();
    }
    
    /// Mark the IU as committed, bump the revision and, if the IU is published,
    /// notify the buffer.
    ///
    /// The revision lock is held for the whole operation and released on every
    /// exit path, including exceptions raised while bumping or sending.
    ///
    /// @throws IUCommittedError if the IU has already been committed
    void IU::_internal_commit(const std::string& writer_name)
    {
    	_revision_lock.lock();
    	try {
    		if (_committed) {
    			throw IUCommittedError();
    		}
    		_increase_revision_number();
    		_committed = true;
    		if (is_published()) {
    			_buffer->_send_iu_commission(this, _revision, writer_name);
    		}
    	} catch (...) {
    		// never leave the lock held when an exception escapes
    		_revision_lock.unlock();
    		throw;
    	}
    	_revision_lock.unlock();
    }
    //}}}
    
    // RemotePushIU//{{{
    
    
    /// Factory for remote push IUs: constructs the object and then initializes
    /// its payload with the new IU handle.
    RemotePushIU::ref RemotePushIU::create()
    {
    	RemotePushIU::ref created(new RemotePushIU(/* params */));
    	created->_payload.initialize(created);
    	return created;
    }
    /// Default constructor; performs no work of its own.
    RemotePushIU::RemotePushIU()
    {
    	// intentionally empty
    }
    
    /// Link modification for remote push IUs.
    ///
    /// Not implemented yet: the body consists solely of the IPAACA_IMPLEMENT_ME
    /// placeholder macro (defined elsewhere).
    void RemotePushIU::_modify_links(
    		bool is_delta,
    		const LinkMap& new_links,
    		const LinkMap& links_to_remove,
    		const std::string& writer_name)
    {
    	IPAACA_IMPLEMENT_ME
    }
    /// Payload modification for remote push IUs.
    ///
    /// Not implemented yet: the body consists solely of the IPAACA_IMPLEMENT_ME
    /// placeholder macro (defined elsewhere).
    void RemotePushIU::_modify_payload(
    		bool is_delta,
    		const std::map<std::string, std::string>& new_items,
    		const std::vector<std::string>& keys_to_remove,
    		const std::string& writer_name)
    {
    	IPAACA_IMPLEMENT_ME
    }
    
    /// Commit for remote push IUs.
    ///
    /// Not implemented yet: the body consists solely of the IPAACA_IMPLEMENT_ME
    /// placeholder macro (defined elsewhere).
    void RemotePushIU::commit()
    {
    	IPAACA_IMPLEMENT_ME
    }
    
    //}}}
    
    
    
    
    
    // PayloadEntryProxy//{{{
    
    /// Create a proxy for the entry `key` of `payload`; the payload pointer is
    /// stored as-is (non-owning).
    PayloadEntryProxy::PayloadEntryProxy(Payload* payload, const std::string& key)
    	: _payload(payload)
    	, _key(key)
    {
    }
    /// Write `value` into the payload under this proxy's key.
    ///
    /// @return this proxy
    PayloadEntryProxy& PayloadEntryProxy::operator=(const std::string& value)
    {
    	Payload& target = *_payload;
    	target.set(_key, value);
    	return *this;
    }
    /// Read the payload value stored under this proxy's key.
    PayloadEntryProxy::operator std::string()
    {
    	Payload& source = *_payload;
    	return source.get(_key);
    }
    /// Read the entry as a string and convert it with atol().
    PayloadEntryProxy::operator long()
    {
    	const std::string text = operator std::string();
    	return atol(text.c_str());
    }
    /// Read the entry as a string and convert it with atof().
    PayloadEntryProxy::operator double()
    {
    	const std::string text = operator std::string();
    	return atof(text.c_str());
    }
    //}}}
    
    // Payload//{{{
    
    /// Remember the IU this payload belongs to.
    void Payload::initialize(boost::shared_ptr<IUInterface> iu)
    {
    	this->_iu = iu;
    }
    
    /// Return a proxy for the entry `key`. The proxy is returned by value and
    /// refers back to this payload.
    PayloadEntryProxy Payload::operator[](const std::string& key)
    {
    	PayloadEntryProxy proxy(this, key);
    	return proxy;
    }
    
    /// Set payload entry `k` to `v`.
    ///
    /// The change is first announced to the owning IU as a delta update (with an
    /// empty writer name); the local store is only written afterwards, so an
    /// exception from _modify_payload() leaves the store untouched.
    ///
    /// Not declared `inline`: this definition lives in a source file and must
    /// be emitted so that other translation units can link against it.
    void Payload::set(const std::string& k, const std::string& v) {
    	std::map<std::string, std::string> new_items;
    	std::vector<std::string> keys_to_remove; // stays empty
    	new_items[k] = v;
    	_iu->_modify_payload(true, new_items, keys_to_remove, "" );

    	_store[k] = v;
    }
    /// Remove payload entry `k`.
    ///
    /// The removal is first announced to the owning IU as a delta update (with an
    /// empty writer name); the local store is only modified afterwards, so an
    /// exception from _modify_payload() leaves the store untouched.
    ///
    /// Not declared `inline`: this definition lives in a source file and must
    /// be emitted so that other translation units can link against it.
    void Payload::remove(const std::string& k) {
    	std::map<std::string, std::string> new_items; // stays empty
    	std::vector<std::string> keys_to_remove;
    	keys_to_remove.push_back(k);
    	_iu->_modify_payload(true, new_items, keys_to_remove, "" );

    	_store.erase(k);
    }
    /// Look up payload entry `k`.
    ///
    /// @return the stored value, or IPAACA_PAYLOAD_DEFAULT_STRING_VALUE if the
    ///         key is not present (the store is not modified by a lookup)
    ///
    /// Not declared `inline`: this definition lives in a source file and must
    /// be emitted so that other translation units can link against it.
    std::string Payload::get(const std::string& k) {
    	// one find() instead of count() followed by operator[]
    	std::map<std::string, std::string>::const_iterator found = _store.find(k);
    	if (found != _store.end()) return found->second;
    	return IPAACA_PAYLOAD_DEFAULT_STRING_VALUE;
    }
    //}}}
    
    // IUConverter//{{{
    
    /// Set up the converter with the identifiers "ipaaca::IU" and "ipaaca-iu".
    IUConverter::IUConverter()
    	: Converter<std::string>("ipaaca::IU", "ipaaca-iu", true)
    {
    }
    
    std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	// Ensure that DATA actually holds a datum of the data-type we expect.
    	assert(data.first == getDataType()); // "ipaaca::IU"
    	// NOTE: a dynamic_pointer_cast cannot be used from void*
    	boost::shared_ptr<const IU> obj = boost::static_pointer_cast<const IU> (data.second);
    	boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
    	// transfer obj data to pbo
    
    	pbo->set_uid(obj->uid());
    	pbo->set_revision(obj->revision());
    	pbo->set_category(obj->category());
    	pbo->set_payload_type(obj->payload_type());
    	pbo->set_owner_name(obj->owner_name());
    	pbo->set_committed(obj->committed());
    	pbo->set_access_mode(ipaaca::protobuf::IU::PUSH); // TODO
    	pbo->set_read_only(obj->read_only());
    	for (std::map<std::string, std::string>::const_iterator it=obj->_payload._store.begin(); it!=obj->_payload._store.end(); ++it) {
    		protobuf::PayloadItem* item = pbo->add_payload();
    
    		item->set_key(it->first);
    		item->set_value(it->second);
    
    		item->set_type("str"); // FIXME other types than str (later)
    
    	for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) {
    		protobuf::LinkSet* links = pbo->add_links();
    		links->set_type(it->first);
    		for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
    			links->add_targets(*it2);
    		}
    
    	}
    	pbo->SerializeToString(&wire);
    	return getWireSchema();
    
    }
    
    /// Rebuild an IU from its protobuf wire representation.
    ///
    /// Only the push access mode is handled; it yields a RemotePushIU.
    ///
    /// @throws NotImplementedError for any other access mode
    AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema()); // "ipaaca-iu"

    	protobuf::IU pbo;
    	pbo.ParseFromString(wire);

    	const IUAccessMode mode = static_cast<IUAccessMode>(pbo.access_mode());
    	if (mode != IU_ACCESS_PUSH) {
    		// other cases not handled yet! ( TODO )
    		throw NotImplementedError();
    	}

    	// Create a "remote push IU"
    	// NOTE(review): unlike RemotePushIU::create(), the payload is not
    	// initialize()d with the IU here -- confirm that this is intended.
    	boost::shared_ptr<RemotePushIU> remote(new RemotePushIU());
    	// transfer pbo data to the new object
    	remote->_uid = pbo.uid();
    	remote->_revision = pbo.revision();
    	remote->_category = pbo.category();
    	remote->_payload_type = pbo.payload_type();
    	remote->_owner_name = pbo.owner_name();
    	remote->_committed = pbo.committed();
    	remote->_read_only = pbo.read_only();
    	remote->_access_mode = IU_ACCESS_PUSH;
    	for (int idx = 0; idx < pbo.payload_size(); ++idx) {
    		const protobuf::PayloadItem& item = pbo.payload(idx);
    		remote->_payload._store[item.key()] = item.value();
    	}
    	for (int idx = 0; idx < pbo.links_size(); ++idx) {
    		const protobuf::LinkSet& wire_set = pbo.links(idx);
    		// the entry is created even when the wire set carries no targets
    		LinkSet& targets = remote->_links._links[wire_set.type()];
    		for (int t = 0; t < wire_set.targets_size(); ++t) {
    			targets.insert(wire_set.targets(t));
    		}
    	}
    	return std::make_pair(getDataType(), remote);
    }
    
    //}}}
    
    // IUPayloadUpdateConverter//{{{
    
    /// Set up the converter with the identifiers "ipaaca::IUPayloadUpdate" and
    /// "ipaaca-iu-payload-update".
    IUPayloadUpdateConverter::IUPayloadUpdateConverter()
    	: Converter<std::string>("ipaaca::IUPayloadUpdate", "ipaaca-iu-payload-update", true)
    {
    }
    
    std::string IUPayloadUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	assert(data.first == getDataType()); // "ipaaca::IUPayloadUpdate"
    	boost::shared_ptr<const IUPayloadUpdate> obj = boost::static_pointer_cast<const IUPayloadUpdate> (data.second);
    	boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
    	// transfer obj data to pbo
    	pbo->set_uid(obj->uid);
    	pbo->set_revision(obj->revision);
    	pbo->set_writer_name(obj->writer_name);
    	pbo->set_is_delta(obj->is_delta);
    	for (std::map<std::string, std::string>::const_iterator it=obj->new_items.begin(); it!=obj->new_items.end(); ++it) {
    		protobuf::PayloadItem* item = pbo->add_new_items();
    		item->set_key(it->first);
    		item->set_value(it->second);
    		item->set_type("str"); // FIXME other types than str (later)
    	}
    	for (std::vector<std::string>::const_iterator it=obj->keys_to_remove.begin(); it!=obj->keys_to_remove.end(); ++it) {
    		pbo->add_keys_to_remove(*it);
    	}
    	pbo->SerializeToString(&wire);
    	return getWireSchema();
    
    }
    
    /// Rebuild an IUPayloadUpdate from its protobuf wire representation.
    ///
    /// @return pair of getDataType() and the newly allocated IUPayloadUpdate
    AnnotatedData IUPayloadUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema()); // "ipaaca-iu-payload-update"
    	protobuf::IUPayloadUpdate pbo;
    	pbo.ParseFromString(wire);

    	// copy the protobuf message into a fresh update object
    	boost::shared_ptr<IUPayloadUpdate> update(new IUPayloadUpdate());
    	update->uid = pbo.uid();
    	update->revision = pbo.revision();
    	update->writer_name = pbo.writer_name();
    	update->is_delta = pbo.is_delta();
    	for (int idx = 0; idx < pbo.new_items_size(); ++idx) {
    		const protobuf::PayloadItem& item = pbo.new_items(idx);
    		update->new_items[item.key()] = item.value();
    	}
    	for (int idx = 0; idx < pbo.keys_to_remove_size(); ++idx) {
    		update->keys_to_remove.push_back(pbo.keys_to_remove(idx));
    	}
    	return std::make_pair(getDataType(), update);
    }
    
    //}}}
    
    // IULinkUpdateConverter//{{{
    
    /// Set up the converter with the identifiers "ipaaca::IULinkUpdate" and
    /// "ipaaca-iu-link-update".
    IULinkUpdateConverter::IULinkUpdateConverter()
    	: Converter<std::string>("ipaaca::IULinkUpdate", "ipaaca-iu-link-update", true)
    {
    }
    
    std::string IULinkUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	assert(data.first == getDataType()); // "ipaaca::IULinkUpdate"
    	boost::shared_ptr<const IULinkUpdate> obj = boost::static_pointer_cast<const IULinkUpdate> (data.second);
    	boost::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate());
    	// transfer obj data to pbo
    	pbo->set_uid(obj->uid);
    	pbo->set_revision(obj->revision);
    	pbo->set_writer_name(obj->writer_name);
    	pbo->set_is_delta(obj->is_delta);
    	for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->new_links.begin(); it!=obj->new_links.end(); ++it) {
    		protobuf::LinkSet* links = pbo->add_new_links();
    		links->set_type(it->first);
    		for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
    			links->add_targets(*it2);
    		}
    	}
    	for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->links_to_remove.begin(); it!=obj->links_to_remove.end(); ++it) {
    		protobuf::LinkSet* links = pbo->add_links_to_remove();
    		links->set_type(it->first);
    		for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
    			links->add_targets(*it2);
    		}
    	}
    	pbo->SerializeToString(&wire);
    	return getWireSchema();
    
    }
    
    /// Rebuild an IULinkUpdate from its protobuf wire representation.
    ///
    /// A link type only appears in the result if its wire set carries at least
    /// one target.
    ///
    /// @return pair of getDataType() and the newly allocated IULinkUpdate
    AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema()); // "ipaaca-iu-link-update"
    	protobuf::IULinkUpdate pbo;
    	pbo.ParseFromString(wire);

    	// copy the protobuf message into a fresh update object
    	boost::shared_ptr<IULinkUpdate> update(new IULinkUpdate());
    	update->uid = pbo.uid();
    	update->revision = pbo.revision();
    	update->writer_name = pbo.writer_name();
    	update->is_delta = pbo.is_delta();
    	for (int idx = 0; idx < pbo.new_links_size(); ++idx) {
    		const protobuf::LinkSet& wire_set = pbo.new_links(idx);
    		for (int t = 0; t < wire_set.targets_size(); ++t) {
    			// looked up per target so that empty sets create no entry
    			update->new_links[wire_set.type()].insert(wire_set.targets(t));
    		}
    	}
    	for (int idx = 0; idx < pbo.links_to_remove_size(); ++idx) {
    		const protobuf::LinkSet& wire_set = pbo.links_to_remove(idx);
    		for (int t = 0; t < wire_set.targets_size(); ++t) {
    			update->links_to_remove[wire_set.type()].insert(wire_set.targets(t));
    		}
    	}
    	return std::make_pair(getDataType(), update);
    }
    
    //}}}
    
    
    } // of namespace ipaaca