Skip to content
Snippets Groups Projects
ipaaca.cc 53.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • 		for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) {
    			_payload._remotely_enforced_setitem(it->first, it->second);
    		}
    	} else {
    		_payload._remotely_enforced_wipe();
    		for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) {
    			_payload._remotely_enforced_setitem(it->first, it->second);
    		}
    	}
    }
    void RemotePushIU::_apply_commission()
    {
    
    Ramin Yaghoubzadeh's avatar
    Ramin Yaghoubzadeh committed
    void RemotePushIU::_apply_retraction()
    {
    	_retracted = true;
    }
    
    //}}}
    
    // RemoteMessage//{{{
    
    // Factory: allocates a RemoteMessage, passes the owning pointer to its
    // payload via Payload::initialize(), and returns that pointer.
    // (The enclosing braces were missing in this copy and are restored.)
    RemoteMessage::ptr RemoteMessage::create()
    {
    	RemoteMessage::ptr iu(new RemoteMessage(/* params */));
    	iu->_payload.initialize(iu);
    	return iu;
    }
    
    // Default constructor.
    // NOTE(review): the body was missing from this copy of the file. An empty
    // body is assumed because MessageConverter::deserialize() assigns the
    // fields after RemoteMessage::create() -- confirm against the repository.
    RemoteMessage::RemoteMessage()
    {
    }
    
    void RemoteMessage::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
    
    	IPAACA_INFO("Info: modifying a RemoteMessage only has local effects")
    }
    // Payload-modification hook for a RemoteMessage: none of the arguments are
    // used here, only an informational message is logged.
    void RemoteMessage::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
    {
    	IPAACA_INFO("Info: modifying a RemoteMessage only has local effects")
    }
    // Commit hook for a RemoteMessage: only logs an informational message.
    // (The closing brace was missing in this copy and is restored.)
    void RemoteMessage::commit()
    {
    	IPAACA_INFO("Info: committing to a RemoteMessage only has local effects")
    }
    
    // Applies a link update to this message after logging a warning (the
    // warning text states this path should never be taken).
    // Adopts the update's revision, then either replaces all links
    // (full update) or adds/removes the given links (delta update).
    void RemoteMessage::_apply_link_update(IULinkUpdate::ptr update)
    {
    	IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_link_update")
    	_revision = update->revision;
    	if (!update->is_delta) {
    		// Full update: the new link map supersedes the current one.
    		_replace_links(update->new_links);
    		return;
    	}
    	// Delta update: merge additions and removals into the current links.
    	_add_and_remove_links(update->new_links, update->links_to_remove);
    }
    // Applies a payload update to this message after logging a warning (the
    // warning text states this path should never be taken).
    // Adopts the update's revision. A delta update first deletes the listed
    // keys; a full update wipes the payload. In both cases the new items are
    // stored afterwards.
    void RemoteMessage::_apply_update(IUPayloadUpdate::ptr update)
    {
    	typedef std::vector<std::string>::const_iterator KeyIter;
    	typedef std::map<std::string, std::string>::const_iterator ItemIter;

    	IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_update")
    	_revision = update->revision;

    	if (update->is_delta) {
    		KeyIter key = update->keys_to_remove.begin();
    		while (key != update->keys_to_remove.end()) {
    			_payload._remotely_enforced_delitem(*key);
    			++key;
    		}
    	} else {
    		_payload._remotely_enforced_wipe();
    	}

    	// Storing the new items is common to delta and full updates.
    	ItemIter item = update->new_items.begin();
    	while (item != update->new_items.end()) {
    		_payload._remotely_enforced_setitem(item->first, item->second);
    		++item;
    	}
    }
    // Logs a warning (this path is not expected to be taken for a
    // RemoteMessage) and then sets the _committed flag.
    void RemoteMessage::_apply_commission()
    {
    	IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_commission")
    	this->_committed = true;
    }
    // Logs a warning (this path is not expected to be taken for a
    // RemoteMessage) and then sets the _retracted flag.
    void RemoteMessage::_apply_retraction()
    {
    	IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_retraction")
    	this->_retracted = true;
    }
    
    // PayloadEntryProxy//{{{
    
    // Creates a proxy for the entry `key` of `payload`. Only the pointer and a
    // copy of the key are stored; the payload itself is not accessed here.
    PayloadEntryProxy::PayloadEntryProxy(Payload* payload, const std::string& key)
    	: _payload(payload)
    	, _key(key)
    {
    }
    // Assigns a string value: forwards it to Payload::set() under this
    // proxy's key and returns the proxy itself.
    PayloadEntryProxy& PayloadEntryProxy::operator=(const std::string& value)
    {
    	this->_payload->set(this->_key, value);
    	return *this;
    }
    
    // Assigns a C-string value: forwards it to Payload::set() under this
    // proxy's key and returns the proxy itself.
    PayloadEntryProxy& PayloadEntryProxy::operator=(const char* value)
    {
    	this->_payload->set(this->_key, value);
    	return *this;
    }
    // Assigns a floating-point value: the number is converted to its string
    // form with boost::lexical_cast and stored under this proxy's key.
    PayloadEntryProxy& PayloadEntryProxy::operator=(double value)
    {
    	const std::string text = boost::lexical_cast<std::string>(value);
    	_payload->set(_key, text);
    	return *this;
    }
    // Assigns a boolean value: the flag is converted to its string form with
    // boost::lexical_cast and stored under this proxy's key.
    PayloadEntryProxy& PayloadEntryProxy::operator=(bool value)
    {
    	const std::string text = boost::lexical_cast<std::string>(value);
    	_payload->set(_key, text);
    	return *this;
    }
    
    // Reads the entry as a string: returns whatever Payload::get() yields for
    // this proxy's key.
    PayloadEntryProxy::operator std::string()
    {
    	return this->_payload->get(this->_key);
    }
    
    // Reads the entry as a boolean: true only when the stored string is
    // exactly "1", "true" or "True"; every other value yields false.
    PayloadEntryProxy::operator bool()
    {
    	const std::string text = _payload->get(_key);
    	if (text == "1") return true;
    	if (text == "true") return true;
    	return text == "True";
    }
    
    // Reads the entry as a long integer.
    // The stored string is parsed as a base-10 integer; a string without a
    // leading number yields 0. Previously this used atof(), i.e. the value took
    // a detour through double before being converted to long (the call had
    // been swapped with the one in operator double()).
    PayloadEntryProxy::operator long()
    {
    	const std::string text = _payload->get(_key);
    	return strtol(text.c_str(), NULL, 10);
    }
    // Reads the entry as a double.
    // The stored string is parsed as a floating-point number; a string without
    // a leading number yields 0.0. Previously this used atol(), which silently
    // dropped the fractional part and any exponent (the call had been swapped
    // with the one in operator long()).
    PayloadEntryProxy::operator double()
    {
    	const std::string text = _payload->get(_key);
    	return strtod(text.c_str(), NULL);
    }
    //}}}
    
    // Payload//{{{
    
    // Stores the pointer to the IU this payload belongs to; the _internal_*
    // methods forward their modifications to that IU.
    void Payload::initialize(boost::shared_ptr<IUInterface> iu)
    {
    	this->_iu = iu;
    }
    
    // Returns a proxy object bound to this payload and `key`. The entry itself
    // is neither read nor created here.
    PayloadEntryProxy Payload::operator[](const std::string& key)
    {
    	PayloadEntryProxy proxy(this, key);
    	return proxy;
    }
    
    // Converts the payload to a plain map by returning a copy of the
    // internal key/value store.
    Payload::operator std::map<std::string, std::string>()
    {
    	return this->_store;
    }
    
    inline void Payload::_internal_set(const std::string& k, const std::string& v, const std::string& writer_name) {
    
    	std::map<std::string, std::string> _new;
    	std::vector<std::string> _remove;
    	_new[k]=v;
    
    	_iu->_modify_payload(true, _new, _remove, writer_name );
    
    inline void Payload::_internal_remove(const std::string& k, const std::string& writer_name) {
    
    	std::map<std::string, std::string> _new;
    	std::vector<std::string> _remove;
    	_remove.push_back(k);
    
    	_iu->_modify_payload(true, _new, _remove, writer_name );
    
    void Payload::_internal_replace_all(const std::map<std::string, std::string>& new_contents, const std::string& writer_name)
    {
    	std::vector<std::string> _remove;
    	_iu->_modify_payload(false, new_contents, _remove, writer_name );
    	_store = new_contents;
    }
    
    // Looks up key k in the local store. Returns the stored value, or
    // IPAACA_PAYLOAD_DEFAULT_STRING_VALUE when the key is absent.
    inline std::string Payload::get(const std::string& k) {
    	std::map<std::string, std::string>::const_iterator found = _store.find(k);
    	if (found == _store.end()) {
    		return IPAACA_PAYLOAD_DEFAULT_STRING_VALUE;
    	}
    	return found->second;
    }
    
    void Payload::_remotely_enforced_wipe()
    {
    	_store.clear();
    }
    void Payload::_remotely_enforced_delitem(const std::string& k)
    {
    	_store.erase(k);
    }
    void Payload::_remotely_enforced_setitem(const std::string& k, const std::string& v)
    {
    	_store[k] = v;
    }
    
    
    //}}}
    
    // IUConverter//{{{
    
    // Sets up the converter with data type "ipaaca::IU" and wire schema
    // "ipaaca-iu" (the values the asserts in serialize()/deserialize() compare
    // against). NOTE(review): meaning of the third base-class argument is not
    // visible here.
    IUConverter::IUConverter()
    	: Converter<std::string>("ipaaca::IU", "ipaaca-iu", true)
    {
    }
    
    std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	// Ensure that DATA actually holds a datum of the data-type we expect.
    	assert(data.first == getDataType()); // "ipaaca::IU"
    	// NOTE: a dynamic_pointer_cast cannot be used from void*
    	boost::shared_ptr<const IU> obj = boost::static_pointer_cast<const IU> (data.second);
    	boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
    	// transfer obj data to pbo
    
    	pbo->set_uid(obj->uid());
    	pbo->set_revision(obj->revision());
    	pbo->set_category(obj->category());
    	pbo->set_payload_type(obj->payload_type());
    	pbo->set_owner_name(obj->owner_name());
    	pbo->set_committed(obj->committed());
    
    	ipaaca::protobuf::IU_AccessMode a_m;
    	switch(obj->access_mode()) {
    		case IU_ACCESS_PUSH:
    			a_m = ipaaca::protobuf::IU_AccessMode_PUSH;
    			break;
    		case IU_ACCESS_REMOTE:
    			a_m = ipaaca::protobuf::IU_AccessMode_REMOTE;
    			break;
    		case IU_ACCESS_MESSAGE:
    			a_m = ipaaca::protobuf::IU_AccessMode_MESSAGE;
    			break;
    	}
    	pbo->set_access_mode(a_m);
    
    	pbo->set_read_only(obj->read_only());
    	for (std::map<std::string, std::string>::const_iterator it=obj->_payload._store.begin(); it!=obj->_payload._store.end(); ++it) {
    		protobuf::PayloadItem* item = pbo->add_payload();
    
    		item->set_key(it->first);
    		item->set_value(it->second);
    
    		item->set_type("str"); // FIXME other types than str (later)
    
    	for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) {
    		protobuf::LinkSet* links = pbo->add_links();
    		links->set_type(it->first);
    		for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
    			links->add_targets(*it2);
    		}
    
    	}
    	pbo->SerializeToString(&wire);
    	return getWireSchema();
    
    }
    
    // Rebuilds an IU from its protobuf wire form.
    //
    // wireSchema: must equal this converter's wire schema (assert only).
    // wire:       serialized protobuf::IU message.
    // Returns a pair of the type name "ipaaca::RemotePushIU" and the newly
    // created RemotePushIU.
    // Throws NotImplementedError for every access mode other than
    // IU_ACCESS_PUSH.
    // (The closing brace of the IU_ACCESS_PUSH case scope was missing in this
    // copy and is restored.)
    // NOTE(review): the result of ParseFromString() is not checked.
    AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema()); // "ipaaca-iu"

    	boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
    	pbo->ParseFromString(wire);

    	IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode());
    	switch(mode) {
    		case IU_ACCESS_PUSH:
    			{
    			// Create a "remote push IU"
    			boost::shared_ptr<RemotePushIU> obj = RemotePushIU::create();

    			// transfer pbo data to obj
    			obj->_uid = pbo->uid();
    			obj->_revision = pbo->revision();
    			obj->_category = pbo->category();
    			obj->_payload_type = pbo->payload_type();
    			obj->_owner_name = pbo->owner_name();
    			obj->_committed = pbo->committed();
    			obj->_read_only = pbo->read_only();
    			obj->_access_mode = IU_ACCESS_PUSH;
    			for (int i=0; i<pbo->payload_size(); i++) {
    				const protobuf::PayloadItem& it = pbo->payload(i);
    				obj->_payload._store[it.key()] = it.value();
    			}
    			for (int i=0; i<pbo->links_size(); i++) {
    				const protobuf::LinkSet& pls = pbo->links(i);
    				// The entry is created even when the set has no targets.
    				LinkSet& ls = obj->_links._links[pls.type()];
    				for (int j=0; j<pls.targets_size(); j++) {
    					ls.insert(pls.targets(j));
    				}
    			}

    			//return std::make_pair(getDataType(), obj);
    			return std::make_pair("ipaaca::RemotePushIU", obj);
    			}
    		// IU_ACCESS_MESSAGE is intentionally not handled here (the former
    		// case was disabled); MessageConverter::deserialize() builds
    		// RemoteMessage objects for that mode.
    		default:
    			// other cases not handled yet! ( TODO )
    			throw NotImplementedError();
    	}
    }
    
    //}}}
    // MessageConverter//{{{
    
    // Sets up the converter with data type "ipaaca::Message" and wire schema
    // "ipaaca-messageiu". NOTE(review): meaning of the third base-class
    // argument is not visible here.
    MessageConverter::MessageConverter()
    	: Converter<std::string>("ipaaca::Message", "ipaaca-messageiu", true)
    {
    }
    
    std::string MessageConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	// Ensure that DATA actually holds a datum of the data-type we expect.
    	assert(data.first == getDataType()); // "ipaaca::Message"
    	// NOTE: a dynamic_pointer_cast cannot be used from void*
    	boost::shared_ptr<const Message> obj = boost::static_pointer_cast<const Message> (data.second);
    	boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
    	// transfer obj data to pbo
    	pbo->set_uid(obj->uid());
    	pbo->set_revision(obj->revision());
    	pbo->set_category(obj->category());
    	pbo->set_payload_type(obj->payload_type());
    	pbo->set_owner_name(obj->owner_name());
    	pbo->set_committed(obj->committed());
    	ipaaca::protobuf::IU_AccessMode a_m;
    	switch(obj->access_mode()) {
    		case IU_ACCESS_PUSH:
    			a_m = ipaaca::protobuf::IU_AccessMode_PUSH;
    			break;
    		case IU_ACCESS_REMOTE:
    			a_m = ipaaca::protobuf::IU_AccessMode_REMOTE;
    			break;
    		case IU_ACCESS_MESSAGE:
    			a_m = ipaaca::protobuf::IU_AccessMode_MESSAGE;
    			break;
    	}
    	pbo->set_access_mode(a_m);
    	pbo->set_read_only(obj->read_only());
    	for (std::map<std::string, std::string>::const_iterator it=obj->_payload._store.begin(); it!=obj->_payload._store.end(); ++it) {
    		protobuf::PayloadItem* item = pbo->add_payload();
    		item->set_key(it->first);
    		item->set_value(it->second);
    		item->set_type("str"); // FIXME other types than str (later)
    	}
    	for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) {
    		protobuf::LinkSet* links = pbo->add_links();
    		links->set_type(it->first);
    		for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
    			links->add_targets(*it2);
    		}
    	}
    	pbo->SerializeToString(&wire);
    	return getWireSchema();
    
    }
    
    // Rebuilds a message from its protobuf wire form.
    //
    // wireSchema: must equal this converter's wire schema (assert only).
    // wire:       serialized protobuf::IU message.
    // Returns a pair of the type name "ipaaca::RemoteMessage" and the newly
    // created RemoteMessage.
    // Throws NotImplementedError for every access mode other than
    // IU_ACCESS_MESSAGE.
    AnnotatedData MessageConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema()); // "ipaaca-messageiu"
    	boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
    	pbo->ParseFromString(wire);

    	// other cases not handled yet! ( TODO )
    	if (static_cast<IUAccessMode>(pbo->access_mode()) != IU_ACCESS_MESSAGE) {
    		throw NotImplementedError();
    	}

    	// Create a "Message-type IU"
    	boost::shared_ptr<RemoteMessage> msg = RemoteMessage::create();

    	// transfer pbo data to msg
    	msg->_uid = pbo->uid();
    	msg->_revision = pbo->revision();
    	msg->_category = pbo->category();
    	msg->_payload_type = pbo->payload_type();
    	msg->_owner_name = pbo->owner_name();
    	msg->_committed = pbo->committed();
    	msg->_read_only = pbo->read_only();
    	msg->_access_mode = IU_ACCESS_MESSAGE;

    	const int payload_count = pbo->payload_size();
    	for (int p = 0; p < payload_count; ++p) {
    		const protobuf::PayloadItem& entry = pbo->payload(p);
    		msg->_payload._store[entry.key()] = entry.value();
    	}

    	const int link_count = pbo->links_size();
    	for (int l = 0; l < link_count; ++l) {
    		const protobuf::LinkSet& wire_links = pbo->links(l);
    		// The entry is created even when the set has no targets.
    		LinkSet& targets = msg->_links._links[wire_links.type()];
    		for (int t = 0; t < wire_links.targets_size(); ++t) {
    			targets.insert(wire_links.targets(t));
    		}
    	}

    	//return std::make_pair(getDataType(), msg);
    	return std::make_pair("ipaaca::RemoteMessage", msg);
    }
    
    //}}}
    
    // IUPayloadUpdateConverter//{{{
    
    // Sets up the converter with data type "ipaaca::IUPayloadUpdate" and wire
    // schema "ipaaca-iu-payload-update". NOTE(review): meaning of the third
    // base-class argument is not visible here.
    IUPayloadUpdateConverter::IUPayloadUpdateConverter()
    	: Converter<std::string>("ipaaca::IUPayloadUpdate", "ipaaca-iu-payload-update", true)
    {
    }
    
    std::string IUPayloadUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	assert(data.first == getDataType()); // "ipaaca::IUPayloadUpdate"
    	boost::shared_ptr<const IUPayloadUpdate> obj = boost::static_pointer_cast<const IUPayloadUpdate> (data.second);
    	boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
    	// transfer obj data to pbo
    	pbo->set_uid(obj->uid);
    	pbo->set_revision(obj->revision);
    	pbo->set_writer_name(obj->writer_name);
    	pbo->set_is_delta(obj->is_delta);
    	for (std::map<std::string, std::string>::const_iterator it=obj->new_items.begin(); it!=obj->new_items.end(); ++it) {
    		protobuf::PayloadItem* item = pbo->add_new_items();
    		item->set_key(it->first);
    		item->set_value(it->second);
    		item->set_type("str"); // FIXME other types than str (later)
    	}
    	for (std::vector<std::string>::const_iterator it=obj->keys_to_remove.begin(); it!=obj->keys_to_remove.end(); ++it) {
    		pbo->add_keys_to_remove(*it);
    	}
    	pbo->SerializeToString(&wire);
    	return getWireSchema();
    
    }
    
    // Rebuilds an IUPayloadUpdate from its protobuf wire form.
    //
    // wireSchema: must equal this converter's wire schema (assert only).
    // wire:       serialized protobuf::IUPayloadUpdate message.
    // Returns a pair of this converter's data type and the new update object.
    AnnotatedData IUPayloadUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema()); // "ipaaca-iu-payload-update"
    	boost::shared_ptr<protobuf::IUPayloadUpdate> message(new protobuf::IUPayloadUpdate());
    	message->ParseFromString(wire);

    	boost::shared_ptr<IUPayloadUpdate> update(new IUPayloadUpdate());

    	// Header fields.
    	update->uid = message->uid();
    	update->revision = message->revision();
    	update->writer_name = message->writer_name();
    	update->is_delta = message->is_delta();

    	// Items to add or overwrite; a repeated key keeps the last value.
    	const int item_count = message->new_items_size();
    	for (int n = 0; n < item_count; ++n) {
    		const protobuf::PayloadItem& wire_item = message->new_items(n);
    		update->new_items[wire_item.key()] = wire_item.value();
    	}

    	// Keys to delete, in wire order.
    	const int key_count = message->keys_to_remove_size();
    	for (int n = 0; n < key_count; ++n) {
    		update->keys_to_remove.push_back(message->keys_to_remove(n));
    	}

    	return std::make_pair(getDataType(), update);
    }
    
    //}}}
    
    // IULinkUpdateConverter//{{{
    
    // Sets up the converter with data type "ipaaca::IULinkUpdate" and wire
    // schema "ipaaca-iu-link-update". NOTE(review): meaning of the third
    // base-class argument is not visible here.
    IULinkUpdateConverter::IULinkUpdateConverter()
    	: Converter<std::string>("ipaaca::IULinkUpdate", "ipaaca-iu-link-update", true)
    {
    }
    
    std::string IULinkUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	assert(data.first == getDataType()); // "ipaaca::IULinkUpdate"
    	boost::shared_ptr<const IULinkUpdate> obj = boost::static_pointer_cast<const IULinkUpdate> (data.second);
    	boost::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate());
    	// transfer obj data to pbo
    	pbo->set_uid(obj->uid);
    	pbo->set_revision(obj->revision);
    	pbo->set_writer_name(obj->writer_name);
    	pbo->set_is_delta(obj->is_delta);
    	for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->new_links.begin(); it!=obj->new_links.end(); ++it) {
    		protobuf::LinkSet* links = pbo->add_new_links();
    		links->set_type(it->first);
    		for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
    			links->add_targets(*it2);
    		}
    	}
    	for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->links_to_remove.begin(); it!=obj->links_to_remove.end(); ++it) {
    		protobuf::LinkSet* links = pbo->add_links_to_remove();
    		links->set_type(it->first);
    		for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
    			links->add_targets(*it2);
    		}
    	}
    	pbo->SerializeToString(&wire);
    	return getWireSchema();
    
    }
    
    // Rebuilds an IULinkUpdate from its protobuf wire form.
    //
    // wireSchema: must equal this converter's wire schema (assert only).
    // wire:       serialized protobuf::IULinkUpdate message.
    // Returns a pair of this converter's data type and the new update object.
    // A link set without targets does not create an entry in the result maps,
    // because the map entry is only touched when a target is inserted.
    AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema()); // "ipaaca-iu-link-update"
    	boost::shared_ptr<protobuf::IULinkUpdate> message(new protobuf::IULinkUpdate());
    	message->ParseFromString(wire);

    	boost::shared_ptr<IULinkUpdate> update(new IULinkUpdate());

    	// Header fields.
    	update->uid = message->uid();
    	update->revision = message->revision();
    	update->writer_name = message->writer_name();
    	update->is_delta = message->is_delta();

    	// Links to add.
    	const int added_count = message->new_links_size();
    	for (int s = 0; s < added_count; ++s) {
    		const protobuf::LinkSet& wire_set = message->new_links(s);
    		for (int t = 0; t < wire_set.targets_size(); ++t) {
    			update->new_links[wire_set.type()].insert(wire_set.targets(t));
    		}
    	}

    	// Links to remove.
    	const int removed_count = message->links_to_remove_size();
    	for (int s = 0; s < removed_count; ++s) {
    		const protobuf::LinkSet& wire_set = message->links_to_remove(s);
    		for (int t = 0; t < wire_set.targets_size(); ++t) {
    			update->links_to_remove[wire_set.type()].insert(wire_set.targets(t));
    		}
    	}

    	return std::make_pair(getDataType(), update);
    }
    
    //}}}
    
    
    // IntConverter//{{{
    
    // Constructs the converter used for plain int values.
    // NOTE(review): every other converter in this file initializes its
    // Converter<std::string> base with (data type, wire schema, true); that
    // initializer appears to be missing from this copy of the file. The
    // serialize() assert suggests data type "int"; the wire schema cannot be
    // determined from here -- confirm against the repository.
    IntConverter::IntConverter()
    
    {
    }
    
    std::string IntConverter::serialize(const AnnotatedData& data, std::string& wire)
    {
    	// Ensure that DATA actually holds a datum of the data-type we expect.
    	assert(data.first == getDataType()); // "int"
    	// NOTE: a dynamic_pointer_cast cannot be used from void*
    	boost::shared_ptr<const int> obj = boost::static_pointer_cast<const int> (data.second);
    	boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage());
    	// transfer obj data to pbo
    	pbo->set_value(*obj);
    	pbo->SerializeToString(&wire);
    	return getWireSchema();
    
    }
    
    // Rebuilds an int from a serialized protobuf::IntMessage.
    //
    // wireSchema: must equal this converter's wire schema (assert only).
    // wire:       serialized protobuf::IntMessage.
    // Returns a pair of the type name "int" and a newly allocated int holding
    // the message's value.
    AnnotatedData IntConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
    	assert(wireSchema == getWireSchema()); // "int"
    	boost::shared_ptr<protobuf::IntMessage> message(new protobuf::IntMessage());
    	message->ParseFromString(wire);

    	boost::shared_ptr<int> number(new int(message->value()));
    	return std::make_pair("int", number);
    }
    
    //}}}