Skip to content
Snippets Groups Projects
ipaaca-internal.cc 21.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • /*
     * This file is part of IPAACA, the
     *  "Incremental Processing Architecture
     *   for Artificial Conversational Agents".
     *
     * Copyright (c) 2009-2015 Social Cognitive Systems Group
     *                         (formerly the Sociable Agents Group)
     *                         CITEC, Bielefeld University
     *
     * http://opensource.cit-ec.de/projects/ipaaca/
     * http://purl.org/net/ipaaca
     *
     * This file may be licensed under the terms of the
     * GNU Lesser General Public License Version 3 (the ``LGPL''),
     * or (at your option) any later version.
     *
     * Software distributed under the License is distributed
     * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
     * express or implied. See the LGPL for the specific language
     * governing rights and limitations.
     *
     * You should have received a copy of the LGPL along with this
     * program. If not, go to http://www.gnu.org/licenses/lgpl.html
     * or write to the Free Software Foundation, Inc.,
     * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
     *
     * The development of this software was supported by the
     * Excellence Cluster EXC 277 Cognitive Interaction Technology.
     * The Excellence Cluster EXC 277 is a grant of the Deutsche
     * Forschungsgemeinschaft (DFG) in the context of the German
     * Excellence Initiative.
     */
    
    #include <ipaaca/ipaaca.h>
    
    namespace ipaaca {
    
    using namespace rsb;
    using namespace rsb::filter;
    using namespace rsb::converter;
    using namespace rsb::patterns;
    
    // static library Initializer
    // Library-wide flag; initialize_backend() sets it to true after the
    // converters have been registered.
    IPAACA_EXPORT bool Initializer::_initialized = false;

    /// Report the current value of the library initialization flag.
    /// @return the value of _initialized.
    IPAACA_EXPORT bool Initializer::initialized()
    {
    	return _initialized;
    }
    
    /// Entry point for callers that need the RSB backend to be usable.
    /// All work is delegated to initialize_backend().
    IPAACA_EXPORT void Initializer::initialize_ipaaca_rsb_if_needed()
    {
    	Initializer::initialize_backend();
    }
    /// Prepare the RSB backend for ipaaca.
    ///
    /// Runs auto_configure_rsb() and then registers one instance of every
    /// ipaaca converter with the std::string converter repository. On success
    /// the _initialized flag is set, and any later call returns immediately.
    IPAACA_EXPORT void Initializer::initialize_backend()//{{{
    {
    	// Do the setup only once. Without this guard each call would register
    	// the same converters again.
    	// NOTE(review): the repository presumably rejects duplicate
    	// registrations - confirm against the RSB converter repository docs.
    	if (_initialized) return;

    	auto_configure_rsb();

    	IPAACA_DEBUG("Creating and registering Converters")

    	// Full IUs and messages
    	boost::shared_ptr<IUConverter> iu_converter(new IUConverter());
    	converterRepository<std::string>()->registerConverter(iu_converter);

    	boost::shared_ptr<MessageConverter> message_converter(new MessageConverter());
    	converterRepository<std::string>()->registerConverter(message_converter);

    	// Incremental updates (payload / links)
    	boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter());
    	converterRepository<std::string>()->registerConverter(payload_update_converter);

    	boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter());
    	converterRepository<std::string>()->registerConverter(link_update_converter);

    	// Plain protobuf messages handled by the generic protocol buffer converter
    	boost::shared_ptr<ProtocolBufferConverter<protobuf::IUCommission> > iu_commission_converter(new ProtocolBufferConverter<protobuf::IUCommission> ());
    	converterRepository<std::string>()->registerConverter(iu_commission_converter);

    	// dlw
    	boost::shared_ptr<ProtocolBufferConverter<protobuf::IUResendRequest> > iu_resendrequest_converter(new ProtocolBufferConverter<protobuf::IUResendRequest> ());
    	converterRepository<std::string>()->registerConverter(iu_resendrequest_converter);

    	boost::shared_ptr<ProtocolBufferConverter<protobuf::IURetraction> > iu_retraction_converter(new ProtocolBufferConverter<protobuf::IURetraction> ());
    	converterRepository<std::string>()->registerConverter(iu_retraction_converter);

    	boost::shared_ptr<IntConverter> int_converter(new IntConverter());
    	converterRepository<std::string>()->registerConverter(int_converter);

    	IPAACA_DEBUG("Backend / converter initialization complete.")

    	_initialized = true;
    }//}}}
    /// Log the name of every transport contained in the factory's default
    /// participant configuration (via IPAACA_INFO), framed by a start and an
    /// end marker line.
    IPAACA_EXPORT void Initializer::dump_current_default_config()//{{{
    {
    	IPAACA_INFO("--- Dumping current default participant configuration ---")
    	rsb::ParticipantConfig default_config = getFactory().getDefaultParticipantConfig();
    	const std::set<rsb::ParticipantConfig::Transport> active_transports = default_config.getTransports();
    	for (const auto& transport : active_transports) {
    		IPAACA_INFO( "Active transport: " << transport.getName() )
    	}
    	IPAACA_INFO("--- End of configuration dump ---")
    	// Disabled snippet kept for reference (enabling the inprocess transport):
    	//ParticipantConfig::Transport inprocess = config.getTransport("inprocess");
    	//inprocess.setEnabled(true);
    	//config.addTransport(inprocess);
    }//}}}
    
    /// Make sure RSB_PLUGINS_CPP_PATH is set, if possible.
    ///
    /// If the environment variable is already defined it is only logged.
    /// Otherwise:
    ///  - on WIN32 a warning is logged and nothing else happens;
    ///  - elsewhere, the pattern "deps/lib/rsb*/plugins" is globbed in "./",
    ///    "../", "../../", ... (8 levels in total). The first match found is
    ///    exported as RSB_PLUGINS_CPP_PATH. If nothing matches, the variable
    ///    stays unset.
    IPAACA_EXPORT void Initializer::auto_configure_rsb()//{{{
    {
    	const char* plugin_path = getenv("RSB_PLUGINS_CPP_PATH");
    	if (!plugin_path) {
    #ifdef WIN32
    		IPAACA_WARN("WARNING: RSB_PLUGINS_CPP_PATH not set - in Windows it has to be specified.")
    		//throw NotImplementedError();
    #else
    		IPAACA_INFO("RSB_PLUGINS_CPP_PATH not set; looking here and up to 7 dirs up.")

    		std::string pathstr = "./";
    		for (int i=0; i<   8 /* depth EIGHT (totally arbitrary..) */  ; i++) {
    			const std::string where_str = pathstr+"deps/lib/rsb*/plugins";
    			glob_t g;
    			glob(where_str.c_str(), 0, nullptr, &g);
    			const bool found = (g.gl_pathc>0);
    			if (found) {
    				const char* found_path = g.gl_pathv[0];
    				IPAACA_INFO("Found an RSB plugin dir which will be used automatically: " << found_path)
    				// found_path points into g, so export it before g is released.
    				setenv("RSB_PLUGINS_CPP_PATH", found_path, 1);
    			}
    			// Release the glob result on every path, including the success
    			// path (previously leaked because of the early break).
    			globfree(&g);
    			if (found) break;
    			pathstr += "../"; // nothing here: retry one directory further up
    		}
    #endif
    	} else {
    		IPAACA_INFO("RSB_PLUGINS_CPP_PATH already defined: " << plugin_path)
    	}
    }//}}}
    
    // RSB backend Converters
    // IUConverter//{{{
    
    /// Set up the converter base with the data type name produced by
    /// IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IU") and the wire schema
    /// "ipaaca-iu".
    IPAACA_EXPORT IUConverter::IUConverter()
    	: Converter<std::string>(
    		IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IU"),
    		"ipaaca-iu",
    		true)
    {
    }
    
    /// Serialize an IU into its protobuf wire representation.
    ///
    /// @param data  annotated datum; data.first must equal getDataType() and
    ///              data.second is treated as an IU.
    /// @param wire  output string receiving the serialized protobuf::IU.
    /// @return the wire schema name: "ipaaca-iu" for push IUs,
    ///         "ipaaca-messageiu" for message IUs, getWireSchema() otherwise.
    IPAACA_EXPORT std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	assert(data.first == getDataType());
    	boost::shared_ptr<const IU> iu = boost::static_pointer_cast<const IU>(data.second);
    	protobuf::IU message;

    	// Scalar attributes
    	message.set_uid(iu->uid());
    	message.set_revision(iu->revision());
    	message.set_category(iu->category());
    	message.set_payload_type(iu->payload_type());
    	message.set_owner_name(iu->owner_name());
    	message.set_committed(iu->committed());

    	// Translate the ipaaca access mode into the protobuf enum
    	ipaaca::protobuf::IU_AccessMode wire_mode;
    	switch (iu->access_mode()) {
    		case IU_ACCESS_PUSH:    wire_mode = ipaaca::protobuf::IU_AccessMode_PUSH;    break;
    		case IU_ACCESS_REMOTE:  wire_mode = ipaaca::protobuf::IU_AccessMode_REMOTE;  break;
    		case IU_ACCESS_MESSAGE: wire_mode = ipaaca::protobuf::IU_AccessMode_MESSAGE; break;
    	}
    	message.set_access_mode(wire_mode);
    	message.set_read_only(iu->read_only());

    	// Payload: one PayloadItem per document store entry
    	const bool as_json = (iu->_payload_type=="JSON");
    	const bool as_legacy_string = (iu->_payload_type=="MAP") || (iu->_payload_type=="STR");
    	for (auto& entry: iu->_payload._document_store) {
    		protobuf::PayloadItem* item = message.add_payload();
    		item->set_key(entry.first);
    		IPAACA_DEBUG("Payload type: " << iu->_payload_type)
    		if (as_json) {
    			item->set_value( entry.second->to_json_string_representation() );
    			item->set_type("JSON");
    		} else if (as_legacy_string) {
    			// legacy mode: values travel as plain strings
    			item->set_value( json_value_cast<std::string>(entry.second->document));
    			item->set_type("STR");
    		}
    	}

    	// Links: one LinkSet per link type, one target per set element
    	for (const auto& typed_links : iu->_links._links) {
    		protobuf::LinkSet* link_set = message.add_links();
    		link_set->set_type(typed_links.first);
    		for (const auto& target : typed_links.second) {
    			link_set->add_targets(target);
    		}
    	}

    	message.SerializeToString(&wire);

    	// The schema announced to the transport depends on the access mode
    	const auto mode = iu->access_mode();
    	if (mode == IU_ACCESS_PUSH) {
    		return "ipaaca-iu";
    	}
    	if (mode == IU_ACCESS_MESSAGE) {
    		return "ipaaca-messageiu";
    	}
    	return getWireSchema();
    }
    
    /// Rebuild a remote IU object from its protobuf wire representation.
    ///
    /// @param wireSchema  schema name of the incoming data; must equal getWireSchema().
    /// @param wire        serialized protobuf::IU.
    /// @return ("ipaaca::RemotePushIU", RemotePushIU) for push-mode IUs,
    ///         ("ipaaca::RemoteMessage", RemoteMessage) for message-mode IUs.
    /// @throws NotImplementedError for any other access mode.
    ///
    /// Payload items typed "JSON" are parsed as JSON documents. Every other
    /// item is treated as a legacy string and stored verbatim as a string
    /// document.
    IPAACA_EXPORT AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema()); // "ipaaca-iu"
    	boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
    	pbo->ParseFromString(wire);
    	IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode());
    	switch(mode) {
    		case IU_ACCESS_PUSH:
    			{
    			// Create a "remote push IU"
    			boost::shared_ptr<RemotePushIU> obj = RemotePushIU::create();
    			// transfer pbo data to obj
    			obj->_uid = pbo->uid();
    			obj->_revision = pbo->revision();
    			obj->_category = pbo->category();
    			obj->_payload_type = pbo->payload_type();
    			obj->_owner_name = pbo->owner_name();
    			obj->_committed = pbo->committed();
    			obj->_read_only = pbo->read_only();
    			obj->_access_mode = IU_ACCESS_PUSH;
    			for (int i=0; i<pbo->payload_size(); i++) {
    				const protobuf::PayloadItem& it = pbo->payload(i);
    				PayloadDocumentEntry::ptr entry;
    				if (it.type() == "JSON") {
    					// fully parse json text
    					entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
    				} else {
    					// assuming legacy "str" -> just copy value to raw string in document
    					entry = std::make_shared<PayloadDocumentEntry>();
    					entry->document.SetString(it.value(), entry->document.GetAllocator());
    				}
    				obj->_payload._document_store[it.key()] = entry;
    			}
    			for (int i=0; i<pbo->links_size(); i++) {
    				const protobuf::LinkSet& pls = pbo->links(i);
    				LinkSet& ls = obj->_links._links[pls.type()];
    				for (int j=0; j<pls.targets_size(); j++) {
    					ls.insert(pls.targets(j));
    				}
    			}
    			return std::make_pair("ipaaca::RemotePushIU", obj);
    			}
    		case IU_ACCESS_MESSAGE:
    			{
    			// Create a "Message-type IU"
    			boost::shared_ptr<RemoteMessage> obj = RemoteMessage::create();
    			// transfer pbo data to obj
    			obj->_uid = pbo->uid();
    			obj->_revision = pbo->revision();
    			obj->_category = pbo->category();
    			obj->_payload_type = pbo->payload_type();
    			obj->_owner_name = pbo->owner_name();
    			obj->_committed = pbo->committed();
    			obj->_read_only = pbo->read_only();
    			obj->_access_mode = IU_ACCESS_MESSAGE;
    			for (int i=0; i<pbo->payload_size(); i++) {
    				const protobuf::PayloadItem& it = pbo->payload(i);
    				PayloadDocumentEntry::ptr entry;
    				if (it.type() == "JSON") {
    					// fully parse json text
    					entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
    				} else {
    					// assuming legacy "str" -> just copy value to raw string in document
    					entry = std::make_shared<PayloadDocumentEntry>();
    					entry->document.SetString(it.value(), entry->document.GetAllocator());
    				}
    				obj->_payload._document_store[it.key()] = entry;
    			}
    			for (int i=0; i<pbo->links_size(); i++) {
    				const protobuf::LinkSet& pls = pbo->links(i);
    				LinkSet& ls = obj->_links._links[pls.type()];
    				for (int j=0; j<pls.targets_size(); j++) {
    					ls.insert(pls.targets(j));
    				}
    			}
    			return std::make_pair("ipaaca::RemoteMessage", obj);
    			}
    		default:
    			// other access modes (e.g. IU_ACCESS_REMOTE) are not supported here
    			throw NotImplementedError();
    	}
    }
    
    //}}}
    // MessageConverter//{{{
    
    /// Set up the converter base with the data type name produced by
    /// IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::Message") and the wire
    /// schema "ipaaca-messageiu".
    IPAACA_EXPORT MessageConverter::MessageConverter()
    	: Converter<std::string>(
    		IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::Message"),
    		"ipaaca-messageiu",
    		true)
    {
    }
    
    /// Serialize a Message into its protobuf wire representation.
    ///
    /// @param data  annotated datum; data.first must equal getDataType() and
    ///              data.second is treated as a Message.
    /// @param wire  output string receiving the serialized protobuf::IU.
    /// @return the wire schema name: "ipaaca-iu" for push mode,
    ///         "ipaaca-messageiu" for message mode, getWireSchema() otherwise.
    IPAACA_EXPORT std::string MessageConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	assert(data.first == getDataType());
    	boost::shared_ptr<const Message> msg = boost::static_pointer_cast<const Message>(data.second);
    	protobuf::IU message;

    	// Scalar attributes
    	message.set_uid(msg->uid());
    	message.set_revision(msg->revision());
    	message.set_category(msg->category());
    	message.set_payload_type(msg->payload_type());
    	message.set_owner_name(msg->owner_name());
    	message.set_committed(msg->committed());

    	// Translate the ipaaca access mode into the protobuf enum
    	ipaaca::protobuf::IU_AccessMode wire_mode;
    	switch (msg->access_mode()) {
    		case IU_ACCESS_PUSH:    wire_mode = ipaaca::protobuf::IU_AccessMode_PUSH;    break;
    		case IU_ACCESS_REMOTE:  wire_mode = ipaaca::protobuf::IU_AccessMode_REMOTE;  break;
    		case IU_ACCESS_MESSAGE: wire_mode = ipaaca::protobuf::IU_AccessMode_MESSAGE; break;
    	}
    	message.set_access_mode(wire_mode);
    	message.set_read_only(msg->read_only());

    	// Payload: one PayloadItem per document store entry
    	const bool as_json = (msg->_payload_type=="JSON");
    	const bool as_legacy_string = (msg->_payload_type=="MAP") || (msg->_payload_type=="STR");
    	for (auto& entry: msg->_payload._document_store) {
    		protobuf::PayloadItem* item = message.add_payload();
    		item->set_key(entry.first);
    		if (as_json) {
    			item->set_value( entry.second->to_json_string_representation() );
    			item->set_type("JSON");
    		} else if (as_legacy_string) {
    			// legacy mode: values travel as plain strings
    			item->set_value( json_value_cast<std::string>(entry.second->document));
    			item->set_type("STR");
    		}
    	}

    	// Links: one LinkSet per link type, one target per set element
    	for (const auto& typed_links : msg->_links._links) {
    		protobuf::LinkSet* link_set = message.add_links();
    		link_set->set_type(typed_links.first);
    		for (const auto& target : typed_links.second) {
    			link_set->add_targets(target);
    		}
    	}

    	message.SerializeToString(&wire);

    	// The schema announced to the transport depends on the access mode
    	const auto mode = msg->access_mode();
    	if (mode == IU_ACCESS_PUSH) {
    		return "ipaaca-iu";
    	}
    	if (mode == IU_ACCESS_MESSAGE) {
    		return "ipaaca-messageiu";
    	}
    	return getWireSchema();
    }
    
    /// Rebuild a remote IU / message object from its protobuf wire representation.
    ///
    /// @param wireSchema  schema name of the incoming data; must equal getWireSchema().
    /// @param wire        serialized protobuf::IU.
    /// @return ("ipaaca::RemotePushIU", RemotePushIU) for push-mode data,
    ///         ("ipaaca::RemoteMessage", RemoteMessage) for message-mode data.
    /// @throws NotImplementedError for any other access mode.
    ///
    /// Payload items typed "JSON" are parsed as JSON documents. Every other
    /// item is treated as a legacy string and stored verbatim as a string
    /// document.
    IPAACA_EXPORT AnnotatedData MessageConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema()); // "ipaaca-messageiu"
    	boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
    	pbo->ParseFromString(wire);
    	IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode());
    	switch(mode) {
    		case IU_ACCESS_PUSH:
    			{
    			// Create a "remote push IU"
    			boost::shared_ptr<RemotePushIU> obj = RemotePushIU::create();
    			// transfer pbo data to obj
    			obj->_uid = pbo->uid();
    			obj->_revision = pbo->revision();
    			obj->_category = pbo->category();
    			obj->_payload_type = pbo->payload_type();
    			obj->_owner_name = pbo->owner_name();
    			obj->_committed = pbo->committed();
    			obj->_read_only = pbo->read_only();
    			obj->_access_mode = IU_ACCESS_PUSH;
    			for (int i=0; i<pbo->payload_size(); i++) {
    				const protobuf::PayloadItem& it = pbo->payload(i);
    				PayloadDocumentEntry::ptr entry;
    				if (it.type() == "JSON") {
    					// fully parse json text
    					entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
    				} else {
    					// assuming legacy "str" -> just copy value to raw string in document
    					entry = std::make_shared<PayloadDocumentEntry>();
    					entry->document.SetString(it.value(), entry->document.GetAllocator());
    				}
    				obj->_payload._document_store[it.key()] = entry;
    			}
    			for (int i=0; i<pbo->links_size(); i++) {
    				const protobuf::LinkSet& pls = pbo->links(i);
    				LinkSet& ls = obj->_links._links[pls.type()];
    				for (int j=0; j<pls.targets_size(); j++) {
    					ls.insert(pls.targets(j));
    				}
    			}
    			return std::make_pair("ipaaca::RemotePushIU", obj);
    			}
    		case IU_ACCESS_MESSAGE:
    			{
    			// Create a "Message-type IU"
    			boost::shared_ptr<RemoteMessage> obj = RemoteMessage::create();
    			// transfer pbo data to obj
    			obj->_uid = pbo->uid();
    			obj->_revision = pbo->revision();
    			obj->_category = pbo->category();
    			obj->_payload_type = pbo->payload_type();
    			obj->_owner_name = pbo->owner_name();
    			obj->_committed = pbo->committed();
    			obj->_read_only = pbo->read_only();
    			obj->_access_mode = IU_ACCESS_MESSAGE;
    			for (int i=0; i<pbo->payload_size(); i++) {
    				const protobuf::PayloadItem& it = pbo->payload(i);
    				PayloadDocumentEntry::ptr entry;
    				if (it.type() == "JSON") {
    					// fully parse json text
    					entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
    				} else {
    					// assuming legacy "str" -> just copy value to raw string in document
    					entry = std::make_shared<PayloadDocumentEntry>();
    					entry->document.SetString(it.value(), entry->document.GetAllocator());
    				}
    				obj->_payload._document_store[it.key()] = entry;
    			}
    			for (int i=0; i<pbo->links_size(); i++) {
    				const protobuf::LinkSet& pls = pbo->links(i);
    				LinkSet& ls = obj->_links._links[pls.type()];
    				for (int j=0; j<pls.targets_size(); j++) {
    					ls.insert(pls.targets(j));
    				}
    			}
    			return std::make_pair("ipaaca::RemoteMessage", obj);
    			}
    		default:
    			// other access modes (e.g. IU_ACCESS_REMOTE) are not supported here
    			throw NotImplementedError();
    	}
    }
    
    //}}}
    // IUPayloadUpdateConverter//{{{
    
    /// Set up the converter base with the data type name produced by
    /// IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IUPayloadUpdate") and the
    /// wire schema "ipaaca-iu-payload-update".
    IPAACA_EXPORT IUPayloadUpdateConverter::IUPayloadUpdateConverter()
    	: Converter<std::string>(
    		IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IUPayloadUpdate"),
    		"ipaaca-iu-payload-update",
    		true)
    {
    }
    
    /// Serialize an IUPayloadUpdate into its protobuf wire representation.
    ///
    /// @param data  annotated datum; data.first must equal getDataType() and
    ///              data.second is treated as an IUPayloadUpdate.
    /// @param wire  output string receiving the serialized protobuf::IUPayloadUpdate.
    /// @return getWireSchema().
    /// @throws NotImplementedError if there is at least one new item and the
    ///         update's payload_type is none of "JSON", "MAP" or "STR".
    IPAACA_EXPORT std::string IUPayloadUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	assert(data.first == getDataType()); // "ipaaca::IUPayloadUpdate"
    	boost::shared_ptr<const IUPayloadUpdate> obj = boost::static_pointer_cast<const IUPayloadUpdate> (data.second);
    	boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
    	// transfer obj data to pbo
    	pbo->set_uid(obj->uid);
    	pbo->set_revision(obj->revision);
    	pbo->set_writer_name(obj->writer_name);
    	pbo->set_is_delta(obj->is_delta);

    	// new / changed payload entries
    	for (auto& kv: obj->new_items) {
    		protobuf::PayloadItem* item = pbo->add_new_items();
    		item->set_key(kv.first);
    		if (obj->payload_type=="JSON") {
    			item->set_value( kv.second->to_json_string_representation() );
    			item->set_type("JSON");
    		} else if ((obj->payload_type=="MAP") || (obj->payload_type=="STR")) {
    			// legacy mode
    			item->set_value( json_value_cast<std::string>(kv.second->document));
    			item->set_type("STR");
    		} else {
    			IPAACA_ERROR("Uninitialized payload update type!")
    			throw NotImplementedError();
    		}
    		IPAACA_DEBUG("Adding updated item (type " << item->type() << "): " << item->key() << " -> " << item->value() )
    	}

    	// keys whose entries are to be deleted on the receiving side
    	for (auto& key: obj->keys_to_remove) {
    		pbo->add_keys_to_remove(key);
    		IPAACA_DEBUG("Adding removed key: " << key)
    	}

    	pbo->SerializeToString(&wire);
    	return getWireSchema();
    }
    
    /// Rebuild an IUPayloadUpdate from its protobuf wire representation.
    ///
    /// @param wireSchema  schema name of the incoming data; must equal getWireSchema().
    /// @param wire        serialized protobuf::IUPayloadUpdate.
    /// @return (getDataType(), IUPayloadUpdate) holding the decoded update.
    ///
    /// New items typed "JSON" are parsed as JSON documents. Every other item
    /// is treated as a legacy string and stored verbatim as a string document.
    AnnotatedData IUPayloadUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema()); // "ipaaca-iu-payload-update"
    	boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
    	pbo->ParseFromString(wire);
    	boost::shared_ptr<IUPayloadUpdate> obj(new IUPayloadUpdate());
    	// transfer pbo data to obj
    	obj->uid = pbo->uid();
    	obj->revision = pbo->revision();
    	obj->writer_name = pbo->writer_name();
    	obj->is_delta = pbo->is_delta();
    	for (int i=0; i<pbo->new_items_size(); i++) {
    		const protobuf::PayloadItem& it = pbo->new_items(i);
    		PayloadDocumentEntry::ptr entry;
    		if (it.type() == "JSON") {
    			// fully parse json text
    			entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
    			IPAACA_INFO("New/updated payload entry: " << it.key() << " -> " << it.value() )
    		} else {
    			// assuming legacy "str" -> just copy value to raw string in document
    			entry = std::make_shared<PayloadDocumentEntry>();
    			entry->document.SetString(it.value(), entry->document.GetAllocator());
    		}
    		obj->new_items[it.key()] = entry;
    	}
    	for (int i=0; i<pbo->keys_to_remove_size(); i++) {
    		obj->keys_to_remove.push_back(pbo->keys_to_remove(i));
    	}
    	return std::make_pair(getDataType(), obj);
    }
    
    //}}}
    // IULinkUpdateConverter//{{{
    
    /// Set up the converter base with the data type name produced by
    /// IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IULinkUpdate") and the wire
    /// schema "ipaaca-iu-link-update".
    IPAACA_EXPORT IULinkUpdateConverter::IULinkUpdateConverter()
    	: Converter<std::string>(
    		IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IULinkUpdate"),
    		"ipaaca-iu-link-update",
    		true)
    {
    }
    
    /// Serialize an IULinkUpdate into its protobuf wire representation.
    ///
    /// @param data  annotated datum; data.second is treated as an IULinkUpdate.
    /// @param wire  output string receiving the serialized protobuf::IULinkUpdate.
    /// @return getWireSchema().
    IPAACA_EXPORT std::string IULinkUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	boost::shared_ptr<const IULinkUpdate> update = boost::static_pointer_cast<const IULinkUpdate>(data.second);
    	protobuf::IULinkUpdate message;

    	// Header fields
    	message.set_uid(update->uid);
    	message.set_revision(update->revision);
    	message.set_writer_name(update->writer_name);
    	message.set_is_delta(update->is_delta);

    	// Links to add: one LinkSet per link type
    	for (const auto& added : update->new_links) {
    		protobuf::LinkSet* link_set = message.add_new_links();
    		link_set->set_type(added.first);
    		for (const auto& target : added.second) {
    			link_set->add_targets(target);
    		}
    	}

    	// Links to remove: same layout as above
    	for (const auto& removed : update->links_to_remove) {
    		protobuf::LinkSet* link_set = message.add_links_to_remove();
    		link_set->set_type(removed.first);
    		for (const auto& target : removed.second) {
    			link_set->add_targets(target);
    		}
    	}

    	message.SerializeToString(&wire);
    	return getWireSchema();
    }
    
    /// Rebuild an IULinkUpdate from its protobuf wire representation.
    ///
    /// @param wireSchema  schema name of the incoming data; must equal getWireSchema().
    /// @param wire        serialized protobuf::IULinkUpdate.
    /// @return (getDataType(), IULinkUpdate) holding the decoded update.
    AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema());
    	protobuf::IULinkUpdate message;
    	message.ParseFromString(wire);

    	boost::shared_ptr<IULinkUpdate> update(new IULinkUpdate());
    	update->uid = message.uid();
    	update->revision = message.revision();
    	update->writer_name = message.writer_name();
    	update->is_delta = message.is_delta();

    	// Links to add. The map entry for a type is only touched while
    	// inserting, so a type without targets creates no entry.
    	const int added_count = message.new_links_size();
    	for (int set_idx = 0; set_idx < added_count; ++set_idx) {
    		const protobuf::LinkSet& link_set = message.new_links(set_idx);
    		const int target_count = link_set.targets_size();
    		for (int target_idx = 0; target_idx < target_count; ++target_idx) {
    			update->new_links[link_set.type()].insert(link_set.targets(target_idx));
    		}
    	}

    	// Links to remove, decoded the same way.
    	const int removed_count = message.links_to_remove_size();
    	for (int set_idx = 0; set_idx < removed_count; ++set_idx) {
    		const protobuf::LinkSet& link_set = message.links_to_remove(set_idx);
    		const int target_count = link_set.targets_size();
    		for (int target_idx = 0; target_idx < target_count; ++target_idx) {
    			update->links_to_remove[link_set.type()].insert(link_set.targets(target_idx));
    		}
    	}

    	return std::make_pair(getDataType(), update);
    }
    
    //}}}
    // IntConverter//{{{
    
    /// Set up the converter base with data type "int" and wire schema "int32".
    IPAACA_EXPORT IntConverter::IntConverter()
    	: Converter<std::string>(
    		"int",
    		"int32",
    		true)
    {
    }
    
    /// Serialize a single int as a protobuf::IntMessage.
    ///
    /// @param data  annotated datum; data.second is treated as pointing to an int.
    /// @param wire  output string receiving the serialized message.
    /// @return getWireSchema().
    IPAACA_EXPORT std::string IntConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	// NOTE: a dynamic_pointer_cast cannot be used from void*
    	boost::shared_ptr<const int> number = boost::static_pointer_cast<const int>(data.second);

    	protobuf::IntMessage message;
    	message.set_value(*number);
    	message.SerializeToString(&wire);

    	return getWireSchema();
    }
    
    /// Decode a protobuf::IntMessage into a heap-allocated int.
    ///
    /// @param wireSchema  schema name of the incoming data; must equal getWireSchema().
    /// @param wire        serialized protobuf::IntMessage.
    /// @return ("int", shared pointer to the decoded value).
    IPAACA_EXPORT AnnotatedData IntConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema());

    	protobuf::IntMessage message;
    	message.ParseFromString(wire);

    	boost::shared_ptr<int> number(new int(message.value()));
    	return std::make_pair("int", number);
    }
    
    //}}}
    
    } // of namespace ipaaca