/*
 * This file is part of IPAACA, the
 *  "Incremental Processing Architecture
 *   for Artificial Conversational Agents".
 *
 * Copyright (c) 2009-2019 Social Cognitive Systems Group
 *                         (formerly the Sociable Agents Group)
 *                         CITEC, Bielefeld University
 *
 * http://opensource.cit-ec.de/projects/ipaaca/
 * http://purl.org/net/ipaaca
 *
 * This file may be licensed under the terms of of the
 * GNU Lesser General Public License Version 3 (the ``LGPL''),
 * or (at your option) any later version.
 *
 * Software distributed under the License is distributed
 * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
 * express or implied. See the LGPL for the specific language
 * governing rights and limitations.
 *
 * You should have received a copy of the LGPL along with this
 * program. If not, go to http://www.gnu.org/licenses/lgpl.html
 * or write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 *
 * The development of this software was supported by the
 * Excellence Cluster EXC 277 Cognitive Interaction Technology.
 * The Excellence Cluster EXC 277 is a grant of the Deutsche
 * Forschungsgemeinschaft (DFG) in the context of the German
 * Excellence Initiative.
 */

/**
 * \file   ipaaca-backend-ros.cc
 *
 * \brief Source file for the ROS backend
 *
 * \author Ramin Yaghoubzadeh Torky (ryaghoubzadeh@uni-bielefeld.de)
 * \date   January, 2019
 */

#include <ipaaca/ipaaca.h>

#include "b64/b64.h"

