Skip to content
Snippets Groups Projects
Commit 4e41fb25 authored by Ramin Yaghoubzadeh Torky's avatar Ramin Yaghoubzadeh Torky
Browse files

Fixed timesync and notifier; precursor for clean component orchestration

parent dc12fe31
No related branches found
No related tags found
No related merge requests found
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
# "Incremental Processing Architecture # "Incremental Processing Architecture
# for Artificial Conversational Agents". # for Artificial Conversational Agents".
# #
# Copyright (c) 2009-2014 Social Cognitive Systems Group # Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University # CITEC, Bielefeld University
# #
# http://opensource.cit-ec.de/projects/ipaaca/ # http://opensource.cit-ec.de/projects/ipaaca/
...@@ -33,9 +33,9 @@ ...@@ -33,9 +33,9 @@
from __future__ import division, print_function from __future__ import division, print_function
import os
import threading import threading
import ipaaca.buffer import ipaaca.buffer
import ipaaca.iu import ipaaca.iu
import ipaaca.misc import ipaaca.misc
...@@ -64,11 +64,16 @@ class ComponentError(Exception): ...@@ -64,11 +64,16 @@ class ComponentError(Exception):
class ComponentNotifier(object): class ComponentNotifier(object):
NOTIFY_CATEGORY = "componentNotify" NOTIFY_CATEGORY = "componentNotify"
CONTROL_CATEGORY = "componentControl"
SEND_CATEGORIES = "send_categories" SEND_CATEGORIES = "send_categories"
RECEIVE_CATEGORIES = "recv_categories" RECEIVE_CATEGORIES = "recv_categories"
CMD = "cmd"
STATE = "state" STATE = "state"
NAME = "name" NAME = "name"
WHO = "who" # list of names (or empty)
FUNCTION = "function" FUNCTION = "function"
PID = "pid"
CMD_REPORT = "report"
def __init__(self, component_name, component_function, send_categories, receive_categories, out_buffer=None, in_buffer=None): def __init__(self, component_name, component_function, send_categories, receive_categories, out_buffer=None, in_buffer=None):
self.component_name = component_name self.component_name = component_name
...@@ -116,14 +121,23 @@ class ComponentNotifier(object): ...@@ -116,14 +121,23 @@ class ComponentNotifier(object):
self.out_buffer.add(notify_iu) self.out_buffer.add(notify_iu)
def _handle_iu_event(self, iu, event_type, local): def _handle_iu_event(self, iu, event_type, local):
if iu.payload[ComponentNotifier.NAME] == self.component_name: if iu.category == ComponentNotifier.NOTIFY_CATEGORY:
return if iu.payload[ComponentNotifier.NAME] == self.component_name:
with self.notification_handler_lock: return
for h in self.notification_handlers: with self.notification_handler_lock:
h(iu, event_type, local) for h in self.notification_handlers:
if iu.payload[ComponentNotifier.STATE] == "new": h(iu, event_type, local)
#print("submitting") if iu.payload[ComponentNotifier.STATE] == "new":
self._submit_notify(False) #print("submitting")
self._submit_notify(False)
elif iu.category == ComponentNotifier.CONTROL_CATEGORY:
cmd = iu.payload[ComponentNotifier.CMD]
if cmd=='report':
# Request to report (by component controller)
who = iu.payload[ComponentNotifier.WHO]
# If we are named specifically or it's a broadcast
if len(who)==0 or self.component_name in who:
self._submit_notify(False)
def add_notification_handler(self, handler): def add_notification_handler(self, handler):
with self.notification_handler_lock: with self.notification_handler_lock:
...@@ -155,12 +169,14 @@ class ComponentNotifier(object): ...@@ -155,12 +169,14 @@ class ComponentNotifier(object):
if (not self.initialized): if (not self.initialized):
self.timesync_slave = ipaaca.util.timesync.TimesyncSlave(component_name=self.component_name, timing_handler=self.launch_timesync_slave_handlers) self.timesync_slave = ipaaca.util.timesync.TimesyncSlave(component_name=self.component_name, timing_handler=self.launch_timesync_slave_handlers)
self.timesync_master = ipaaca.util.timesync.TimesyncMaster(component_name=self.component_name, timing_handler=self.launch_timesync_master_handlers) self.timesync_master = ipaaca.util.timesync.TimesyncMaster(component_name=self.component_name, timing_handler=self.launch_timesync_master_handlers)
self.in_buffer.register_handler(self._handle_iu_event, ipaaca.iu.IUEventType.MESSAGE, ComponentNotifier.NOTIFY_CATEGORY) self.in_buffer.register_handler(self._handle_iu_event, ipaaca.iu.IUEventType.MESSAGE, [ComponentNotifier.NOTIFY_CATEGORY, ComponentNotifier.CONTROL_CATEGORY])
self._submit_notify(True) self._submit_notify(True)
self.initialized = True self.initialized = True
def __enter__(self): def __enter__(self):
self.initialize() self.initialize()
return self
def __exit__(self, t, v, tb): def __exit__(self, t, v, tb):
self.terminate() self.terminate()
return self
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
# "Incremental Processing Architecture # "Incremental Processing Architecture
# for Artificial Conversational Agents". # for Artificial Conversational Agents".
# #
# Copyright (c) 2009-2014 Social Cognitive Systems Group # Copyright (c) 2009-2022 Social Cognitive Systems Group
# CITEC, Bielefeld University # CITEC, Bielefeld University
# #
# http://opensource.cit-ec.de/projects/ipaaca/ # http://opensource.cit-ec.de/projects/ipaaca/
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
from __future__ import division, print_function from __future__ import division, print_function
import threading
import time import time
import ipaaca.buffer import ipaaca.buffer
...@@ -45,7 +46,7 @@ class TimesyncMaster(object): ...@@ -45,7 +46,7 @@ class TimesyncMaster(object):
# component name to report (None => use buffer name) # component name to report (None => use buffer name)
self.component_name = component_name if component_name is not None else self.ob.unique_name self.component_name = component_name if component_name is not None else self.ob.unique_name
# #
self.ob.register_handler(self.handle_timesync_master) #self.ob.register_handler(self.handle_timesync_master)
self.ib.register_handler(self.handle_timesync_master) self.ib.register_handler(self.handle_timesync_master)
# master_t1 is identical for all slaves # master_t1 is identical for all slaves
self.master_t1 = None self.master_t1 = None
...@@ -63,7 +64,7 @@ class TimesyncMaster(object): ...@@ -63,7 +64,7 @@ class TimesyncMaster(object):
self.timing_handler = timing_handler self.timing_handler = timing_handler
def send_master_timesync(self): def send_master_timesync(self):
iu = ipaaca.iu.IU('timesyncRequest') iu = ipaaca.iu.Message('timesyncRequest')
self.master_t1 = self.get_time() self.master_t1 = self.get_time()
iu.payload = { iu.payload = {
'stage':'0', 'stage':'0',
...@@ -71,47 +72,48 @@ class TimesyncMaster(object): ...@@ -71,47 +72,48 @@ class TimesyncMaster(object):
'master':self.component_name, 'master':self.component_name,
} }
self.ob.add(iu) self.ob.add(iu)
print('Sent a stage 0 timesync as master '+self.component_name)
def handle_timesync_master(self, iu, event_type, own): def handle_timesync_master(self, iu, event_type, own):
master = iu.payload['master'] master = iu.payload['master']
if not own and master == self.component_name: if not own and master == self.component_name:
if self.component_name == master: if event_type == ipaaca.IUEventType.ADDED or event_type == ipaaca.IUEventType.UPDATED:
# reply to our own initial IU if self.component_name == master:
slave = iu.payload['slave'] # reply to our own initial IU
stage = iu.payload['stage'] slave = iu.payload['slave']
if stage=='1': stage = iu.payload['stage']
print('Received stage 1 from slave '+slave) if stage=='1':
# initial reply by slave # initial reply by slave
t1 = iu.payload['slave_t1'] t1 = iu.payload['slave_t1']
self.slave_t1s[slave] = float(t1) self.slave_t1s[slave] = float(t1)
t2 = self.master_t2s[slave] = self.get_time() t2 = self.master_t2s[slave] = self.get_time()
iu.payload.merge({'master_t2': str(t2), 'stage':'2'}) iu.payload.merge({'master_t2': str(t2), 'stage':'2'})
latency1 = t2 - self.master_t1 latency1 = t2 - self.master_t1
self.latencies[slave] = latency1 #print('Before stage 1 for '+slave+': '+str(self.latencies))
#print('Latency of round-trip 1: %.3f' % latency1) self.latencies[slave] = latency1
elif stage=='3': #print('After stage 1 for '+slave+': '+str(self.latencies))
print('Received stage 3 from slave '+slave) #print('Latency of round-trip 1: %.3f' % latency1)
# second reply by slave elif stage=='3':
t2 = iu.payload['slave_t2'] #print('At stage 3 for '+slave+': '+str(self.latencies))
self.slave_t2s[slave] = float(t2) # second reply by slave
t_final = self.get_time() t2 = iu.payload['slave_t2']
latency1 = self.latencies[slave] self.slave_t2s[slave] = float(t2)
latency2 = t_final - self.master_t2s[slave] t_final = self.get_time()
latency = self.latencies[slave] = (latency1+latency2)/2.0 latency1 = self.latencies[slave]
offset1 = (self.slave_t1s[slave]-self.master_t1)-latency/2.0 latency2 = t_final - self.master_t2s[slave]
offset2 = (self.slave_t2s[slave]-self.master_t2s[slave])-latency/2.0 latency = self.latencies[slave] = (latency1+latency2)/2.0
offset = (offset1+offset2)/2.0 offset1 = (self.slave_t1s[slave]-self.master_t1)-latency/2.0
iu.payload.merge({'stage':'4', 'latency': str(latency), 'offset':str(offset)}) offset2 = (self.slave_t2s[slave]-self.master_t2s[slave])-latency/2.0
if self.timing_handler is None: offset = (offset1+offset2)/2.0
print('Determined timing of timesync slave '+slave) iu.payload.merge({'stage':'4', 'latency': str(latency), 'offset':str(offset)})
print(' Avg round-trip latency: %.3f s'%latency) if self.timing_handler is None:
print(' Offset of their clock: %.3f s'%offset) print('Determined timing of timesync slave '+slave)
print(' Avg round-trip latency: %.3f s'%latency)
print(' Offset of their clock: %.3f s'%offset)
else:
self.timing_handler(self.component_name, slave, latency, offset)
else: else:
self.timing_handler(self.component_name, slave, latency, offset) # other stages are handled by time slave handler
else: pass
# other stages are handled by time slave handler
pass
def get_time(self): def get_time(self):
return time.time() + self.debug_offset return time.time() + self.debug_offset
...@@ -129,7 +131,6 @@ class TimesyncSlave(object): ...@@ -129,7 +131,6 @@ class TimesyncSlave(object):
#self.master_t2 = None #self.master_t2 = None
#self.master = None #self.master = None
self.latency = None self.latency = None
self.my_iu = None
# #
self.debug_offset = debug_offset self.debug_offset = debug_offset
# #
...@@ -142,30 +143,23 @@ class TimesyncSlave(object): ...@@ -142,30 +143,23 @@ class TimesyncSlave(object):
master = iu.payload['master'] master = iu.payload['master']
stage = iu.payload['stage'] stage = iu.payload['stage']
if self.component_name != master: if self.component_name != master:
if not own: if not own and stage=='0':
# reply only to IUs from others # reply only to IUs from others
if stage=='0': #print('Received stage 0 from master '+master)
#print('Received stage 0 from master '+master) # initial reply to master
# initial reply to master myiu = ipaaca.iu.IU('timesyncReply')
self.my_iu = ipaaca.iu.IU('timesyncReply') # TODO: add grounded_in link too?
# TODO: add grounded_in link too? t1 = self.get_time()
t1 = self.get_time() myiu.payload = iu.payload
self.my_iu.payload = iu.payload myiu.payload['slave'] = self.component_name
self.my_iu.payload['slave'] = self.component_name myiu.payload['slave_t1'] = str(t1)
self.my_iu.payload['slave_t1'] = str(t1) myiu.payload['stage'] = '1'
self.my_iu.payload['stage'] = '1' self.ob.add(myiu)
elif iu.payload['slave'] == self.component_name:
#self.my_iu.payload.merge({
# 'slave':self.component_name,
# 'slave_t1':str(t1),
# 'stage':'1',
# })
self.ob.add(self.my_iu)
else:
if stage=='2': if stage=='2':
#print('Received stage 2 from master '+master) #print('Received stage 2 from master '+master)
t2 = self.get_time() t2 = self.get_time()
self.my_iu.payload.merge({ iu.payload.merge({
'slave_t2':str(t2), 'slave_t2':str(t2),
'stage':'3', 'stage':'3',
}) })
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment