Skip to content
Snippets Groups Projects
Commit dc12fe31 authored by Ramin Yaghoubzadeh Torky's avatar Ramin Yaghoubzadeh Torky
Browse files

Fix for massive MQTT dispatcher bug.

Unlocked thread race could lead to quick event sequences
causing inconsistent payloads! Fixed by serializing via extra
per-thread queue (... should have always been so; mea culpa)
parent e18e6d7f
No related branches found
No related tags found
No related merge requests found
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
# "Incremental Processing Architecture # "Incremental Processing Architecture
# for Artificial Conversational Agents". # for Artificial Conversational Agents".
# #
# Copyright (c) 2009-2019 Social Cognitive Systems Group # Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University # CITEC, Bielefeld University
# #
# http://opensource.cit-ec.de/projects/ipaaca/ # http://opensource.cit-ec.de/projects/ipaaca/
...@@ -67,6 +67,9 @@ else: ...@@ -67,6 +67,9 @@ else:
LOGGER = ipaaca.misc.get_library_logger() LOGGER = ipaaca.misc.get_library_logger()
_REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited
_REMOTE_LISTENER_MAX_QUEUED_EVENTS = 1024 # 'Full' exception if exceeded
class EventWrapper(object): class EventWrapper(object):
def __init__(self, data): def __init__(self, data):
self.data = data self.data = data
...@@ -136,14 +139,22 @@ else: ...@@ -136,14 +139,22 @@ else:
self._mqtt_client.publish(self._scope, ipaaca.converter.serialize(data), qos=2) self._mqtt_client.publish(self._scope, ipaaca.converter.serialize(data), qos=2)
class BackgroundEventDispatcher(threading.Thread): class BackgroundEventDispatcher(threading.Thread):
def __init__(self, event, handlers): def __init__(self, listener):
super(BackgroundEventDispatcher, self).__init__() super(BackgroundEventDispatcher, self).__init__()
self.daemon = True self.daemon = True
self._event = event self._listener = listener
self._handlers = handlers def terminate(self):
self._running = False
def run(self): def run(self):
for handler in self._handlers: self._running = True
handler(self._event) listener = self._listener
while self._running: # auto-terminated (daemon)
event = listener._event_queue.get(block=True, timeout=None)
if event is None: return # signaled termination
#print('\033[31mDispatch '+str(event.data.__class__.__name__)+' start ...\033[m')
for handler in self._listener._handlers:
handler(event)
#print('\033[32m... dispatch '+str(event.data.__class__.__name__)+' end.\033[m')
class Listener(object): class Listener(object):
'''Listener interface, wrapping an inbound port from MQTT''' '''Listener interface, wrapping an inbound port from MQTT'''
...@@ -153,6 +164,7 @@ else: ...@@ -153,6 +164,7 @@ else:
self._live = False self._live = False
self._live_event = threading.Event() self._live_event = threading.Event()
self._handlers = [] self._handlers = []
self._event_queue = queue.Queue(_REMOTE_LISTENER_MAX_QUEUED_EVENTS)
# #
self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8]) self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8])
self._client_id += '_' + scope self._client_id += '_' + scope
...@@ -167,10 +179,14 @@ else: ...@@ -167,10 +179,14 @@ else:
#self._mqtt_client.on_socket_close = self.mqtt_callback_on_socket_close #self._mqtt_client.on_socket_close = self.mqtt_callback_on_socket_close
#self._mqtt_client.on_log = self.mqtt_callback_on_log #self._mqtt_client.on_log = self.mqtt_callback_on_log
#self._mqtt_client.on_publish = self.mqtt_callback_on_publish #self._mqtt_client.on_publish = self.mqtt_callback_on_publish
self._dispatcher = BackgroundEventDispatcher(self)
self._dispatcher.start()
self.run_in_background() self.run_in_background()
def deactivate(self): def deactivate(self):
pass pass
def deactivate_internal(self): def deactivate_internal(self):
self._event_queue.put(None, block=False) # signal termination, waking queue
self._dispatcher.terminate()
self._mqtt_client.disconnect() self._mqtt_client.disconnect()
self._mqtt_client = None self._mqtt_client = None
def run_in_background(self): def run_in_background(self):
...@@ -196,9 +212,7 @@ else: ...@@ -196,9 +212,7 @@ else:
LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc)) LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc))
def mqtt_callback_on_message(self, client, userdata, message): def mqtt_callback_on_message(self, client, userdata, message):
event = EventWrapper(ipaaca.converter.deserialize(message.payload)) event = EventWrapper(ipaaca.converter.deserialize(message.payload))
#print('Received: '+str(event.data.__class__.__name__)) self._event_queue.put(event, block=False) # queue event for BackgroundEventDispatcher
dispatcher = BackgroundEventDispatcher(event, self._handlers)
dispatcher.start()
def addHandler(self, handler): def addHandler(self, handler):
self._handlers.append(handler) self._handlers.append(handler)
#def publishData(self, data): #def publishData(self, data):
...@@ -284,8 +298,6 @@ else: ...@@ -284,8 +298,6 @@ else:
return self._buffer._remote_request_resend(obj) return self._buffer._remote_request_resend(obj)
_REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited
class RemoteServer(object): class RemoteServer(object):
'''RemoteServer, connects to a LocalServer on the side '''RemoteServer, connects to a LocalServer on the side
of an actual IU owner, which will process any requests. of an actual IU owner, which will process any requests.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment