/* * This file is part of IPAACA, the * "Incremental Processing Architecture * for Artificial Conversational Agents". * * Copyright (c) 2009-2019 Social Cognitive Systems Group * (formerly the Sociable Agents Group) * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ * http://purl.org/net/ipaaca * * This file may be licensed under the terms of of the * GNU Lesser General Public License Version 3 (the ``LGPL''), * or (at your option) any later version. * * Software distributed under the License is distributed * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either * express or implied. See the LGPL for the specific language * governing rights and limitations. * * You should have received a copy of the LGPL along with this * program. If not, go to http://www.gnu.org/licenses/lgpl.html * or write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * The development of this software was supported by the * Excellence Cluster EXC 277 Cognitive Interaction Technology. * The Excellence Cluster EXC 277 is a grant of the Deutsche * Forschungsgemeinschaft (DFG) in the context of the German * Excellence Initiative. */ /** * \file ipaaca-backend-ros.cc * * \brief Source file for the ROS backend * * \author Ramin Yaghoubzadeh Torky (ryaghoubzadeh@uni-bielefeld.de) * \date January, 2019 */ #include <ipaaca/ipaaca.h> #include "b64/b64.h" namespace ipaaca { #include <ipaaca/ipaaca-backend-ros.h> namespace backend { namespace ros { // The following is a required static library initialization hook. // This way, the backend gets registered into the global store just by getting selectively // compiled in (i.e. without insertion into the general code or plugin loading). // The back-end name is taken from the one provided in the BackEnd constructor. IPAACA_EXPORT static bool __initialize_ros_backend = BackEndLibrary::get()->register_backend(ROSBackEnd::get()); // ROSBackEnd{{{ // The backend class is the interface to the rest of the IPAACA library, // which does not know any of the implementation details here. // It is available via its (unique) given name (here "ros", just below) IPAACA_EXPORT ROSBackEnd::ROSBackEnd() : BackEnd("ros"), _need_init(true), _node_handle(NULL) { } void sigint_ros_shutdown(int signal) { std::cout << "SIGINT" << std::endl; ::ros::shutdown(); exit(0); } IPAACA_EXPORT void ROSBackEnd::init_once() { if (_need_init) { IPAACA_INFO("Initializing ROS back-end ...") const char* fakename = "ipaaca_cpp_bin"; _cfakename = (char*) malloc(16); strncpy(_cfakename, fakename, 15); char* fake_argv[] = { _cfakename, NULL }; int fake_argc = 1; int num_spinner_threads = 4; _need_init = false; IPAACA_INFO("Initializing ROS node ...") ::ros::init(fake_argc, fake_argv, fakename, ::ros::init_options::AnonymousName); // | ::ros::init_options::NoRosout // IPAACA_INFO("Starting ROS node ...") _node_handle = new ::ros::NodeHandle(); // internally calls ::ros::start() // signal(SIGINT, sigint_ros_shutdown); // IPAACA_INFO("Starting ROS spinner thread ...") _spinner = new ::ros::AsyncSpinner(num_spinner_threads); _spinner->start(); } } IPAACA_EXPORT void ROSBackEnd::teardown() { IPAACA_INFO("Stopping ROS spinner thread ...") _spinner->stop(); delete _spinner; _spinner = NULL; IPAACA_INFO("Shutting down ROS node ...") ::ros::shutdown(); delete _node_handle; _node_handle = NULL; free(_cfakename); } IPAACA_EXPORT BackEnd::ptr ROSBackEnd::get() { static ptr backend_singleton; if (!backend_singleton) { backend_singleton = std::shared_ptr<ROSBackEnd>(new ROSBackEnd()); } return backend_singleton; } IPAACA_EXPORT Informer::ptr ROSBackEnd::createInformer(const std::string& scope) { init_once(); auto res = std::make_shared<ROSInformer>(_node_handle, scope, get_global_config()); return res; } IPAACA_EXPORT Listener::ptr ROSBackEnd::createListener(const std::string& scope, InputBuffer* buf) { init_once(); auto res = std::make_shared<ROSListener>(_node_handle, scope, buf, get_global_config()); return res; } IPAACA_EXPORT LocalServer::ptr ROSBackEnd::createLocalServer(const std::string& scope, OutputBuffer* buf) { init_once(); auto res = std::make_shared<ROSLocalServer>(_node_handle, scope, buf, get_global_config()); return res; } IPAACA_EXPORT RemoteServer::ptr ROSBackEnd::createRemoteServer(const std::string& scope) { init_once(); auto res = std::make_shared<ROSRemoteServer>(_node_handle, scope, get_global_config()); return res; } //}}} // // Internal implementation follows // // ParticipantCore{{{ IPAACA_EXPORT ParticipantCore::ParticipantCore() : _running(false), _live(false) { } IPAACA_EXPORT void ParticipantCore::signal_live() { IPAACA_DEBUG("Notifying to wake up an async ROS session (now live)") _live = true; _condvar.notify_one(); } IPAACA_EXPORT bool ParticipantCore::wait_live(long timeout_milliseconds) { IPAACA_DEBUG("Waiting for an ROS session to come live") std::unique_lock<std::mutex> lock(_condvar_mutex); // ros handlers will notify this after connect or subscribe (depending on the subclass) auto success = _condvar.wait_for(lock, std::chrono::milliseconds(timeout_milliseconds), [this]{return this->_live;}); if (!success) { IPAACA_ERROR("Backend timeout: failed to go live") return false; // TODO throw here or in construction wrapper (below) } return true; } //}}} // ROSParticipant{{{ IPAACA_EXPORT ROSParticipant::ROSParticipant(::ros::NodeHandle* node, const std::string& scope, Config::ptr config) : ParticipantCore(), _node_handle(node), _scope(scope) { //_client_id = client_id; // get connection parameters from config if (config) { host = config->get_with_default_and_warning<std::string>("transport.ros.host", "localhost"); port = config->get_with_default_and_warning<int>("transport.ros.port", 1883); keepalive = config->get_with_default<int>("transport.ros.keepalive", 60); } else { host = "localhost"; port = 1883; keepalive = 60; IPAACA_ERROR("No Config provided in ROS backend, using defaults: host=localhost port=1883 keepalive=60") } //IPAACA_DEBUG("Created ROSParticipant on " << host << ":" << port << " for scope " << _scope) // << " with prepared client id " << _client_id) //IPAACA_DEBUG("Created ROSParticipant for scope " << _scope) // << " with prepared client id " << _client_id) } //}}} // ROSInformer {{{ IPAACA_EXPORT ROSInformer::ROSInformer(::ros::NodeHandle* node, const std::string& scope, Config::ptr config) : ROSParticipant(node, scope, config) { IPAACA_DEBUG("Create ROS Publisher for scope " << ((std::string) scope)) _ros_pub = node->advertise<std_msgs::String>(scope, 100, true); // latch == true //connect_and_background(); } IPAACA_EXPORT bool ROSInformer::internal_publish(const std::string& wire) { IPAACA_DEBUG("Trying to publish via ROS, topic " << _scope) //int mid = ROSParticipant::get_next_mid(); std_msgs::String msg; msg.data = base64_encode(wire); // // This would be a way to ensure that an event has at least // // one actual recipient (e.g. if you know there must be a receiver // // by convention and wait to avoid lost events due to connection delay) //while (_ros_pub.getNumSubscribers() == 0) { // std::this_thread().sleep_for(std::chrono::milliseconds(10)); //} _ros_pub.publish(msg); IPAACA_DEBUG("... returned from publish.") return true; } //}}} // ROSListener {{{ IPAACA_EXPORT ROSListener::ROSListener(::ros::NodeHandle* node, const std::string& scope, InputBuffer* buffer_ptr, Config::ptr config) : ROSParticipant(node, scope, config), Listener(buffer_ptr) { IPAACA_DEBUG("Create ROSListener for scope " << ((std::string) scope)) _ros_sub = node->subscribe(scope, 1000, &ROSListener::on_message, this, ::ros::TransportHints().tcpNoDelay().reliable().unreliable()); } IPAACA_EXPORT void ROSListener::on_message(const std_msgs::String::ConstPtr& msg) { // internal_deserialize expects a string, which we construct here from the received char* and len auto message = base64_decode(msg->data); auto event = ipaaca::converters::internal_deserialize(message); std::cout << "GOT AN EVENT of type " << event->getType() << std::endl; // let the Listener base class handle the propagation into a Buffer: Listener::relay_received_event_to_buffer_threaded(event); } //}}} // ROSLocalServer {{{ IPAACA_EXPORT ROSLocalServer::ROSLocalServer(::ros::NodeHandle* node, const std::string& scope, ipaaca::OutputBuffer* buffer_ptr, Config::ptr config) : ROSParticipant(node, scope, config), LocalServer(buffer_ptr) { IPAACA_DEBUG("Create ROSLocalServer for scope " << ((std::string) scope)); _ros_sub = node->subscribe(scope, 1000, &ROSLocalServer::on_message, this, ::ros::TransportHints().tcpNoDelay().reliable().unreliable()); } // int res = subscribe(NULL, _scope.c_str(), 2); IPAACA_EXPORT ::ros::Publisher ROSLocalServer::get_publisher(const std::string& endpoint) { if (_ros_pubs.count(endpoint)) return _ros_pubs[endpoint]; _ros_pubs[endpoint] = _node_handle->advertise<std_msgs::String>(endpoint, 100, true); // latch == true return _ros_pubs[endpoint]; } IPAACA_EXPORT void ROSLocalServer::send_result_for_request(const std::string& request_endpoint, const std::string& request_uid, int64_t result) { std::string wire; std::shared_ptr<protobuf::RemoteRequestResult> pbo(new protobuf::RemoteRequestResult()); pbo->set_request_uid(request_uid); pbo->set_result(result); wire = ipaaca::converters::internal_serialize(pbo); IPAACA_DEBUG("Trying to send result to RemoteServer " << request_endpoint) std_msgs::String msg; msg.data = base64_encode(wire); auto pub = get_publisher(request_endpoint); // // if latching should be insufficient for reliable RPC, activate this: //while (pub.getNumSubscribers() == 0) { // std::this_thread().sleep_for(std::chrono::milliseconds(10)); //} pub.publish(msg); } IPAACA_EXPORT void ROSLocalServer::on_message(const std_msgs::String::ConstPtr& msg) { auto message = base64_decode(msg->data); auto event = ipaaca::converters::internal_deserialize(message); auto type = event->getType(); IPAACA_DEBUG("LocalServer " << _scope << " got an object of type " << type) int64_t result = 0; std::string request_endpoint(""); std::string request_uid(""); if (type == "ipaaca::IUPayloadUpdate") { auto obj = std::static_pointer_cast<ipaaca::IUPayloadUpdate>(event->getData()); request_uid = obj->request_uid; request_endpoint = obj->request_endpoint; result = LocalServer::attempt_to_apply_remote_payload_update(obj); } else if (type == "ipaaca::IULinkUpdate") { auto obj = std::static_pointer_cast<ipaaca::IULinkUpdate>(event->getData()); request_uid = obj->request_uid; result = LocalServer::attempt_to_apply_remote_link_update(obj); } else if (type == "ipaaca::protobuf::IUCommission") { auto obj = std::static_pointer_cast<ipaaca::protobuf::IUCommission>(event->getData()); request_uid = obj->request_uid(); result = LocalServer::attempt_to_apply_remote_commission(obj); } else if (type == "ipaaca::protobuf::IUResendRequest") { auto obj = std::static_pointer_cast<ipaaca::protobuf::IUResendRequest>(event->getData()); request_uid = obj->request_uid(); result = LocalServer::attempt_to_apply_remote_resend_request(obj); } else { IPAACA_ERROR("ROSLocalServer: unhandled request wire type " << type) } if (request_uid.length()) { send_result_for_request(request_endpoint, request_uid, result); } else { IPAACA_ERROR("ROSLocalServer: cannot reply since request_uid is unknown") } } //}}} // ROSRemoteServer{{{ // RemoteServer (= the side that sends requests to the owner of a remote IU) IPAACA_EXPORT ROSRemoteServer::ROSRemoteServer(::ros::NodeHandle* node, const std::string& scope, Config::ptr config) : ROSParticipant(node, scope, config) { _remote_end_scope = _scope; auto uuid = ipaaca::generate_uuid_string().substr(0,8); _name = "LocalServer_" + uuid; // TODO add e.g. pid as in Python _scope = "/ipaaca/remotes/" + _name; // overwrites constructed ROSParticipant::_scope here (!) IPAACA_DEBUG("Create ROSRemoteServer for remote scope " << ((std::string) _remote_end_scope) << " and reply listener on " << _scope) _ros_sub = node->subscribe(_scope, 1000, &ROSRemoteServer::on_message, this, ::ros::TransportHints().tcpNoDelay().reliable().unreliable()); _ros_pub = node->advertise<std_msgs::String>(_remote_end_scope, 100, true); // latch == true } IPAACA_EXPORT int64_t ROSRemoteServer::request_remote_payload_update(std::shared_ptr<IUPayloadUpdate> update) { return blocking_call(std::make_shared<Event>("ipaaca::IUPayloadUpdate", update)); } IPAACA_EXPORT int64_t ROSRemoteServer::request_remote_link_update(std::shared_ptr<IULinkUpdate> update) { return blocking_call(std::make_shared<Event>("ipaaca::IULinkUpdate", update)); } IPAACA_EXPORT int64_t ROSRemoteServer::request_remote_commission(std::shared_ptr<protobuf::IUCommission> update) { return blocking_call(std::make_shared<Event>("ipaaca::protobuf::IUCommission", update)); } IPAACA_EXPORT int64_t ROSRemoteServer::request_remote_resend_request(std::shared_ptr<protobuf::IUResendRequest> update) { return blocking_call(std::make_shared<Event>("ipaaca::protobuf::IUResendRequest", update)); } IPAACA_EXPORT void ROSRemoteServer::on_message(const std_msgs::String::ConstPtr& msg) { auto message = base64_decode(msg->data); auto event = ipaaca::converters::internal_deserialize(message); auto type = event->getType(); IPAACA_DEBUG("RemoteServer " << _scope << " for remote " << _remote_end_scope << " got an object of type " << type) if (type == "ipaaca::protobuf::RemoteRequestResult") { auto reply = std::static_pointer_cast<ipaaca::protobuf::RemoteRequestResult>(event->getData()); auto uid = reply->request_uid(); auto result = reply->result(); PendingRequest::ptr pending_request; { ipaaca::Locker locker(_pending_requests_lock); auto it = _pending_requests.find(uid); if (it != _pending_requests.end()) { pending_request = it->second; _pending_requests.erase(it); } } if (pending_request) { pending_request->reply_with_result(result); } else { IPAACA_ERROR("ROSRemoteServer: got a reply for a request that is not pending: " << uid) } } else { IPAACA_ERROR("ROSRemoteServer: unhandled request wire type " << type) } } IPAACA_EXPORT PendingRequest::ptr ROSRemoteServer::queue_pending_request(Event::ptr request) { PendingRequest::ptr pending_request = std::make_shared<PendingRequest>(request); { ipaaca::Locker locker(_pending_requests_lock); if ((_ROS_REMOTE_SERVER_MAX_QUEUED_REQUESTS > 0) && (_pending_requests.size() >= _ROS_REMOTE_SERVER_MAX_QUEUED_REQUESTS)) { IPAACA_ERROR("ROSRemoteServer: maximum number of pending requests exceeded") throw BackEndBadConditionError(); } else { _pending_requests[pending_request->_request_uid] = pending_request; return pending_request; } } } IPAACA_EXPORT int64_t ROSRemoteServer::blocking_call(Event::ptr request) { std::string wire; auto pending_request = queue_pending_request(request); if (request->getType() == "ipaaca::IUPayloadUpdate") { auto obj = std::static_pointer_cast<ipaaca::IUPayloadUpdate>(request->getData()); obj->request_uid = pending_request->_request_uid; obj->request_endpoint = _scope; wire = ipaaca::converters::internal_serialize(obj); } else if (request->getType() == "ipaaca::IULinkUpdate") { auto obj = std::static_pointer_cast<ipaaca::IULinkUpdate>(request->getData()); obj->request_uid = pending_request->_request_uid; obj->request_endpoint = _scope; wire = ipaaca::converters::internal_serialize(obj); } else if (request->getType() == "ipaaca::protobuf::IUCommission") { auto obj = std::static_pointer_cast<ipaaca::protobuf::IUCommission>(request->getData()); obj->set_request_uid(pending_request->_request_uid); obj->set_request_endpoint(_scope); wire = ipaaca::converters::internal_serialize(obj); } else if (request->getType() == "ipaaca::protobuf::IUResendRequest") { auto obj = std::static_pointer_cast<ipaaca::protobuf::IUResendRequest>(request->getData()); obj->set_request_uid(pending_request->_request_uid); obj->set_request_endpoint(_scope); wire = ipaaca::converters::internal_serialize(obj); } else { IPAACA_ERROR("Unhandled request type " << request->getType()) throw BackEndBadConditionError(); } IPAACA_DEBUG("Trying to send request to remote LocalServer at " << _remote_end_scope) std_msgs::String msg; msg.data = base64_encode(wire); _ros_pub.publish(msg); IPAACA_DEBUG("Waiting for the remote server") auto result = pending_request->wait_for_reply(); IPAACA_DEBUG("RPC wait ended.") if (result<0) { IPAACA_WARNING("A RemoteServer request timed out, remote end was " << _remote_end_scope) return 0; } else { return result; } } //}}} } // of namespace ros } // of namespace backend } // of namespace ipaaca