// Skip to content
// Snippets Groups Projects
// ipaaca-backend-mqtt.cc 10.03 KiB
/*
 * This file is part of IPAACA, the
 *  "Incremental Processing Architecture
 *   for Artificial Conversational Agents".
 *
 * Copyright (c) 2009-2019 Social Cognitive Systems Group
 *                         (formerly the Sociable Agents Group)
 *                         CITEC, Bielefeld University
 *
 * http://opensource.cit-ec.de/projects/ipaaca/
 * http://purl.org/net/ipaaca
 *
 * This file may be licensed under the terms of the
 * GNU Lesser General Public License Version 3 (the ``LGPL''),
 * or (at your option) any later version.
 *
 * Software distributed under the License is distributed
 * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
 * express or implied. See the LGPL for the specific language
 * governing rights and limitations.
 *
 * You should have received a copy of the LGPL along with this
 * program. If not, go to http://www.gnu.org/licenses/lgpl.html
 * or write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 *
 * The development of this software was supported by the
 * Excellence Cluster EXC 277 Cognitive Interaction Technology.
 * The Excellence Cluster EXC 277 is a grant of the Deutsche
 * Forschungsgemeinschaft (DFG) in the context of the German
 * Excellence Initiative.
 */

/**
 * \file   ipaaca-backend-mqtt.cc
 *
 * \brief Source file for the MQTT backend
 *
 * \author Ramin Yaghoubzadeh Torky (ryaghoubzadeh@uni-bielefeld.de)
 * \date   January, 2019
 */

#include <ipaaca/ipaaca.h>