namespace ipaaca {
#include <ipaaca/ipaaca-backend-ros.h>
namespace backend {
namespace ros {

// The following is a required static library initialization hook.
//   This way, the backend gets registered into the global store just by getting selectively
//   compiled in (i.e. without insertion into the general code or plugin loading).
//   The back-end name is taken from the one provided in the BackEnd constructor.
IPAACA_EXPORT static bool __initialize_ros_backend = BackEndLibrary::get()->register_backend(ROSBackEnd::get());

// ROSBackEnd{{{
// The backend class is the interface to the rest of the IPAACA library,
//  which does not know any of the implementation details here.
//  It is available via its (unique) given name (here "ros", just below)
IPAACA_EXPORT ROSBackEnd::ROSBackEnd()
: BackEnd("ros"), _need_init(true), _node_handle(NULL)
{
}
void sigint_ros_shutdown(int signal)
{
    std::cout << "SIGINT" << std::endl;
    ::ros::shutdown();
    exit(0);
}
IPAACA_EXPORT void ROSBackEnd::init_once()
{
    if (_need_init) {
        IPAACA_INFO("Initializing ROS back-end ...")
        const char* fakename = "ipaaca_cpp_bin";
        _cfakename = (char*) malloc(16);
        strncpy(_cfakename, fakename, 15);
        char* fake_argv[] = { _cfakename, NULL };
        int fake_argc = 1;
        int num_spinner_threads = 4;
        _need_init = false;
        IPAACA_INFO("Initializing ROS node ...")
        ::ros::init(fake_argc, fake_argv, fakename, ::ros::init_options::AnonymousName); // | ::ros::init_options::NoRosout
        //
        IPAACA_INFO("Starting ROS node ...")
        _node_handle = new ::ros::NodeHandle(); // internally calls ::ros::start()
        //
        signal(SIGINT, sigint_ros_shutdown);
        //
        IPAACA_INFO("Starting ROS spinner thread ...")
        _spinner = new ::ros::AsyncSpinner(num_spinner_threads);
        _spinner->start();
    }
}
IPAACA_EXPORT void ROSBackEnd::teardown()
{
    IPAACA_INFO("Stopping ROS spinner thread ...")
    _spinner->stop();
    delete _spinner;
    _spinner = NULL;
    IPAACA_INFO("Shutting down ROS node ...")
    ::ros::shutdown();
    delete _node_handle;
    _node_handle = NULL;
    free(_cfakename);
}
IPAACA_EXPORT BackEnd::ptr ROSBackEnd::get() {
    static ptr backend_singleton;
    if (!backend_singleton) {
        backend_singleton = std::shared_ptr<ROSBackEnd>(new ROSBackEnd());
    }
    return backend_singleton;
}
IPAACA_EXPORT Informer::ptr ROSBackEnd::createInformer(const std::string& scope)
{
    init_once();
    auto res = std::make_shared<ROSInformer>(_node_handle, scope, get_global_config());
    return res;
}
IPAACA_EXPORT Listener::ptr ROSBackEnd::createListener(const std::string& scope, InputBuffer* buf)
{
    init_once();
    auto res = std::make_shared<ROSListener>(_node_handle, scope, buf, get_global_config());
    return res;
}
IPAACA_EXPORT LocalServer::ptr ROSBackEnd::createLocalServer(const std::string& scope, OutputBuffer* buf)
{
    init_once();
    auto res = std::make_shared<ROSLocalServer>(_node_handle, scope, buf, get_global_config());
    return res;
}
IPAACA_EXPORT RemoteServer::ptr ROSBackEnd::createRemoteServer(const std::string& scope)
{
    init_once();
    auto res = std::make_shared<ROSRemoteServer>(_node_handle, scope, get_global_config());
    return res;
}
//}}}

//
// Internal implementation follows
//

// ParticipantCore{{{
IPAACA_EXPORT ParticipantCore::ParticipantCore()
: _running(false), _live(false)
{
}
IPAACA_EXPORT void ParticipantCore::signal_live() {
    IPAACA_DEBUG("Notifying to wake up an async ROS session (now live)")
    _live = true;
    _condvar.notify_one();
}
IPAACA_EXPORT bool ParticipantCore::wait_live(long timeout_milliseconds) {
    IPAACA_DEBUG("Waiting for an ROS session to come live")
    std::unique_lock<std::mutex> lock(_condvar_mutex);
    // ros handlers will notify this after connect or subscribe (depending on the subclass)
    auto success = _condvar.wait_for(lock, std::chrono::milliseconds(timeout_milliseconds), [this]{return this->_live;});
    if (!success) {
        IPAACA_ERROR("Backend timeout: failed to go live")
        return false; // TODO throw here or in construction wrapper (below)
    }
    return true;
}
//}}}

// ROSParticipant{{{
IPAACA_EXPORT ROSParticipant::ROSParticipant(::ros::NodeHandle* node, const std::string& scope, Config::ptr config)
: ParticipantCore(), _node_handle(node), _scope(scope)
{
    //_client_id = client_id;
    // get connection parameters from config
    if (config) {
        host = config->get_with_default_and_warning<std::string>("transport.ros.host", "localhost");
        port = config->get_with_default_and_warning<int>("transport.ros.port", 1883);
        keepalive = config->get_with_default<int>("transport.ros.keepalive", 60);
    } else {
        host = "localhost";
        port = 1883;
        keepalive = 60;
        IPAACA_ERROR("No Config provided in ROS backend, using defaults: host=localhost port=1883 keepalive=60")
    }
    //IPAACA_DEBUG("Created ROSParticipant on " << host << ":" << port << " for scope " << _scope) // << " with prepared client id " << _client_id)
    //IPAACA_DEBUG("Created ROSParticipant for scope " << _scope) // << " with prepared client id " << _client_id)
}

//}}}

// ROSInformer {{{
IPAACA_EXPORT ROSInformer::ROSInformer(::ros::NodeHandle* node, const std::string& scope, Config::ptr config)
: ROSParticipant(node, scope, config)
{
    IPAACA_DEBUG("Create ROS Publisher for scope " << ((std::string) scope))
    _ros_pub = node->advertise<std_msgs::String>(scope, 100, true); // latch == true
    //connect_and_background();
}
IPAACA_EXPORT bool ROSInformer::internal_publish(const std::string& wire)
{
    IPAACA_DEBUG("Trying to publish via ROS, topic " << _scope)
    //int mid = ROSParticipant::get_next_mid();
    std_msgs::String msg;
    msg.data = base64_encode(wire);
    // // This would be a way to ensure that an event has at least
    // // one actual recipient (e.g. if you know there must be a receiver
    // // by convention and wait to avoid lost events due to connection delay)
    //while (_ros_pub.getNumSubscribers() == 0) {
    //    std::this_thread().sleep_for(std::chrono::milliseconds(10));
    //}
    _ros_pub.publish(msg);
    IPAACA_DEBUG("... returned from publish.")
    return true;
}
//}}}

// ROSListener {{{
IPAACA_EXPORT ROSListener::ROSListener(::ros::NodeHandle* node, const std::string& scope, InputBuffer* buffer_ptr, Config::ptr config)
: ROSParticipant(node, scope, config), Listener(buffer_ptr)
{
    IPAACA_DEBUG("Create ROSListener for scope " << ((std::string) scope))
    _ros_sub = node->subscribe(scope, 1000, &ROSListener::on_message, this, ::ros::TransportHints().tcpNoDelay().reliable().unreliable());
}
IPAACA_EXPORT void ROSListener::on_message(const std_msgs::String::ConstPtr& msg) {
    // internal_deserialize expects a string, which we construct here from the received char* and len
    auto message = base64_decode(msg->data);
    auto event = ipaaca::converters::internal_deserialize(message);
    std::cout << "GOT AN EVENT of type " << event->getType() << std::endl;
    // let the Listener base class handle the propagation into a Buffer:
    Listener::relay_received_event_to_buffer_threaded(event);
}
//}}}

// ROSLocalServer {{{
IPAACA_EXPORT ROSLocalServer::ROSLocalServer(::ros::NodeHandle* node, const std::string& scope, ipaaca::OutputBuffer* buffer_ptr, Config::ptr config)
: ROSParticipant(node, scope, config), LocalServer(buffer_ptr)
{
    IPAACA_DEBUG("Create ROSLocalServer for scope " << ((std::string) scope));
    _ros_sub = node->subscribe(scope, 1000, &ROSLocalServer::on_message, this, ::ros::TransportHints().tcpNoDelay().reliable().unreliable());
}
//    int res = subscribe(NULL, _scope.c_str(), 2);
IPAACA_EXPORT ::ros::Publisher ROSLocalServer::get_publisher(const std::string& endpoint)
{
    if (_ros_pubs.count(endpoint)) return _ros_pubs[endpoint];
    _ros_pubs[endpoint] = _node_handle->advertise<std_msgs::String>(endpoint, 100, true); // latch == true
    return _ros_pubs[endpoint];
}
IPAACA_EXPORT void ROSLocalServer::send_result_for_request(const std::string& request_endpoint, const std::string& request_uid, int64_t result)
{
    std::string wire;
    std::shared_ptr<protobuf::RemoteRequestResult> pbo(new protobuf::RemoteRequestResult());
    pbo->set_request_uid(request_uid);
    pbo->set_result(result);
    wire = ipaaca::converters::internal_serialize(pbo);
    IPAACA_DEBUG("Trying to send result to RemoteServer " << request_endpoint)
    std_msgs::String msg;
    msg.data = base64_encode(wire);
    auto pub = get_publisher(request_endpoint);
    // // if latching should be insufficient for reliable RPC, activate this:
    //while (pub.getNumSubscribers() == 0) {
    //    std::this_thread().sleep_for(std::chrono::milliseconds(10));
    //}
    pub.publish(msg);
}

IPAACA_EXPORT void ROSLocalServer::on_message(const std_msgs::String::ConstPtr& msg)
{
    auto message = base64_decode(msg->data);
    auto event = ipaaca::converters::internal_deserialize(message);
    auto type = event->getType();
    IPAACA_DEBUG("LocalServer " << _scope << " got an object of type " << type)
    int64_t result = 0;
    std::string request_endpoint("");
    std::string request_uid("");
    if (type == "ipaaca::IUPayloadUpdate") {
        auto obj = std::static_pointer_cast<ipaaca::IUPayloadUpdate>(event->getData());
        request_uid = obj->request_uid;
        request_endpoint = obj->request_endpoint;
        result = LocalServer::attempt_to_apply_remote_payload_update(obj);
    } else if (type == "ipaaca::IULinkUpdate") {
        auto obj = std::static_pointer_cast<ipaaca::IULinkUpdate>(event->getData());
        request_uid = obj->request_uid;
        result = LocalServer::attempt_to_apply_remote_link_update(obj);
    } else if (type == "ipaaca::protobuf::IUCommission") {
        auto obj = std::static_pointer_cast<ipaaca::protobuf::IUCommission>(event->getData());
        request_uid = obj->request_uid();
        result = LocalServer::attempt_to_apply_remote_commission(obj);
    } else if (type == "ipaaca::protobuf::IUResendRequest") {
        auto obj = std::static_pointer_cast<ipaaca::protobuf::IUResendRequest>(event->getData());
        request_uid = obj->request_uid();
        result = LocalServer::attempt_to_apply_remote_resend_request(obj);
    } else {
        IPAACA_ERROR("ROSLocalServer: unhandled request wire type " << type)
    }
    if (request_uid.length()) {
        send_result_for_request(request_endpoint, request_uid, result);
    } else {
        IPAACA_ERROR("ROSLocalServer: cannot reply since request_uid is unknown")
    }
}
//}}}

// ROSRemoteServer{{{
// RemoteServer (= the side that sends requests to the owner of a remote IU)
IPAACA_EXPORT ROSRemoteServer::ROSRemoteServer(::ros::NodeHandle* node, const std::string& scope, Config::ptr config)
: ROSParticipant(node, scope, config) {
    _remote_end_scope = _scope;
    auto uuid = ipaaca::generate_uuid_string().substr(0,8);
    _name = "LocalServer_" +  uuid; // TODO add e.g. pid as in Python
    _scope = "/ipaaca/remotes/" + _name; // overwrites constructed ROSParticipant::_scope here (!)
    IPAACA_DEBUG("Create ROSRemoteServer for remote scope " << ((std::string) _remote_end_scope) << " and reply listener on " << _scope)
    _ros_sub = node->subscribe(_scope, 1000, &ROSRemoteServer::on_message, this, ::ros::TransportHints().tcpNoDelay().reliable().unreliable());
    _ros_pub = node->advertise<std_msgs::String>(_remote_end_scope, 100, true); // latch == true
}
IPAACA_EXPORT int64_t ROSRemoteServer::request_remote_payload_update(std::shared_ptr<IUPayloadUpdate> update)
{
    return blocking_call(std::make_shared<Event>("ipaaca::IUPayloadUpdate", update));
}
IPAACA_EXPORT int64_t ROSRemoteServer::request_remote_link_update(std::shared_ptr<IULinkUpdate> update)
{
    return blocking_call(std::make_shared<Event>("ipaaca::IULinkUpdate", update));
}
IPAACA_EXPORT int64_t ROSRemoteServer::request_remote_commission(std::shared_ptr<protobuf::IUCommission> update)
{
    return blocking_call(std::make_shared<Event>("ipaaca::protobuf::IUCommission", update));
}
IPAACA_EXPORT int64_t ROSRemoteServer::request_remote_resend_request(std::shared_ptr<protobuf::IUResendRequest> update)
{
    return blocking_call(std::make_shared<Event>("ipaaca::protobuf::IUResendRequest", update));
}

IPAACA_EXPORT void ROSRemoteServer::on_message(const std_msgs::String::ConstPtr& msg)
{
    auto message = base64_decode(msg->data);
    auto event = ipaaca::converters::internal_deserialize(message);
    auto type = event->getType();
    IPAACA_DEBUG("RemoteServer " << _scope << " for remote " << _remote_end_scope << " got an object of type " << type)
    if (type == "ipaaca::protobuf::RemoteRequestResult") {
        auto reply = std::static_pointer_cast<ipaaca::protobuf::RemoteRequestResult>(event->getData());
        auto uid = reply->request_uid();
        auto result = reply->result();
        PendingRequest::ptr pending_request;
        {
            ipaaca::Locker locker(_pending_requests_lock);
            auto it = _pending_requests.find(uid);
            if (it != _pending_requests.end()) {
                pending_request = it->second;
                _pending_requests.erase(it);
            }
        }
        if (pending_request) {
            pending_request->reply_with_result(result);
        } else {
            IPAACA_ERROR("ROSRemoteServer: got a reply for a request that is not pending: " << uid)
        }
    } else {
        IPAACA_ERROR("ROSRemoteServer: unhandled request wire type " << type)
    }
}

IPAACA_EXPORT PendingRequest::ptr ROSRemoteServer::queue_pending_request(Event::ptr request)
{
    PendingRequest::ptr pending_request = std::make_shared<PendingRequest>(request);
    {
        ipaaca::Locker locker(_pending_requests_lock);
        if ((_ROS_REMOTE_SERVER_MAX_QUEUED_REQUESTS > 0) && (_pending_requests.size() >= _ROS_REMOTE_SERVER_MAX_QUEUED_REQUESTS)) {
            IPAACA_ERROR("ROSRemoteServer: maximum number of pending requests exceeded")
            throw BackEndBadConditionError();
        } else {
            _pending_requests[pending_request->_request_uid] = pending_request;
            return pending_request;
        }
    }
}
IPAACA_EXPORT int64_t ROSRemoteServer::blocking_call(Event::ptr request)
{
    std::string wire;
    auto pending_request = queue_pending_request(request);
    if (request->getType() == "ipaaca::IUPayloadUpdate") {
        auto obj = std::static_pointer_cast<ipaaca::IUPayloadUpdate>(request->getData());
        obj->request_uid = pending_request->_request_uid;
        obj->request_endpoint = _scope;
        wire = ipaaca::converters::internal_serialize(obj);
    } else if (request->getType() == "ipaaca::IULinkUpdate") {
        auto obj = std::static_pointer_cast<ipaaca::IULinkUpdate>(request->getData());
        obj->request_uid = pending_request->_request_uid;
        obj->request_endpoint = _scope;
        wire = ipaaca::converters::internal_serialize(obj);
    } else if (request->getType() == "ipaaca::protobuf::IUCommission") {
        auto obj = std::static_pointer_cast<ipaaca::protobuf::IUCommission>(request->getData());
        obj->set_request_uid(pending_request->_request_uid);
        obj->set_request_endpoint(_scope);
        wire = ipaaca::converters::internal_serialize(obj);
    } else if (request->getType() == "ipaaca::protobuf::IUResendRequest") {
        auto obj = std::static_pointer_cast<ipaaca::protobuf::IUResendRequest>(request->getData());
        obj->set_request_uid(pending_request->_request_uid);
        obj->set_request_endpoint(_scope);
        wire = ipaaca::converters::internal_serialize(obj);
    } else {
        IPAACA_ERROR("Unhandled request type " << request->getType())
        throw BackEndBadConditionError();
    }
    IPAACA_DEBUG("Trying to send request to remote LocalServer at " << _remote_end_scope)
    std_msgs::String msg;
    msg.data = base64_encode(wire);
    _ros_pub.publish(msg);
    IPAACA_DEBUG("Waiting for the remote server")
    auto result = pending_request->wait_for_reply();
    IPAACA_DEBUG("RPC wait ended.")
    if (result<0) {
        IPAACA_WARNING("A RemoteServer request timed out, remote end was " << _remote_end_scope)
        return 0;
    } else {
        return result;
    }
}

//}}}


} // of namespace ros
} // of namespace backend
} // of namespace ipaaca