/* * This file is part of IPAACA, the * "Incremental Processing Architecture * for Artificial Conversational Agents". * * Copyright (c) 2009-2019 Social Cognitive Systems Group * (formerly the Sociable Agents Group) * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ * http://purl.org/net/ipaaca * * This file may be licensed under the terms of of the * GNU Lesser General Public License Version 3 (the ``LGPL''), * or (at your option) any later version. * * Software distributed under the License is distributed * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either * express or implied. See the LGPL for the specific language * governing rights and limitations. * * You should have received a copy of the LGPL along with this * program. If not, go to http://www.gnu.org/licenses/lgpl.html * or write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * The development of this software was supported by the * Excellence Cluster EXC 277 Cognitive Interaction Technology. * The Excellence Cluster EXC 277 is a grant of the Deutsche * Forschungsgemeinschaft (DFG) in the context of the German * Excellence Initiative. */ /** * \file ipaaca-backend-mqtt.cc * * \brief Source file for the MQTT backend * * \author Ramin Yaghoubzadeh Torky (ryaghoubzadeh@uni-bielefeld.de) * \date January, 2019 */ #include <ipaaca/ipaaca.h> namespace ipaaca { #include <ipaaca/ipaaca-backend-mqtt.h> namespace backend { namespace mqtt { // The following is a required static library initialization hook. // This way, the backend gets registered into the global store just by getting selectively // compiled in (i.e. without insertion into the general code or plugin loading). // The back-end name is taken from the one provided in the BackEnd constructor. static bool __initialize_mqtt_backend = BackEndLibrary::get()->register_backend(MQTTBackEnd::get()); // MQTTBackEnd{{{ // The backend class is the interface to the rest of the IPAACA library, // which does not know any of the implementation details here. // It is available via its (unique) given name (here "mqtt", just below) MQTTBackEnd::MQTTBackEnd() : BackEnd("mqtt") { } BackEnd::ptr MQTTBackEnd::get() { static ptr backend_singleton; if (!backend_singleton) { mosqpp::lib_init(); backend_singleton = std::shared_ptr<MQTTBackEnd>(new MQTTBackEnd()); } return backend_singleton; } Informer::ptr MQTTBackEnd::createInformer(const std::string& scope) { auto res = std::make_shared<MQTTInformer>(generate_client_id(), scope, get_global_config()); res->wait_live(); // TODO wait for it to come live? return res; } Listener::ptr MQTTBackEnd::createListener(const std::string& scope, InputBuffer* buf) { auto res = std::make_shared<MQTTListener>(generate_client_id(), scope, buf, get_global_config()); res->wait_live(); // TODO wait for it to come live? return res; } LocalServer::ptr MQTTBackEnd::createLocalServer(const std::string& scope, OutputBuffer* buf) { auto res = std::make_shared<MQTTLocalServer>(generate_client_id(), scope, buf, get_global_config()); res->wait_live(); // TODO wait for it to come live? return res; } RemoteServer::ptr MQTTBackEnd::createRemoteServer(const std::string& scope) { auto res = std::make_shared<MQTTRemoteServer>(generate_client_id(), scope, get_global_config()); //res->wait_live(); // TODO wait for it to come live? return res; } //}}} // // Internal implementation follows // // ParticipantCore{{{ ParticipantCore::ParticipantCore() : _running(false), _live(false) { } void ParticipantCore::signal_live() { IPAACA_DEBUG("Notifying to wake up an async MQTT session (now live)") _live = true; _condvar.notify_one(); } bool ParticipantCore::wait_live(long timeout_milliseconds) { IPAACA_DEBUG("Waiting for an MQTT session to come live") std::unique_lock<std::mutex> lock(_condvar_mutex); // mqtt handlers will notify this after connect or subscribe (depending on the subclass) auto success = _condvar.wait_for(lock, std::chrono::milliseconds(timeout_milliseconds), [this]{return this->_live;}); if (!success) { IPAACA_ERROR("Backend timeout: failed to go live") return false; // TODO throw here or in construction wrapper (below) } return true; } //}}} // MQTTParticipant{{{ int MQTTParticipant::get_next_mid() { static int _curmid = 0; _curmid++; return _curmid; } MQTTParticipant::MQTTParticipant(const std::string& client_id, const std::string& scope, Config::ptr config) : ParticipantCore(), mosqpp::mosquittopp(client_id.c_str(), true), _scope(scope) { //threaded_set(true); _client_id = client_id; // get connection parameters from config if (config) { host = config->get_with_default_and_warning<std::string>("transport.mqtt.host", "localhost"); port = config->get_with_default_and_warning<int>("transport.mqtt.port", 1883); keepalive = config->get_with_default<int>("transport.mqtt.keepalive", 60); } else { host = "localhost"; port = 1883; keepalive = 60; IPAACA_ERROR("No Config provided in MQTT backend, using defaults: host=localhost port=1883 keepalive=60") } IPAACA_DEBUG("Created MQTTParticipant on " << host << ":" << port << " for scope " << _scope << " with prepared client id " << _client_id) } IPAACA_EXPORT void MQTTParticipant::connect_and_background() { static const char* mosquitto_reasons[] = { //"MOSQ_ERR_CONN_PENDING" is -1 "MOSQ_ERR_SUCCESS", "MOSQ_ERR_NOMEM", "MOSQ_ERR_PROTOCOL", "MOSQ_ERR_INVAL", "MOSQ_ERR_NO_CONN", "MOSQ_ERR_CONN_REFUSED", "MOSQ_ERR_NOT_FOUND", "MOSQ_ERR_CONN_LOST", "MOSQ_ERR_TLS", "MOSQ_ERR_PAYLOAD_SIZE", "MOSQ_ERR_NOT_SUPPORTED", "MOSQ_ERR_AUTH", "MOSQ_ERR_ACL_DENIED", "MOSQ_ERR_UNKNOWN", "MOSQ_ERR_ERRNO", "MOSQ_ERR_EAI", "MOSQ_ERR_PROXY" }; int res = connect(host.c_str(), port, keepalive); loop_start(); if (res!=0) { const char* reason = "unknown"; if (res==-1) { reason = "MOSQ_ERR_CONN_PENDING"; } else if ((res>0) && (res<17)) { reason = mosquitto_reasons[res]; } IPAACA_ERROR("MQTT connect (for scope " << _scope << ") returned an error " << res << " (mosquitto's " << reason << ") - please double-check parameters") if (res==14) { IPAACA_ERROR("The underlying system error: errno " << errno << " (" << strerror(errno) << ")") } throw BackEndConnectionFailedError(); } else { IPAACA_DEBUG("connect OK for scope " << _scope) } } void MQTTParticipant::on_error() { IPAACA_ERROR("MQTT error") } void MQTTParticipant::on_disconnect(int rc) { IPAACA_ERROR("MQTT disconnect of " << _scope << " with rc " << rc) } //}}} // MQTTInformer {{{ MQTTInformer::MQTTInformer(const std::string& client_id, const std::string& scope, Config::ptr config) : MQTTParticipant(client_id, scope, config) { IPAACA_DEBUG("Create MQTTInformer for scope " << ((std::string) scope)) connect_and_background(); } void MQTTInformer::on_connect(int rc) { signal_live(); } IPAACA_EXPORT bool MQTTInformer::internal_publish(const std::string& wire) { IPAACA_DEBUG("Trying to publish via MQTT, topic " << _scope) //int mid = MQTTParticipant::get_next_mid(); int result = mosquittopp::publish(NULL, _scope.c_str(), wire.size(), wire.c_str(), 2, false); } //}}} // MQTTListener {{{ MQTTListener::MQTTListener(const std::string& client_id, const std::string& scope, InputBuffer* buffer_ptr, Config::ptr config) : MQTTParticipant(client_id, scope, config), Listener(buffer_ptr) { IPAACA_DEBUG("Create MQTTListener for scope " << ((std::string) scope)) connect_and_background(); } void MQTTListener::on_connect(int rc) { IPAACA_WARNING("Listener::on_connect for the one on scope " << _scope) //subscribe(&mid, _scope.c_str(), 2); //int mid = MQTTParticipant::get_next_mid(); int res = subscribe(NULL, _scope.c_str(), 2); if (res!=0) { IPAACA_ERROR("subscribe (on topic " << _scope << ") returned an error " << res) } else { IPAACA_DEBUG("subscribe returned OK for topic " << _scope) } } void MQTTListener::on_subscribe(int mid, int qos_count, const int * granted_qos) { IPAACA_DEBUG("on_subscribe: " << mid << " " << qos_count << " for scope " << _scope) if (qos_count < 1) { IPAACA_WARNING("No QoS grants reported") } else if (qos_count > 1) { IPAACA_WARNING("More than one QoS grant reported for Listener, should not happen") } else { int qos = granted_qos[0]; if (qos!=2) { IPAACA_WARNING("MQTT QoS level 2 (guaranteed delivery) has NOT been granted on " << _scope << " (we got level " << qos << ")") } } signal_live(); } IPAACA_EXPORT void MQTTListener::on_message(const struct mosquitto_message * message) { // internal_deserialize expects a string, which we construct here from the received char* and len auto event = ipaaca::converters::internal_deserialize(std::string((const char*) message->payload, message->payloadlen)); //std::cout << "GOT AN EVENT of type " << event->getType() << std::endl; // let the Listener base class handle the propagation into a Buffer: Listener::relay_received_event_to_buffer(event); } //}}} // MQTTLocalServer {{{ MQTTLocalServer::MQTTLocalServer(const std::string& client_id, const std::string& scope, ipaaca::OutputBuffer* buffer_ptr, Config::ptr config) : MQTTParticipant(client_id, scope, config), LocalServer(buffer_ptr) { std::cout << "CREATE MQTTLocalServer for scope " << ((std::string) scope) << std::endl; connect_and_background(); } void MQTTLocalServer::on_connect(int rc) { //IPAACA_DEBUG("LocalServer::on_connect: " << rc) int res = subscribe(NULL, _scope.c_str(), 2); if (res!=0) { IPAACA_ERROR("subscribe (on topic " << _scope << ") returned an error " << res) } else { IPAACA_DEBUG("subscribe returned OK for topic " << _scope) } } void MQTTLocalServer::on_subscribe(int mid, int qos_count, const int * granted_qos) { //IPAACA_DEBUG("LocalServer::on_subscribe: " << mid << " " << qos_count << " for scope " << _scope) if (qos_count < 1) { IPAACA_WARNING("No QoS grants reported") } else if (qos_count > 1) { IPAACA_WARNING("More than one QoS grant reported for Listener, should not happen") } else { int qos = granted_qos[0]; if (qos!=2) { IPAACA_WARNING("MQTT QoS level 2 (guaranteed delivery) has NOT been granted on " << _scope << " (we got level " << qos << ")") } } signal_live(); } void MQTTLocalServer::send_result_for_request(const std::string& request_endpoint, const std::string& request_uid, int64_t result) { std::string wire; std::shared_ptr<protobuf::RemoteRequestResult> pbo(new protobuf::RemoteRequestResult()); pbo->set_request_uid(request_uid); pbo->set_result(result); wire = ipaaca::converters::internal_serialize(pbo); IPAACA_DEBUG("Trying to send result to RemoteServer " << request_endpoint) int send_res = mosquittopp::publish(NULL, request_endpoint.c_str(), wire.size(), wire.c_str(), 2, false); } void MQTTLocalServer::on_message(const struct mosquitto_message * message) { std::cout << "LocServ " << _scope << ": Got a message" << std::endl; auto event = ipaaca::converters::internal_deserialize(std::string((const char*) message->payload, message->payloadlen)); auto type = event->getType(); int64_t result = 0; std::string request_endpoint(""); std::string request_uid(""); if (type == "ipaaca::IUPayloadUpdate") { auto obj = std::static_pointer_cast<ipaaca::IUPayloadUpdate>(event->getData()); request_uid = obj->request_uid; request_endpoint = obj->request_endpoint; result = LocalServer::attempt_to_apply_remote_payload_update(obj); } else if (type == "ipaaca::IULinkUpdate") { auto obj = std::static_pointer_cast<ipaaca::IULinkUpdate>(event->getData()); request_uid = obj->request_uid; result = LocalServer::attempt_to_apply_remote_link_update(obj); } else if (type == "ipaaca::protobuf::IUCommission") { auto obj = std::static_pointer_cast<ipaaca::protobuf::IUCommission>(event->getData()); request_uid = obj->request_uid(); result = LocalServer::attempt_to_apply_remote_commission(obj); } else if (type == "ipaaca::protobuf::IUResendRequest") { auto obj = std::static_pointer_cast<ipaaca::protobuf::IUResendRequest>(event->getData()); request_uid = obj->request_uid(); result = LocalServer::attempt_to_apply_remote_resend_request(obj); } else { IPAACA_ERROR("MQTTLocalServer: unhandled request wire type " << type) } if (request_uid.length()) { // TODO send reply send_result_for_request(request_endpoint, request_uid, result); } else { IPAACA_ERROR("MQTTLocalServer: cannot reply since request_uid is unknown") } } //}}} // MQTTRemoteServer{{{ // RemoteServer (= the side that sends requests to the owner of a remote IU) MQTTRemoteServer::MQTTRemoteServer(const std::string& client_id, const std::string& scope, Config::ptr config) : MQTTParticipant(client_id, scope, config) { std::cout << "CREATE MQTTRemoteServer for scope " << ((std::string) scope) << std::endl; } IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_payload_update(std::shared_ptr<IUPayloadUpdate> update) { IPAACA_WARNING("Implement me") return 0; } IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_link_update(std::shared_ptr<IULinkUpdate> update) { IPAACA_WARNING("Implement me") return 0; } IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_commission(std::shared_ptr<protobuf::IUCommission> update) { IPAACA_WARNING("Implement me") return 0; } IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_resend_request(std::shared_ptr<protobuf::IUResendRequest> update) { IPAACA_WARNING("Implement me") return 0; } //}}} } // of namespace mqtt } // of namespace backend } // of namespace ipaaca