Skip to content
Snippets Groups Projects
ipaaca-ius.cc 12.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • /*
     * This file is part of IPAACA, the
     *  "Incremental Processing Architecture
     *   for Artificial Conversational Agents".
     *
     * Copyright (c) 2009-2015 Social Cognitive Systems Group
     *                         (formerly the Sociable Agents Group)
     *                         CITEC, Bielefeld University
     *
     * http://opensource.cit-ec.de/projects/ipaaca/
     * http://purl.org/net/ipaaca
     *
     * This file may be licensed under the terms of of the
     * GNU Lesser General Public License Version 3 (the ``LGPL''),
     * or (at your option) any later version.
     *
     * Software distributed under the License is distributed
     * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
     * express or implied. See the LGPL for the specific language
     * governing rights and limitations.
     *
     * You should have received a copy of the LGPL along with this
     * program. If not, go to http://www.gnu.org/licenses/lgpl.html
     * or write to the Free Software Foundation, Inc.,
     * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
     *
     * The development of this software was supported by the
     * Excellence Cluster EXC 277 Cognitive Interaction Technology.
     * The Excellence Cluster EXC 277 is a grant of the Deutsche
     * Forschungsgemeinschaft (DFG) in the context of the German
     * Excellence Initiative.
     */
    
    #include <ipaaca/ipaaca.h>
    
    namespace ipaaca {
    
    using namespace rsb;
    using namespace rsb::filter;
    using namespace rsb::converter;
    using namespace rsb::patterns;
    
    // IU//{{{
    IPAACA_EXPORT IU::ptr IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
    {
    
    	return IU::create(category, payload_type, read_only);
    }
    IPAACA_EXPORT IU::ptr IU::create(const std::string& category, const std::string& payload_type, bool read_only)
    {
    
    	IU::ptr iu = IU::ptr(new IU(category, IU_ACCESS_PUSH, read_only, (payload_type=="")?__ipaaca_static_option_default_payload_type:payload_type)); /* params */ //));
    
    	iu->_payload.initialize(iu);
    	return iu;
    }
    
    IPAACA_EXPORT IU::IU(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
    {
    	_revision = 1;
    	_uid = ipaaca::generate_uuid_string();
    	_category = category;
    
    	_payload_type = (payload_type=="")?__ipaaca_static_option_default_payload_type:payload_type;
    
    	// payload initialization deferred to IU::create(), above
    	_read_only = read_only;
    	_access_mode = access_mode;
    	_committed = false;
    }
    
    IPAACA_EXPORT void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
    {
    	_revision_lock.lock();
    	if (_committed) {
    		_revision_lock.unlock();
    		throw IUCommittedError();
    	}
    	_increase_revision_number();
    	if (is_published()) {
    		_buffer->_send_iu_link_update(this, is_delta, _revision, new_links, links_to_remove, writer_name);
    	}
    	_revision_lock.unlock();
    }
    
    
    
    /*
     * IPAACA_EXPORT void IU::_publish_resend(IU::ptr iu, const std::string& hidden_scope_name)
    
    {
    	//_revision_lock.lock();
    	//if (_committed) {
    	//	_revision_lock.unlock();
    	//	throw IUCommittedError();
    	//}
    	//_increase_revision_number();
    	//if (is_published()) {
    	//IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name
    	_buffer->_publish_iu_resend(iu, hidden_scope_name);
    	//}
    	//_revision_lock.unlock();
    }
    
    IPAACA_EXPORT void IU::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
    
    	_revision_lock.lock();
    	if (_committed) {
    		_revision_lock.unlock();
    		throw IUCommittedError();
    	}
    	_increase_revision_number();
    	if (is_published()) {
    
    		IPAACA_DEBUG("Sending a payload update, new entries:")
    		for (auto& kv: new_items) {
    			IPAACA_DEBUG("  " << kv.first << " -> " << kv.second)
    		}
    		IPAACA_DEBUG("and with removed keys:")
    		for (auto& k: keys_to_remove) {
    			IPAACA_DEBUG("  " << k)
    		}
    
    		_buffer->_send_iu_payload_update(this, is_delta, _revision, new_items, keys_to_remove, writer_name);
    
    		IPAACA_DEBUG("... sent.")
    
    	}
    	_revision_lock.unlock();
    }
    
    IPAACA_EXPORT void IU::commit()
    {
    	_internal_commit();
    }
    
    IPAACA_EXPORT void IU::_internal_commit(const std::string& writer_name)
    {
    	_revision_lock.lock();
    	if (_committed) {
    		_revision_lock.unlock();
    		throw IUCommittedError();
    	}
    	_increase_revision_number();
    	_committed = true;
    	if (is_published()) {
    		_buffer->_send_iu_commission(this, _revision, writer_name);
    	}
    	_revision_lock.unlock();
    }
    //}}}
    // Message//{{{
    Message::ptr Message::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
    {
    
    	return Message::create(category, (payload_type=="")?__ipaaca_static_option_default_payload_type:payload_type);
    
    }
    Message::ptr Message::create(const std::string& category, const std::string& payload_type)
    {
    
    	Message::ptr iu = Message::ptr(new Message(category, IU_ACCESS_MESSAGE, true, (payload_type=="")?__ipaaca_static_option_default_payload_type:payload_type)); /* params */ //));
    
    	iu->_payload.initialize(iu);
    	return iu;
    }
    
    Message::Message(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
    : IU(category, access_mode, read_only, payload_type)
    {
    }
    
    void Message::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
    {
    	if (is_published()) {
    		IPAACA_INFO("Info: modifying a Message after sending has no global effects")
    	}
    }
    
    void Message::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
    
    {
    	if (is_published()) {
    		IPAACA_INFO("Info: modifying a Message after sending has no global effects")
    	}
    }
    
    void Message::_internal_commit(const std::string& writer_name)
    {
    	if (is_published()) {
    		IPAACA_INFO("Info: committing to a Message after sending has no global effects")
    	}
    
    }
    //}}}
    
    // RemotePushIU//{{{
    
    IPAACA_EXPORT RemotePushIU::ptr RemotePushIU::create()
    {
    	RemotePushIU::ptr iu = RemotePushIU::ptr(new RemotePushIU(/* params */));
    	iu->_payload.initialize(iu);
    	return iu;
    }
    IPAACA_EXPORT RemotePushIU::RemotePushIU()
    {
    	// nothing
    }
    IPAACA_EXPORT void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
    {
    	if (_committed) {
    		throw IUCommittedError();
    	}
    	if (_read_only) {
    		throw IUReadOnlyError();
    	}
    	RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
    	IULinkUpdate::ptr update = IULinkUpdate::ptr(new IULinkUpdate());
    	update->uid = _uid;
    	update->revision = _revision;
    	update->is_delta = is_delta;
    	update->writer_name = _buffer->unique_name();
    	update->new_links = new_links;
    	update->links_to_remove = links_to_remove;
    	boost::shared_ptr<int> result = server->call<int>("updateLinks", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO
    	if (*result == 0) {
    		throw IUUpdateFailedError();
    	} else {
    		_revision = *result;
    	}
    }
    
    IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
    
    {
    	//std::cout << "-- Sending a modify_payload with " << new_items.size() << " keys to merge." << std::endl;
    	if (_committed) {
    		throw IUCommittedError();
    	}
    	if (_read_only) {
    		throw IUReadOnlyError();
    	}
    	RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
    	IUPayloadUpdate::ptr update = IUPayloadUpdate::ptr(new IUPayloadUpdate());
    	update->uid = _uid;
    	update->revision = _revision;
    	update->is_delta = is_delta;
    	update->writer_name = _buffer->unique_name();
    	update->new_items = new_items;
    	update->keys_to_remove = keys_to_remove;
    
    	update->payload_type = _payload_type;
    
    	boost::shared_ptr<int> result = server->call<int>("updatePayload", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO
    	if (*result == 0) {
    		throw IUUpdateFailedError();
    	} else {
    		_revision = *result;
    	}
    }
    
    IPAACA_EXPORT void RemotePushIU::commit()
    {
    	if (_read_only) {
    		throw IUReadOnlyError();
    	}
    	if (_committed) {
    		// Following python version: ignoring multiple commit
    		return;
    	}
    	RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
    	boost::shared_ptr<protobuf::IUCommission> update = boost::shared_ptr<protobuf::IUCommission>(new protobuf::IUCommission());
    	update->set_uid(_uid);
    	update->set_revision(_revision);
    	update->set_writer_name(_buffer->unique_name());
    	boost::shared_ptr<int> result = server->call<int>("commit", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO
    	if (*result == 0) {
    		throw IUUpdateFailedError();
    	} else {
    		_revision = *result;
    	}
    }
    
    IPAACA_EXPORT void RemotePushIU::_apply_link_update(IULinkUpdate::ptr update)
    {
    	_revision = update->revision;
    	if (update->is_delta) {
    		_add_and_remove_links(update->new_links, update->links_to_remove);
    	} else {
    		_replace_links(update->new_links);
    	}
    }
    IPAACA_EXPORT void RemotePushIU::_apply_update(IUPayloadUpdate::ptr update)
    {
    	_revision = update->revision;
    	if (update->is_delta) {
    		for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) {
    			_payload._remotely_enforced_delitem(*it);
    		}
    
    		for (std::map<std::string, PayloadDocumentEntry::ptr>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) {
    
    			_payload._remotely_enforced_setitem(it->first, it->second);
    		}
    	} else {
    		_payload._remotely_enforced_wipe();
    
    		for (std::map<std::string, PayloadDocumentEntry::ptr>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) {
    
    			_payload._remotely_enforced_setitem(it->first, it->second);
    		}
    	}
    }
    IPAACA_EXPORT void RemotePushIU::_apply_commission()
    {
    	_committed = true;
    }
    IPAACA_EXPORT void RemotePushIU::_apply_retraction()
    {
    	_retracted = true;
    }
    //}}}
    
    // RemoteMessage//{{{
    
    IPAACA_EXPORT RemoteMessage::ptr RemoteMessage::create()
    {
    	RemoteMessage::ptr iu = RemoteMessage::ptr(new RemoteMessage(/* params */));
    	iu->_payload.initialize(iu);
    	return iu;
    }
    IPAACA_EXPORT RemoteMessage::RemoteMessage()
    {
    	// nothing
    }
    IPAACA_EXPORT void RemoteMessage::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
    {
    	IPAACA_INFO("Info: modifying a RemoteMessage only has local effects")
    }
    
    IPAACA_EXPORT void RemoteMessage::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
    
    {
    	IPAACA_INFO("Info: modifying a RemoteMessage only has local effects")
    }
    IPAACA_EXPORT void RemoteMessage::commit()
    {
    	IPAACA_INFO("Info: committing to a RemoteMessage only has local effects")
    }
    
    IPAACA_EXPORT void RemoteMessage::_apply_link_update(IULinkUpdate::ptr update)
    {
    	IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_link_update")
    	_revision = update->revision;
    	if (update->is_delta) {
    		_add_and_remove_links(update->new_links, update->links_to_remove);
    	} else {
    		_replace_links(update->new_links);
    	}
    }
    IPAACA_EXPORT void RemoteMessage::_apply_update(IUPayloadUpdate::ptr update)
    {
    	IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_update")
    	_revision = update->revision;
    	if (update->is_delta) {
    		for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) {
    			_payload._remotely_enforced_delitem(*it);
    		}
    
    		for (std::map<std::string, PayloadDocumentEntry::ptr>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) {
    
    			_payload._remotely_enforced_setitem(it->first, it->second);
    		}
    	} else {
    		_payload._remotely_enforced_wipe();
    
    		for (std::map<std::string, PayloadDocumentEntry::ptr>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) {
    
    			_payload._remotely_enforced_setitem(it->first, it->second);
    		}
    	}
    }
    IPAACA_EXPORT void RemoteMessage::_apply_commission()
    {
    	IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_commission")
    	_committed = true;
    }
    IPAACA_EXPORT void RemoteMessage::_apply_retraction()
    {
    	IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_retraction")
    	_retracted = true;
    }
    
    //}}}
    
    } // of namespace ipaaca