namespace ipaaca {
#include <ipaaca/ipaaca-backend-mqtt.h>
namespace backend {
namespace mqtt {

// The following is a required static library initialization hook.
//   This way, the backend gets registered into the global store just by getting selectively
//   compiled in (i.e. without insertion into the general code or plugin loading).
//   The back-end name is taken from the one provided in the BackEnd constructor.
static bool __initialize_mqtt_backend = BackEndLibrary::get()->register_backend(MQTTBackEnd::get());

// MQTTBackEnd{{{
// The backend class is the interface to the rest of the IPAACA library,
//  which does not know any of the implementation details here.
//  It is available via its (unique) given name (here "mqtt", just below)
MQTTBackEnd::MQTTBackEnd()
: BackEnd("mqtt") {
}
BackEnd::ptr MQTTBackEnd::get() {
    static ptr backend_singleton;
    if (!backend_singleton) {
        mosqpp::lib_init();
        backend_singleton = std::shared_ptr<MQTTBackEnd>(new MQTTBackEnd());
    }
    return backend_singleton;
}
Informer::ptr MQTTBackEnd::createInformer(const std::string& scope)
{
    auto res = std::make_shared<MQTTInformer>(generate_client_id(), scope, get_global_config());
    res->wait_live();
    // TODO wait for it to come live?
    return res;
}
Listener::ptr MQTTBackEnd::createListener(const std::string& scope, InputBuffer* buf)
{
    auto res = std::make_shared<MQTTListener>(generate_client_id(), scope, buf, get_global_config());
    res->wait_live();
    // TODO wait for it to come live?
    return res;
}
LocalServer::ptr MQTTBackEnd::createLocalServer(const std::string& scope, OutputBuffer* buf)
{
    auto res = std::make_shared<MQTTLocalServer>(generate_client_id(), scope, buf, get_global_config());
    //res->wait_live();
    // TODO wait for it to come live?
    return res;
}
RemoteServer::ptr MQTTBackEnd::createRemoteServer(const std::string& scope)
{
    auto res = std::make_shared<MQTTRemoteServer>(generate_client_id(), scope, get_global_config());
    //res->wait_live();
    // TODO wait for it to come live?
    return res;
}
//}}}

//
// Internal implementation follows
//

// ParticipantCore{{{
ParticipantCore::ParticipantCore()
: _running(false), _live(false)
{
}
void ParticipantCore::signal_live() {
    IPAACA_DEBUG("Notifying to wake up an async MQTT session (now live)")
    _live = true;
    _condvar.notify_one();
}
bool ParticipantCore::wait_live(long timeout_milliseconds) {
    IPAACA_DEBUG("Waiting for an MQTT session to come live")
    std::unique_lock<std::mutex> lock(_condvar_mutex);
    // mqtt handlers will notify this after connect or subscribe (depending on the subclass)
    auto success = _condvar.wait_for(lock, std::chrono::milliseconds(timeout_milliseconds), [this]{return this->_live;});
    if (!success) {
        IPAACA_ERROR("Backend timeout: failed to go live")
        return false; // TODO throw here or in construction wrapper (below)
    }
    return true;
}
//}}}

// MQTTParticipant{{{
int MQTTParticipant::get_next_mid() {
    static int _curmid = 0;
    _curmid++;
    return _curmid;
}

MQTTParticipant::MQTTParticipant(const std::string& client_id, const std::string& scope, Config::ptr config)
: ParticipantCore(), mosqpp::mosquittopp(client_id.c_str(), true), _scope(scope)
{
    //threaded_set(true);
    _client_id = client_id;
    // get connection parameters from config
    if (config) {
        host = config->get_with_default_and_warning<std::string>("transport.mqtt.host", "localhost");
        port = config->get_with_default_and_warning<int>("transport.mqtt.port", 1883);
        keepalive = config->get_with_default<int>("transport.mqtt.keepalive", 60);
    } else {
        host = "localhost";
        port = 1883;
        keepalive = 60;
        IPAACA_ERROR("No Config provided in MQTT backend, using defaults: host=localhost port=1883 keepalive=60")
    }
    IPAACA_DEBUG("Created MQTTParticipant on " << host << ":" << port << " for scope " << _scope << " with prepared client id " << _client_id)
}

IPAACA_EXPORT void MQTTParticipant::connect_and_background()
{
    int res = connect(host.c_str(), port, keepalive);
    loop_start();
    if (res!=0) {
        IPAACA_ERROR("MQTT connect (on topic " << _scope << ") returned an error " << res << " - please double-check parameters")
        if (res==14) {
            IPAACA_ERROR("The underlying system error: errno " << errno << " (" << strerror(errno) << ")")
        }
        throw BackEndConnectionFailedError();
    } else {
        IPAACA_DEBUG("connect OK for scope " << _scope)
    }
}
void MQTTParticipant::on_error()
{
    IPAACA_ERROR("MQTT error")
}

void MQTTParticipant::on_disconnect(int rc)
{
    IPAACA_ERROR("MQTT disconnect of " << _scope << " with rc " << rc)
}
//}}}

// MQTTInformer {{{
MQTTInformer::MQTTInformer(const std::string& client_id, const std::string& scope, Config::ptr config)
: MQTTParticipant(client_id, scope, config)
{
    IPAACA_DEBUG("Create MQTTInformer for scope " << ((std::string) scope))
    connect_and_background();
}
void MQTTInformer::on_connect(int rc)
{
    signal_live();
}
IPAACA_EXPORT bool MQTTInformer::internal_publish(const std::string& wire)
{
    IPAACA_DEBUG("Trying to publish via MQTT, topic " << _scope)
    //int mid = MQTTParticipant::get_next_mid();
    int result = mosquittopp::publish(NULL, _scope.c_str(), wire.size(), wire.c_str(), 2, false);
}
//}}}

// MQTTListener {{{
MQTTListener::MQTTListener(const std::string& client_id, const std::string& scope, InputBuffer* buffer_ptr, Config::ptr config)
: MQTTParticipant(client_id, scope, config), Listener(buffer_ptr)
{
    IPAACA_DEBUG("Create MQTTListener for scope " << ((std::string) scope))
    connect_and_background();
}
void MQTTListener::on_connect(int rc)
{
    IPAACA_WARNING("Listener::on_connect for the one on scope " << _scope)
    //subscribe(&mid, _scope.c_str(), 2);
    //int mid = MQTTParticipant::get_next_mid();
    int res = subscribe(NULL, _scope.c_str(), 2);
    if (res!=0) {
        IPAACA_ERROR("subscribe (on topic " << _scope << ") returned an error " << res)
    } else {
        IPAACA_DEBUG("subscribe returned OK for topic " << _scope)
    }
}

void MQTTListener::on_subscribe(int mid, int qos_count, const int * granted_qos)
{
    IPAACA_DEBUG("on_subscribe: " << mid << " " << qos_count << " for scope " << _scope)
    if (qos_count < 1) {
        IPAACA_WARNING("No QoS grants reported")
    } else if (qos_count > 1) {
        IPAACA_WARNING("More than one QoS grant reported for Listener, should not happen")
    } else {
        int qos = granted_qos[0];
        if (qos!=2) {
            IPAACA_WARNING("MQTT QoS level 2 (guaranteed delivery) has NOT been granted on " << _scope << " (we got level " << qos << ")")
        }
    }
    signal_live();
}

IPAACA_EXPORT void MQTTListener::on_message(const struct mosquitto_message * message) {
    // internal_deserialize expects a string, which we construct here from the received char* and len
    auto event = ipaaca::converters::internal_deserialize(std::string((const char*) message->payload, message->payloadlen));
    //std::cout << "GOT AN EVENT of type " << event->getType() << std::endl;
    // let the Listener base class handle the propagation into a Buffer:
    Listener::relay_received_event_to_buffer(event);
}
//}}}

// MQTTLocalServer {{{
MQTTLocalServer::MQTTLocalServer(const std::string& client_id, const std::string& scope, ipaaca::OutputBuffer* buffer_ptr, Config::ptr config)
: MQTTParticipant(client_id, scope, config), _buffer(buffer_ptr)
{
    std::cout << "CREATE MQTTLocalServer for scope " << ((std::string) scope) << std::endl;
}
//}}}

// MQTTRemoteServer{{{
// RemoteServer (= the side that sends requests to the owner of a remote IU)
MQTTRemoteServer::MQTTRemoteServer(const std::string& client_id, const std::string& scope, Config::ptr config)
: MQTTParticipant(client_id, scope, config) {
    std::cout << "CREATE MQTTRemoteServer for scope " << ((std::string) scope) << std::endl;
}
IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_payload_update(std::shared_ptr<IUPayloadUpdate> update)
{
    IPAACA_WARNING("Implement me")
    return 0;
}
IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_link_update(std::shared_ptr<IULinkUpdate> update)
{
    IPAACA_WARNING("Implement me")
    return 0;
}
IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_commission(std::shared_ptr<protobuf::IUCommission> update)
{
    IPAACA_WARNING("Implement me")
    return 0;
}
IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_resend_request(std::shared_ptr<protobuf::IUResendRequest> update)
{
    IPAACA_WARNING("Implement me")
    return 0;
}
//}}}


} // of namespace mqtt
} // of namespace backend
} // of namespace ipaaca