/* * This file is part of IPAACA, the * "Incremental Processing Architecture * for Artificial Conversational Agents". * * Copyright (c) 2009-2019 Social Cognitive Systems Group * (formerly the Sociable Agents Group) * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ * http://purl.org/net/ipaaca * * This file may be licensed under the terms of of the * GNU Lesser General Public License Version 3 (the ``LGPL''), * or (at your option) any later version. * * Software distributed under the License is distributed * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either * express or implied. See the LGPL for the specific language * governing rights and limitations. * * You should have received a copy of the LGPL along with this * program. If not, go to http://www.gnu.org/licenses/lgpl.html * or write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * The development of this software was supported by the * Excellence Cluster EXC 277 Cognitive Interaction Technology. * The Excellence Cluster EXC 277 is a grant of the Deutsche * Forschungsgemeinschaft (DFG) in the context of the German * Excellence Initiative. */ /** * \file ipaaca-backend-mqtt.cc * * \brief Source file for the MQTT backend * * \author Ramin Yaghoubzadeh Torky (ryaghoubzadeh@uni-bielefeld.de) * \date January, 2019 */ #include <ipaaca/ipaaca.h> namespace ipaaca { #include <ipaaca/ipaaca-backend-mqtt.h> namespace backend { namespace mqtt { // The following is a required static library initialization hook. // This way, the backend gets registered into the global store just by getting selectively // compiled in (i.e. without insertion into the general code or plugin loading). // The back-end name is taken from the one provided in the BackEnd constructor. static bool __initialize_mqtt_backend = BackEndLibrary::get()->register_backend(MQTTBackEnd::get()); // MQTTBackEnd{{{ // The backend class is the interface to the rest of the IPAACA library, // which does not know any of the implementation details here. // It is available via its (unique) given name (here "mqtt", just below) MQTTBackEnd::MQTTBackEnd() : BackEnd("mqtt") { } BackEnd::ptr MQTTBackEnd::get() { static ptr backend_singleton; if (!backend_singleton) { mosqpp::lib_init(); backend_singleton = std::shared_ptr<MQTTBackEnd>(new MQTTBackEnd()); } return backend_singleton; } Informer::ptr MQTTBackEnd::createInformer(const std::string& scope) { auto res = std::make_shared<MQTTInformer>(generate_client_id(), scope, get_global_config()); res->wait_live(); // TODO wait for it to come live? return res; } Listener::ptr MQTTBackEnd::createListener(const std::string& scope, InputBuffer* buf) { auto res = std::make_shared<MQTTListener>(generate_client_id(), scope, buf, get_global_config()); res->wait_live(); // TODO wait for it to come live? return res; } LocalServer::ptr MQTTBackEnd::createLocalServer(const std::string& scope, OutputBuffer* buf) { auto res = std::make_shared<MQTTLocalServer>(generate_client_id(), scope, buf, get_global_config()); //res->wait_live(); // TODO wait for it to come live? return res; } RemoteServer::ptr MQTTBackEnd::createRemoteServer(const std::string& scope) { auto res = std::make_shared<MQTTRemoteServer>(generate_client_id(), scope, get_global_config()); //res->wait_live(); // TODO wait for it to come live? return res; } //}}} // // Internal implementation follows // // ParticipantCore{{{ ParticipantCore::ParticipantCore() : _running(false), _live(false) { } void ParticipantCore::signal_live() { IPAACA_DEBUG("Notifying to wake up an async MQTT session (now live)") _live = true; _condvar.notify_one(); } bool ParticipantCore::wait_live(long timeout_milliseconds) { IPAACA_DEBUG("Waiting for an MQTT session to come live") std::unique_lock<std::mutex> lock(_condvar_mutex); // mqtt handlers will notify this after connect or subscribe (depending on the subclass) auto success = _condvar.wait_for(lock, std::chrono::milliseconds(timeout_milliseconds), [this]{return this->_live;}); if (!success) { IPAACA_ERROR("Backend timeout: failed to go live") return false; // TODO throw here or in construction wrapper (below) } return true; } //}}} // MQTTParticipant{{{ int MQTTParticipant::get_next_mid() { static int _curmid = 0; _curmid++; return _curmid; } MQTTParticipant::MQTTParticipant(const std::string& client_id, const std::string& scope, Config::ptr config) : ParticipantCore(), mosqpp::mosquittopp(client_id.c_str(), true), _scope(scope) { //threaded_set(true); _client_id = client_id; // get connection parameters from config if (config) { host = config->get_with_default_and_warning<std::string>("transport.mqtt.host", "localhost"); port = config->get_with_default_and_warning<int>("transport.mqtt.port", 1883); keepalive = config->get_with_default<int>("transport.mqtt.keepalive", 60); } else { host = "localhost"; port = 1883; keepalive = 60; IPAACA_ERROR("No Config provided in MQTT backend, using defaults: host=localhost port=1883 keepalive=60") } IPAACA_DEBUG("Created MQTTParticipant on " << host << ":" << port << " for scope " << _scope << " with prepared client id " << _client_id) } IPAACA_EXPORT void MQTTParticipant::connect_and_background() { int res = connect(host.c_str(), port, keepalive); loop_start(); if (res!=0) { IPAACA_ERROR("MQTT connect (on topic " << _scope << ") returned an error " << res << " - please double-check parameters") if (res==14) { IPAACA_ERROR("The underlying system error: errno " << errno << " (" << strerror(errno) << ")") } throw BackEndConnectionFailedError(); } else { IPAACA_DEBUG("connect OK for scope " << _scope) } } void MQTTParticipant::on_error() { IPAACA_ERROR("MQTT error") } void MQTTParticipant::on_disconnect(int rc) { IPAACA_ERROR("MQTT disconnect of " << _scope << " with rc " << rc) } //}}} // MQTTInformer {{{ MQTTInformer::MQTTInformer(const std::string& client_id, const std::string& scope, Config::ptr config) : MQTTParticipant(client_id, scope, config) { IPAACA_DEBUG("Create MQTTInformer for scope " << ((std::string) scope)) connect_and_background(); } void MQTTInformer::on_connect(int rc) { signal_live(); } IPAACA_EXPORT bool MQTTInformer::internal_publish(const std::string& wire) { IPAACA_DEBUG("Trying to publish via MQTT, topic " << _scope) //int mid = MQTTParticipant::get_next_mid(); int result = mosquittopp::publish(NULL, _scope.c_str(), wire.size(), wire.c_str(), 2, false); } //}}} // MQTTListener {{{ MQTTListener::MQTTListener(const std::string& client_id, const std::string& scope, InputBuffer* buffer_ptr, Config::ptr config) : MQTTParticipant(client_id, scope, config), Listener(buffer_ptr) { IPAACA_DEBUG("Create MQTTListener for scope " << ((std::string) scope)) connect_and_background(); } void MQTTListener::on_connect(int rc) { IPAACA_WARNING("Listener::on_connect for the one on scope " << _scope) //subscribe(&mid, _scope.c_str(), 2); //int mid = MQTTParticipant::get_next_mid(); int res = subscribe(NULL, _scope.c_str(), 2); if (res!=0) { IPAACA_ERROR("subscribe (on topic " << _scope << ") returned an error " << res) } else { IPAACA_DEBUG("subscribe returned OK for topic " << _scope) } } void MQTTListener::on_subscribe(int mid, int qos_count, const int * granted_qos) { IPAACA_DEBUG("on_subscribe: " << mid << " " << qos_count << " for scope " << _scope) if (qos_count < 1) { IPAACA_WARNING("No QoS grants reported") } else if (qos_count > 1) { IPAACA_WARNING("More than one QoS grant reported for Listener, should not happen") } else { int qos = granted_qos[0]; if (qos!=2) { IPAACA_WARNING("MQTT QoS level 2 (guaranteed delivery) has NOT been granted on " << _scope << " (we got level " << qos << ")") } } signal_live(); } IPAACA_EXPORT void MQTTListener::on_message(const struct mosquitto_message * message) { // internal_deserialize expects a string, which we construct here from the received char* and len auto event = ipaaca::converters::internal_deserialize(std::string((const char*) message->payload, message->payloadlen)); //std::cout << "GOT AN EVENT of type " << event->getType() << std::endl; // let the Listener base class handle the propagation into a Buffer: Listener::relay_received_event_to_buffer(event); } //}}} // MQTTLocalServer {{{ MQTTLocalServer::MQTTLocalServer(const std::string& client_id, const std::string& scope, ipaaca::OutputBuffer* buffer_ptr, Config::ptr config) : MQTTParticipant(client_id, scope, config), _buffer(buffer_ptr) { std::cout << "CREATE MQTTLocalServer for scope " << ((std::string) scope) << std::endl; } //}}} // MQTTRemoteServer{{{ // RemoteServer (= the side that sends requests to the owner of a remote IU) MQTTRemoteServer::MQTTRemoteServer(const std::string& client_id, const std::string& scope, Config::ptr config) : MQTTParticipant(client_id, scope, config) { std::cout << "CREATE MQTTRemoteServer for scope " << ((std::string) scope) << std::endl; } IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_payload_update(std::shared_ptr<IUPayloadUpdate> update) { IPAACA_WARNING("Implement me") return 0; } IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_link_update(std::shared_ptr<IULinkUpdate> update) { IPAACA_WARNING("Implement me") return 0; } IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_commission(std::shared_ptr<protobuf::IUCommission> update) { IPAACA_WARNING("Implement me") return 0; } IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_resend_request(std::shared_ptr<protobuf::IUResendRequest> update) { IPAACA_WARNING("Implement me") return 0; } //}}} } // of namespace mqtt } // of namespace backend } // of namespace ipaaca