Skip to content
Snippets Groups Projects
Commit 2260e7ac authored by Ramin Yaghoubzadeh Torky's avatar Ramin Yaghoubzadeh Torky
Browse files

C++: provided additional event dispatcher (threaded as in Python)

parent a13f057a
No related branches found
No related tags found
No related merge requests found
...@@ -184,6 +184,7 @@ class Listener { ...@@ -184,6 +184,7 @@ class Listener {
IPAACA_HEADER_EXPORT inline virtual ~Listener() { } IPAACA_HEADER_EXPORT inline virtual ~Listener() { }
//inline Listener(const Scope& scope, InputBuffer* buffer_ptr, Config::ptr config = nullptr)) {} //inline Listener(const Scope& scope, InputBuffer* buffer_ptr, Config::ptr config = nullptr)) {}
void relay_received_event_to_buffer(Event::ptr event); void relay_received_event_to_buffer(Event::ptr event);
void relay_received_event_to_buffer_threaded(Event::ptr event);
}; };
class LocalServer { class LocalServer {
......
...@@ -263,7 +263,7 @@ IPAACA_EXPORT void MQTTListener::on_message(const struct mosquitto_message * mes ...@@ -263,7 +263,7 @@ IPAACA_EXPORT void MQTTListener::on_message(const struct mosquitto_message * mes
auto event = ipaaca::converters::internal_deserialize(std::string((const char*) message->payload, message->payloadlen)); auto event = ipaaca::converters::internal_deserialize(std::string((const char*) message->payload, message->payloadlen));
//std::cout << "GOT AN EVENT of type " << event->getType() << std::endl; //std::cout << "GOT AN EVENT of type " << event->getType() << std::endl;
// let the Listener base class handle the propagation into a Buffer: // let the Listener base class handle the propagation into a Buffer:
Listener::relay_received_event_to_buffer(event); Listener::relay_received_event_to_buffer_threaded(event);
} }
//}}} //}}}
......
...@@ -179,6 +179,15 @@ IPAACA_EXPORT void Listener::relay_received_event_to_buffer(Event::ptr event) ...@@ -179,6 +179,15 @@ IPAACA_EXPORT void Listener::relay_received_event_to_buffer(Event::ptr event)
//std::cout << "Will relay it" << std::endl; //std::cout << "Will relay it" << std::endl;
_buffer->_handle_iu_events(event); _buffer->_handle_iu_events(event);
} }
IPAACA_EXPORT void Listener::relay_received_event_to_buffer_threaded(Event::ptr event)
{
auto buffer = _buffer; // avoid a 'this' lambda capture
std::thread dispatcher(
[buffer,event] () {
buffer->_handle_iu_events(event);
});
dispatcher.detach();
}
} // of namespace backend } // of namespace backend
......
...@@ -90,7 +90,17 @@ class TesterCpp { ...@@ -90,7 +90,17 @@ class TesterCpp {
std::cout << "}" << std::endl; std::cout << "}" << std::endl;
if (etype == IU_ADDED) { if (etype == IU_ADDED) {
std::cout << "Will send a modification to a received new IU" << std::endl; std::cout << "Will send a modification to a received new IU" << std::endl;
iu->payload()["seen_by_cpp"] = true; int cnt=5;
while(cnt>0) {
try {
iu->payload()["seen_by_cpp"] = true;
cnt = 0;
} catch(...) {
std::cout << "... failed, but we try more than once ..." << std::endl;
cnt--;
usleep(20000);
}
}
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment