diff --git a/ipaacalib/python/src/ipaaca/util/notifier.py b/ipaacalib/python/src/ipaaca/util/notifier.py index 750d39affd5b657cf6c761d2a919d96fbbda9f8c..ec66000ae7f8633cfd4173fd80e735017f12b153 100644 --- a/ipaacalib/python/src/ipaaca/util/notifier.py +++ b/ipaacalib/python/src/ipaaca/util/notifier.py @@ -4,7 +4,7 @@ # "Incremental Processing Architecture # for Artificial Conversational Agents". # -# Copyright (c) 2009-2014 Social Cognitive Systems Group +# Copyright (c) 2009-2022 Social Cognitive Systems Group # CITEC, Bielefeld University # # http://opensource.cit-ec.de/projects/ipaaca/ @@ -33,9 +33,9 @@ from __future__ import division, print_function +import os import threading - import ipaaca.buffer import ipaaca.iu import ipaaca.misc @@ -64,11 +64,16 @@ class ComponentError(Exception): class ComponentNotifier(object): NOTIFY_CATEGORY = "componentNotify" + CONTROL_CATEGORY = "componentControl" SEND_CATEGORIES = "send_categories" RECEIVE_CATEGORIES = "recv_categories" + CMD = "cmd" STATE = "state" NAME = "name" + WHO = "who" # list of names (or empty) FUNCTION = "function" + PID = "pid" + CMD_REPORT = "report" def __init__(self, component_name, component_function, send_categories, receive_categories, out_buffer=None, in_buffer=None): self.component_name = component_name @@ -116,14 +121,23 @@ class ComponentNotifier(object): self.out_buffer.add(notify_iu) def _handle_iu_event(self, iu, event_type, local): - if iu.payload[ComponentNotifier.NAME] == self.component_name: - return - with self.notification_handler_lock: - for h in self.notification_handlers: - h(iu, event_type, local) - if iu.payload[ComponentNotifier.STATE] == "new": - #print("submitting") - self._submit_notify(False) + if iu.category == ComponentNotifier.NOTIFY_CATEGORY: + if iu.payload[ComponentNotifier.NAME] == self.component_name: + return + with self.notification_handler_lock: + for h in self.notification_handlers: + h(iu, event_type, local) + if iu.payload[ComponentNotifier.STATE] == "new": + #print("submitting") + self._submit_notify(False) + elif iu.category == ComponentNotifier.CONTROL_CATEGORY: + cmd = iu.payload[ComponentNotifier.CMD] + if cmd=='report': + # Request to report (by component controller) + who = iu.payload[ComponentNotifier.WHO] + # If we are named specifically or it's a broadcast + if len(who)==0 or self.component_name in who: + self._submit_notify(False) def add_notification_handler(self, handler): with self.notification_handler_lock: @@ -155,12 +169,14 @@ class ComponentNotifier(object): if (not self.initialized): self.timesync_slave = ipaaca.util.timesync.TimesyncSlave(component_name=self.component_name, timing_handler=self.launch_timesync_slave_handlers) self.timesync_master = ipaaca.util.timesync.TimesyncMaster(component_name=self.component_name, timing_handler=self.launch_timesync_master_handlers) - self.in_buffer.register_handler(self._handle_iu_event, ipaaca.iu.IUEventType.MESSAGE, ComponentNotifier.NOTIFY_CATEGORY) + self.in_buffer.register_handler(self._handle_iu_event, ipaaca.iu.IUEventType.MESSAGE, [ComponentNotifier.NOTIFY_CATEGORY, ComponentNotifier.CONTROL_CATEGORY]) self._submit_notify(True) self.initialized = True def __enter__(self): self.initialize() + return self def __exit__(self, t, v, tb): self.terminate() + return self diff --git a/ipaacalib/python/src/ipaaca/util/timesync.py b/ipaacalib/python/src/ipaaca/util/timesync.py index 950a50f1cce1929f89d6701d20b1d9dfe62db120..4a338f4fc37cee2e7af4bff7e21312844270e5d0 100644 --- a/ipaacalib/python/src/ipaaca/util/timesync.py +++ b/ipaacalib/python/src/ipaaca/util/timesync.py @@ -4,7 +4,7 @@ # "Incremental Processing Architecture # for Artificial Conversational Agents". # -# Copyright (c) 2009-2014 Social Cognitive Systems Group +# Copyright (c) 2009-2022 Social Cognitive Systems Group # CITEC, Bielefeld University # # http://opensource.cit-ec.de/projects/ipaaca/ @@ -32,6 +32,7 @@ from __future__ import division, print_function +import threading import time import ipaaca.buffer @@ -45,7 +46,7 @@ class TimesyncMaster(object): # component name to report (None => use buffer name) self.component_name = component_name if component_name is not None else self.ob.unique_name # - self.ob.register_handler(self.handle_timesync_master) + #self.ob.register_handler(self.handle_timesync_master) self.ib.register_handler(self.handle_timesync_master) # master_t1 is identical for all slaves self.master_t1 = None @@ -63,7 +64,7 @@ class TimesyncMaster(object): self.timing_handler = timing_handler def send_master_timesync(self): - iu = ipaaca.iu.IU('timesyncRequest') + iu = ipaaca.iu.Message('timesyncRequest') self.master_t1 = self.get_time() iu.payload = { 'stage':'0', @@ -71,47 +72,48 @@ class TimesyncMaster(object): 'master':self.component_name, } self.ob.add(iu) - print('Sent a stage 0 timesync as master '+self.component_name) def handle_timesync_master(self, iu, event_type, own): master = iu.payload['master'] if not own and master == self.component_name: - if self.component_name == master: - # reply to our own initial IU - slave = iu.payload['slave'] - stage = iu.payload['stage'] - if stage=='1': - print('Received stage 1 from slave '+slave) - # initial reply by slave - t1 = iu.payload['slave_t1'] - self.slave_t1s[slave] = float(t1) - t2 = self.master_t2s[slave] = self.get_time() - iu.payload.merge({'master_t2': str(t2), 'stage':'2'}) - latency1 = t2 - self.master_t1 - self.latencies[slave] = latency1 - #print('Latency of round-trip 1: %.3f' % latency1) - elif stage=='3': - print('Received stage 3 from slave '+slave) - # second reply by slave - t2 = iu.payload['slave_t2'] - self.slave_t2s[slave] = float(t2) - t_final = self.get_time() - latency1 = self.latencies[slave] - latency2 = t_final - self.master_t2s[slave] - latency = self.latencies[slave] = (latency1+latency2)/2.0 - offset1 = (self.slave_t1s[slave]-self.master_t1)-latency/2.0 - offset2 = (self.slave_t2s[slave]-self.master_t2s[slave])-latency/2.0 - offset = (offset1+offset2)/2.0 - iu.payload.merge({'stage':'4', 'latency': str(latency), 'offset':str(offset)}) - if self.timing_handler is None: - print('Determined timing of timesync slave '+slave) - print(' Avg round-trip latency: %.3f s'%latency) - print(' Offset of their clock: %.3f s'%offset) + if event_type == ipaaca.IUEventType.ADDED or event_type == ipaaca.IUEventType.UPDATED: + if self.component_name == master: + # reply to our own initial IU + slave = iu.payload['slave'] + stage = iu.payload['stage'] + if stage=='1': + # initial reply by slave + t1 = iu.payload['slave_t1'] + self.slave_t1s[slave] = float(t1) + t2 = self.master_t2s[slave] = self.get_time() + iu.payload.merge({'master_t2': str(t2), 'stage':'2'}) + latency1 = t2 - self.master_t1 + #print('Before stage 1 for '+slave+': '+str(self.latencies)) + self.latencies[slave] = latency1 + #print('After stage 1 for '+slave+': '+str(self.latencies)) + #print('Latency of round-trip 1: %.3f' % latency1) + elif stage=='3': + #print('At stage 3 for '+slave+': '+str(self.latencies)) + # second reply by slave + t2 = iu.payload['slave_t2'] + self.slave_t2s[slave] = float(t2) + t_final = self.get_time() + latency1 = self.latencies[slave] + latency2 = t_final - self.master_t2s[slave] + latency = self.latencies[slave] = (latency1+latency2)/2.0 + offset1 = (self.slave_t1s[slave]-self.master_t1)-latency/2.0 + offset2 = (self.slave_t2s[slave]-self.master_t2s[slave])-latency/2.0 + offset = (offset1+offset2)/2.0 + iu.payload.merge({'stage':'4', 'latency': str(latency), 'offset':str(offset)}) + if self.timing_handler is None: + print('Determined timing of timesync slave '+slave) + print(' Avg round-trip latency: %.3f s'%latency) + print(' Offset of their clock: %.3f s'%offset) + else: + self.timing_handler(self.component_name, slave, latency, offset) else: - self.timing_handler(self.component_name, slave, latency, offset) - else: - # other stages are handled by time slave handler - pass + # other stages are handled by time slave handler + pass def get_time(self): return time.time() + self.debug_offset @@ -129,7 +131,6 @@ class TimesyncSlave(object): #self.master_t2 = None #self.master = None self.latency = None - self.my_iu = None # self.debug_offset = debug_offset # @@ -142,30 +143,23 @@ class TimesyncSlave(object): master = iu.payload['master'] stage = iu.payload['stage'] if self.component_name != master: - if not own: + if not own and stage=='0': # reply only to IUs from others - if stage=='0': - #print('Received stage 0 from master '+master) - # initial reply to master - self.my_iu = ipaaca.iu.IU('timesyncReply') - # TODO: add grounded_in link too? - t1 = self.get_time() - self.my_iu.payload = iu.payload - self.my_iu.payload['slave'] = self.component_name - self.my_iu.payload['slave_t1'] = str(t1) - self.my_iu.payload['stage'] = '1' - - #self.my_iu.payload.merge({ - # 'slave':self.component_name, - # 'slave_t1':str(t1), - # 'stage':'1', - # }) - self.ob.add(self.my_iu) - else: + #print('Received stage 0 from master '+master) + # initial reply to master + myiu = ipaaca.iu.IU('timesyncReply') + # TODO: add grounded_in link too? + t1 = self.get_time() + myiu.payload = iu.payload + myiu.payload['slave'] = self.component_name + myiu.payload['slave_t1'] = str(t1) + myiu.payload['stage'] = '1' + self.ob.add(myiu) + elif iu.payload['slave'] == self.component_name: if stage=='2': #print('Received stage 2 from master '+master) t2 = self.get_time() - self.my_iu.payload.merge({ + iu.payload.merge({ 'slave_t2':str(t2), 'stage':'3', })