// ipaaca-backend-mqtt.cc (authored by Ramin Yaghoubzadeh Torky)
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2019 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
/**
* \file ipaaca-backend-mqtt.cc
*
* \brief Source file for the MQTT backend
*
* \author Ramin Yaghoubzadeh Torky (ryaghoubzadeh@uni-bielefeld.de)
* \date January, 2019
*/
#include <ipaaca/ipaaca.h>
namespace ipaaca {
#include <ipaaca/ipaaca-backend-mqtt.h>
namespace backend {
namespace mqtt {
// Required static library initialization hook.
// Initializing this file-local variable runs register_backend() during static
// initialization, so the backend gets registered into the global store just by
// being selectively compiled in (i.e. without insertion into the general code
// or plugin loading).
// The back-end name is taken from the one provided in the BackEnd constructor.
// The stored bool itself is not read anywhere in this file.
// NOTE(review): identifiers containing a double underscore are reserved for the
// implementation in C++ - consider renaming this variable.
static bool __initialize_mqtt_backend = BackEndLibrary::get()->register_backend(MQTTBackEnd::get());
// MQTTBackEnd{{{
// The backend class is the interface to the rest of the IPAACA library,
// which does not know any of the implementation details here.
// It is available via its (unique) given name (here "mqtt", just below)
MQTTBackEnd::MQTTBackEnd()
    : BackEnd("mqtt")
{
    // Nothing to do here: the base class receives the back-end name ("mqtt").
}
// Return the single MQTTBackEnd instance, creating it on the first call.
// The mosquitto C++ library is initialized (mosqpp::lib_init) right before
// the instance is constructed, i.e. exactly once.
BackEnd::ptr MQTTBackEnd::get() {
    static ptr instance;
    if (instance) {
        return instance;
    }
    mosqpp::lib_init();
    instance = std::shared_ptr<MQTTBackEnd>(new MQTTBackEnd());
    return instance;
}
// Create an MQTT informer (publishing side) for the given scope.
//
// The informer connects in its constructor; this function then blocks until
// the session has been reported live (or the default wait_live timeout hits).
//
// \param scope  topic/scope the informer publishes on
// \return the live informer
// \throws BackEndConnectionFailedError if the connect call fails (raised from
//         the constructor) or if the session does not come live in time.
Informer::ptr MQTTBackEnd::createInformer(const std::string& scope)
{
    auto informer = std::make_shared<MQTTInformer>(generate_client_id(), scope, get_global_config());
    // wait_live() already logs the timeout; do not hand out a dead informer.
    if (!informer->wait_live()) {
        throw BackEndConnectionFailedError();
    }
    return informer;
}
// Create an MQTT listener (subscribing side) for the given scope.
//
// The listener connects in its constructor; this function then blocks until
// the session has been reported live (or the default wait_live timeout hits).
//
// \param scope  topic/scope to subscribe to
// \param buf    input buffer that received events are relayed into
// \return the live listener
// \throws BackEndConnectionFailedError if the connect call fails (raised from
//         the constructor) or if the session does not come live in time.
Listener::ptr MQTTBackEnd::createListener(const std::string& scope, InputBuffer* buf)
{
    auto listener = std::make_shared<MQTTListener>(generate_client_id(), scope, buf, get_global_config());
    // wait_live() already logs the timeout; do not hand out a dead listener.
    if (!listener->wait_live()) {
        throw BackEndConnectionFailedError();
    }
    return listener;
}
// Create the local server object for the given scope and output buffer.
// Unlike informers and listeners, the object is handed back immediately,
// without waiting for it to come live.
// TODO wait for it to come live?
LocalServer::ptr MQTTBackEnd::createLocalServer(const std::string& scope, OutputBuffer* buf)
{
    return std::make_shared<MQTTLocalServer>(generate_client_id(), scope, buf, get_global_config());
}
// Create the remote server object for the given scope.
// Unlike informers and listeners, the object is handed back immediately,
// without waiting for it to come live.
// TODO wait for it to come live?
RemoteServer::ptr MQTTBackEnd::createRemoteServer(const std::string& scope)
{
    return std::make_shared<MQTTRemoteServer>(generate_client_id(), scope, get_global_config());
}
//}}}
//
// Internal implementation follows
//
// ParticipantCore{{{
// A fresh participant core is neither running nor live.
ParticipantCore::ParticipantCore()
    : _running(false)
    , _live(false)
{
}
// Mark the session as live and wake up a thread blocked in wait_live().
//
// The flag is written while holding _condvar_mutex, the same mutex that
// wait_live() holds while evaluating its predicate. Without the lock, the
// write could slip in between the waiter's predicate check and its going to
// sleep, so the notification would be lost and the waiter would only notice
// the flag when its timeout expires.
void ParticipantCore::signal_live() {
    IPAACA_DEBUG("Notifying to wake up an async MQTT session (now live)")
    {
        std::lock_guard<std::mutex> lock(_condvar_mutex);
        _live = true;
    }
    // Notify after releasing the mutex so the woken thread can acquire it at once.
    _condvar.notify_one();
}
bool ParticipantCore::wait_live(long timeout_milliseconds) {
IPAACA_DEBUG("Waiting for an MQTT session to come live")
std::unique_lock<std::mutex> lock(_condvar_mutex);
// mqtt handlers will notify this after connect or subscribe (depending on the subclass)
auto success = _condvar.wait_for(lock, std::chrono::milliseconds(timeout_milliseconds), [this]{return this->_live;});
if (!success) {
IPAACA_ERROR("Backend timeout: failed to go live")
return false; // TODO throw here or in construction wrapper (below)
}
return true;
}
//}}}
// MQTTParticipant{{{
// Hand out consecutive message ids from one function-local counter:
// the first call returns 1, the next 2, and so on.
int MQTTParticipant::get_next_mid() {
    static int last_mid = 0;
    return ++last_mid;
}
// Set up the mosquitto client object and read the connection parameters.
// No connection is attempted here; see connect_and_background().
//
// \param client_id  MQTT client id handed to the mosquittopp base class
// \param scope      topic/scope this participant works on
// \param config     configuration source; may be null, in which case the
//                   defaults localhost:1883 with keepalive 60 are used
MQTTParticipant::MQTTParticipant(const std::string& client_id, const std::string& scope, Config::ptr config)
    : ParticipantCore()
    , mosqpp::mosquittopp(client_id.c_str(), true)
    , _scope(scope)
{
    //threaded_set(true);
    _client_id = client_id;
    if (!config) {
        // No configuration at all: fall back to the built-in defaults.
        host = "localhost";
        port = 1883;
        keepalive = 60;
        IPAACA_ERROR("No Config provided in MQTT backend, using defaults: host=localhost port=1883 keepalive=60")
    } else {
        // Connection parameters come from the config (with defaults for missing keys).
        host = config->get_with_default_and_warning<std::string>("transport.mqtt.host", "localhost");
        port = config->get_with_default_and_warning<int>("transport.mqtt.port", 1883);
        keepalive = config->get_with_default<int>("transport.mqtt.keepalive", 60);
    }
    IPAACA_DEBUG("Created MQTTParticipant on " << host << ":" << port << " for scope " << _scope << " with prepared client id " << _client_id)
}
// Connect to the configured broker and start the background network loop.
//
// \throws BackEndConnectionFailedError if the connect call reports an error.
IPAACA_EXPORT void MQTTParticipant::connect_and_background()
{
    int res = connect(host.c_str(), port, keepalive);
    // Save errno right away: loop_start() and the log output below may
    // overwrite it before it is reported.
    const int connect_errno = errno;
    loop_start();
    if (res != 0) {
        IPAACA_ERROR("MQTT connect (on topic " << _scope << ") returned an error " << res << " - please double-check parameters")
        // 14 is MOSQ_ERR_ERRNO: the failure reason is a system error in errno.
        if (res == 14) {
            IPAACA_ERROR("The underlying system error: errno " << connect_errno << " (" << strerror(connect_errno) << ")")
        }
        throw BackEndConnectionFailedError();
    }
    IPAACA_DEBUG("connect OK for scope " << _scope)
}
// Error callback: only logs that an MQTT error occurred.
void MQTTParticipant::on_error()
{
    IPAACA_ERROR("MQTT error")
}
// Disconnect callback: logs the affected scope and the reported result code.
// \param rc  result code passed in with the disconnect notification
void MQTTParticipant::on_disconnect(int rc)
{
    IPAACA_ERROR("MQTT disconnect of " << _scope << " with rc " << rc)
}
//}}}
// MQTTInformer {{{
// Construct an informer for the scope and immediately start connecting.
// May throw BackEndConnectionFailedError (from connect_and_background).
MQTTInformer::MQTTInformer(const std::string& client_id, const std::string& scope, Config::ptr config)
    : MQTTParticipant(client_id, scope, config)
{
    IPAACA_DEBUG("Create MQTTInformer for scope " << scope)
    connect_and_background();
}
void MQTTInformer::on_connect(int rc)
{
signal_live();
}
// Publish an already serialized message on this informer's scope.
//
// The message is sent with QoS 2 and without the retain flag.
//
// \param wire  serialized payload; sent as raw bytes (may contain NULs)
// \return true if the publish call reported success (0), false otherwise
//         (the error code is logged)
IPAACA_EXPORT bool MQTTInformer::internal_publish(const std::string& wire)
{
    IPAACA_DEBUG("Trying to publish via MQTT, topic " << _scope)
    const int result = mosquittopp::publish(nullptr, _scope.c_str(), static_cast<int>(wire.size()), wire.c_str(), 2, false);
    if (result != 0) {
        IPAACA_ERROR("MQTT publish (on topic " << _scope << ") returned an error " << result)
        return false;
    }
    return true;
}
//}}}
// MQTTListener {{{
// Construct a listener for the scope and immediately start connecting.
// Received events are relayed into buffer_ptr via the Listener base class.
// May throw BackEndConnectionFailedError (from connect_and_background).
MQTTListener::MQTTListener(const std::string& client_id, const std::string& scope, InputBuffer* buffer_ptr, Config::ptr config)
    : MQTTParticipant(client_id, scope, config)
    , Listener(buffer_ptr)
{
    IPAACA_DEBUG("Create MQTTListener for scope " << scope)
    connect_and_background();
}
void MQTTListener::on_connect(int rc)
{
IPAACA_WARNING("Listener::on_connect for the one on scope " << _scope)
//subscribe(&mid, _scope.c_str(), 2);
//int mid = MQTTParticipant::get_next_mid();
int res = subscribe(NULL, _scope.c_str(), 2);
if (res!=0) {
IPAACA_ERROR("subscribe (on topic " << _scope << ") returned an error " << res)
} else {
IPAACA_DEBUG("subscribe returned OK for topic " << _scope)
}
}
// Subscribe callback of the listener: check the granted QoS and flag the
// session live. The live signal is sent in every case, warnings included.
//
// \param mid          message id of the subscribe request
// \param qos_count    number of entries in granted_qos
// \param granted_qos  granted QoS levels; exactly one entry is expected
void MQTTListener::on_subscribe(int mid, int qos_count, const int * granted_qos)
{
    IPAACA_DEBUG("on_subscribe: " << mid << " " << qos_count << " for scope " << _scope)
    if (qos_count == 1) {
        const int granted_level = granted_qos[0];
        if (granted_level != 2) {
            IPAACA_WARNING("MQTT QoS level 2 (guaranteed delivery) has NOT been granted on " << _scope << " (we got level " << granted_level << ")")
        }
    } else if (qos_count < 1) {
        IPAACA_WARNING("No QoS grants reported")
    } else {
        IPAACA_WARNING("More than one QoS grant reported for Listener, should not happen")
    }
    signal_live();
}
// Message callback of the listener: deserialize the payload and pass the
// resulting event on to the buffer.
// \param message  received MQTT message; payload/payloadlen are read as raw bytes
IPAACA_EXPORT void MQTTListener::on_message(const struct mosquitto_message * message) {
    // internal_deserialize expects a string, so build one from pointer + length
    // (length-based, hence embedded NUL bytes are preserved).
    const char* payload_bytes = static_cast<const char*>(message->payload);
    auto event = ipaaca::converters::internal_deserialize(std::string(payload_bytes, message->payloadlen));
    // The Listener base class handles the propagation into a Buffer.
    Listener::relay_received_event_to_buffer(event);
}
//}}}
// MQTTLocalServer {{{
// Construct the local server for the scope. Only the participant base and the
// buffer pointer are set up here; no connection is started.
//
// The creation notice goes through IPAACA_DEBUG (like the informer and
// listener constructors) instead of being printed unconditionally to stdout.
//
// \param buffer_ptr  output buffer stored in _buffer
MQTTLocalServer::MQTTLocalServer(const std::string& client_id, const std::string& scope, ipaaca::OutputBuffer* buffer_ptr, Config::ptr config)
    : MQTTParticipant(client_id, scope, config), _buffer(buffer_ptr)
{
    IPAACA_DEBUG("Create MQTTLocalServer for scope " << scope)
}
//}}}
// MQTTRemoteServer{{{
// RemoteServer (= the side that sends requests to the owner of a remote IU)
// Construct the remote server for the scope. Only the participant base is set
// up here; no connection is started.
//
// The creation notice goes through IPAACA_DEBUG (like the informer and
// listener constructors) instead of being printed unconditionally to stdout.
MQTTRemoteServer::MQTTRemoteServer(const std::string& client_id, const std::string& scope, Config::ptr config)
    : MQTTParticipant(client_id, scope, config)
{
    IPAACA_DEBUG("Create MQTTRemoteServer for scope " << scope)
}
// Not implemented yet: logs a warning, ignores the update and returns 0.
IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_payload_update(std::shared_ptr<IUPayloadUpdate> update)
{
    IPAACA_WARNING("Implement me")
    return 0;
}
// Not implemented yet: logs a warning, ignores the update and returns 0.
IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_link_update(std::shared_ptr<IULinkUpdate> update)
{
    IPAACA_WARNING("Implement me")
    return 0;
}
// Not implemented yet: logs a warning, ignores the commission and returns 0.
IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_commission(std::shared_ptr<protobuf::IUCommission> update)
{
    IPAACA_WARNING("Implement me")
    return 0;
}
// Not implemented yet: logs a warning, ignores the request and returns 0.
IPAACA_EXPORT int64_t MQTTRemoteServer::request_remote_resend_request(std::shared_ptr<protobuf::IUResendRequest> update)
{
    IPAACA_WARNING("Implement me")
    return 0;
}
//}}}
} // of namespace mqtt
} // of namespace backend
} // of namespace ipaaca