diff --git a/ipaacalib/python/src/ipaaca/backend_mqtt.py b/ipaacalib/python/src/ipaaca/backend_mqtt.py index b79411d76ae7e67123a046fe1037c5edc57e44c4..b22f5f05090bcc1be0276748d7ee6b86fbe75cf5 100644 --- a/ipaacalib/python/src/ipaaca/backend_mqtt.py +++ b/ipaacalib/python/src/ipaaca/backend_mqtt.py @@ -4,7 +4,7 @@ # "Incremental Processing Architecture # for Artificial Conversational Agents". # -# Copyright (c) 2009-2019 Social Cognitive Systems Group +# Copyright (c) 2009-2022 Social Cognitive Systems Group # CITEC, Bielefeld University # # http://opensource.cit-ec.de/projects/ipaaca/ @@ -67,6 +67,9 @@ else: LOGGER = ipaaca.misc.get_library_logger() + _REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited + _REMOTE_LISTENER_MAX_QUEUED_EVENTS = 1024 # 'Full' exception if exceeded + class EventWrapper(object): def __init__(self, data): self.data = data @@ -136,14 +139,22 @@ else: self._mqtt_client.publish(self._scope, ipaaca.converter.serialize(data), qos=2) class BackgroundEventDispatcher(threading.Thread): - def __init__(self, event, handlers): + def __init__(self, listener): super(BackgroundEventDispatcher, self).__init__() self.daemon = True - self._event = event - self._handlers = handlers + self._listener = listener + def terminate(self): + self._running = False def run(self): - for handler in self._handlers: - handler(self._event) + self._running = True + listener = self._listener + while self._running: # auto-terminated (daemon) + event = listener._event_queue.get(block=True, timeout=None) + if event is None: return # signaled termination + #print('\033[31mDispatch '+str(event.data.__class__.__name__)+' start ...\033[m') + for handler in self._listener._handlers: + handler(event) + #print('\033[32m... dispatch '+str(event.data.__class__.__name__)+' end.\033[m') class Listener(object): '''Listener interface, wrapping an inbound port from MQTT''' @@ -153,6 +164,7 @@ else: self._live = False self._live_event = threading.Event() self._handlers = [] + self._event_queue = queue.Queue(_REMOTE_LISTENER_MAX_QUEUED_EVENTS) # self._client_id = '%s.%s_%s'%(self.__module__, self.__class__.__name__, str(uuid.uuid4())[0:8]) self._client_id += '_' + scope @@ -167,10 +179,14 @@ else: #self._mqtt_client.on_socket_close = self.mqtt_callback_on_socket_close #self._mqtt_client.on_log = self.mqtt_callback_on_log #self._mqtt_client.on_publish = self.mqtt_callback_on_publish + self._dispatcher = BackgroundEventDispatcher(self) + self._dispatcher.start() self.run_in_background() def deactivate(self): pass def deactivate_internal(self): + self._event_queue.put(None, block=False) # signal termination, waking queue + self._dispatcher.terminate() self._mqtt_client.disconnect() self._mqtt_client = None def run_in_background(self): @@ -196,9 +212,7 @@ else: LOGGER.warning('MQTT disconnect for '+str(self._scope)+' with result code '+str(rc)) def mqtt_callback_on_message(self, client, userdata, message): event = EventWrapper(ipaaca.converter.deserialize(message.payload)) - #print('Received: '+str(event.data.__class__.__name__)) - dispatcher = BackgroundEventDispatcher(event, self._handlers) - dispatcher.start() + self._event_queue.put(event, block=False) # queue event for BackgroundEventDispatcher def addHandler(self, handler): self._handlers.append(handler) #def publishData(self, data): @@ -284,8 +298,6 @@ else: return self._buffer._remote_request_resend(obj) - _REMOTE_SERVER_MAX_QUEUED_REQUESTS = -1 # unlimited - class RemoteServer(object): '''RemoteServer, connects to a LocalServer on the side of an actual IU owner, which will process any requests.