diff --git a/ipaacalib/python/src/ipaaca/__init__.py b/ipaacalib/python/src/ipaaca/__init__.py index 41c3748d02464ff9ff774149b34aa8e30fe4273c..db2f873b9ca7f6405ba6ba4ba8a27b61f0a3f29d 100755 --- a/ipaacalib/python/src/ipaaca/__init__.py +++ b/ipaacalib/python/src/ipaaca/__init__.py @@ -1413,10 +1413,10 @@ class OutputBuffer(Buffer): with iu.revision_lock: if (update.revision != 0) and (update.revision != iu.revision): # (0 means "do not pay attention to the revision number" -> "force update") - logger.warning("Remote update_payload operation failed because request was out of date; IU "+str(update.uid)) - logger.warning(" Writer was: "+update.writer_name) - logger.warning(" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove)) - logger.warning(" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision)) + logger.warning(u"Remote update_payload operation failed because request was out of date; IU "+str(update.uid)) + logger.warning(u" Writer was: "+update.writer_name) + logger.warning(u" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove)) + logger.warning(u" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision)) return 0 if update.is_delta: #print('Writing delta update by '+str(update.writer_name)) diff --git a/ipaacalib/python/src/ipaaca/util/notifier.py b/ipaacalib/python/src/ipaaca/util/notifier.py index 438ad760ed935c803fafde9a5fb6ddd427799006..1ee991e833af8a776b8c040ac8fc2d68e687b7b0 100644 --- a/ipaacalib/python/src/ipaaca/util/notifier.py +++ b/ipaacalib/python/src/ipaaca/util/notifier.py @@ -35,6 +35,7 @@ from __future__ import print_function, with_statement import threading import ipaaca +from ipaaca.util.timesync import * NotificationState = ipaaca.enum( NEW = 'new', @@ -68,6 +69,11 @@ class ComponentNotifier(object): self.initialize_lock = threading.Lock() self.notification_handler_lock = threading.Lock() self.submit_lock = threading.Lock() + # clock sync code, sync slave/master pair will be installed when launched + self.timesync_slave = None + self.timesync_master = None + self.timesync_master_handlers = [] + self.timesync_slave_handlers = [] def _submit_notify(self, is_new): with self.submit_lock: @@ -109,11 +115,32 @@ class ComponentNotifier(object): with self.notification_handler_lock: self.notification_handlers.append(handler) + def launch_timesync_slave_handlers(self, master, slave, latency, offset): + for h in self.timesync_slave_handlers: + h(master, slave, latency, offset) + + def launch_timesync_master_handlers(self, master, slave, latency, offset): + for h in self.timesync_master_handlers: + h(master, slave, latency, offset) + + def add_timesync_slave_handler(self, handler): + self.timesync_slave_handlers.append(handler) + + def add_timesync_master_handler(self, handler): + self.timesync_master_handlers.append(handler) + + def send_master_timesync(self): + #if len(self.timesync_master_handlers)==0: + # print('Warning: Sending a master timesync without a registered result callback.') + self.timesync_master.send_master_timesync() + def initialize(self): with self.initialize_lock: if self.terminated: raise ComponentError('Attempted to reinitialize component '+component_name+' after termination') if (not self.initialized): + self.timesync_slave = TimesyncSlave(component_name=self.component_name, timing_handler=self.launch_timesync_slave_handlers) + self.timesync_master = TimesyncMaster(component_name=self.component_name, timing_handler=self.launch_timesync_master_handlers) self.in_buffer.register_handler(self._handle_iu_event, ipaaca.IUEventType.MESSAGE, ComponentNotifier.NOTIFY_CATEGORY) self._submit_notify(True) self.initialized = True diff --git a/ipaacalib/python/src/ipaaca/util/timesync.py b/ipaacalib/python/src/ipaaca/util/timesync.py new file mode 100644 index 0000000000000000000000000000000000000000..e3ec33e58398d37eb2b714c8625bbd3af22d1107 --- /dev/null +++ b/ipaacalib/python/src/ipaaca/util/timesync.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import ipaaca +import time + +class TimesyncMaster(object): + def __init__(self, component_name=None, timing_handler=None, debug_offset=0): + self.ob = ipaaca.OutputBuffer(('' if component_name is None else component_name)+'TimesyncMaster') + self.ib = ipaaca.InputBuffer(('' if component_name is None else component_name)+'TimesyncMaster', ['timesyncReply']) + # component name to report (None => use buffer name) + self.component_name = component_name if component_name is not None else self.ob.unique_name + # + self.ob.register_handler(self.handle_timesync_master) + self.ib.register_handler(self.handle_timesync_master) + # master_t1 is identical for all slaves + self.master_t1 = None + self.slave_t1s = {} + self.master_t2s = {} + self.slave_t2s = {} + self.latencies = {} + self.time_offsets = {} + # + self.debug_offset = debug_offset + # + self.timing_handler = timing_handler + + def set_timing_handler(self, timing_handler): + self.timing_handler = timing_handler + + def send_master_timesync(self): + iu = ipaaca.IU('timesyncRequest') + self.master_t1 = self.get_time() + iu.payload = { + 'stage':'0', + 'master_t1':str(self.master_t1), + 'master':self.component_name, + } + self.ob.add(iu) + print('Sent a stage 0 timesync as master '+self.component_name) + + def handle_timesync_master(self, iu, event_type, own): + master = iu.payload['master'] + if not own and master == self.component_name: + if self.component_name == master: + # reply to our own initial IU + slave = iu.payload['slave'] + stage = iu.payload['stage'] + if stage=='1': + print('Received stage 1 from slave '+slave) + # initial reply by slave + t1 = iu.payload['slave_t1'] + self.slave_t1s[slave] = float(t1) + t2 = self.master_t2s[slave] = self.get_time() + iu.payload.merge({'master_t2': str(t2), 'stage':'2'}) + latency1 = t2 - self.master_t1 + self.latencies[slave] = latency1 + #print('Latency of round-trip 1: %.3f' % latency1) + elif stage=='3': + print('Received stage 3 from slave '+slave) + # second reply by slave + t2 = iu.payload['slave_t2'] + self.slave_t2s[slave] = float(t2) + t_final = self.get_time() + latency1 = self.latencies[slave] + latency2 = t_final - self.master_t2s[slave] + latency = self.latencies[slave] = (latency1+latency2)/2.0 + offset1 = (self.slave_t1s[slave]-self.master_t1)-latency/2.0 + offset2 = (self.slave_t2s[slave]-self.master_t2s[slave])-latency/2.0 + offset = (offset1+offset2)/2.0 + iu.payload.merge({'stage':'4', 'latency': str(latency), 'offset':str(offset)}) + if self.timing_handler is None: + print('Determined timing of timesync slave '+slave) + print(' Avg round-trip latency: %.3f s'%latency) + print(' Offset of their clock: %.3f s'%offset) + else: + self.timing_handler(self.component_name, slave, latency, offset) + else: + # other stages are handled by time slave handler + pass + + def get_time(self): + return time.time() + self.debug_offset + + +class TimesyncSlave(object): + def __init__(self, component_name=None, timing_handler=None, debug_offset=0): + self.ob = ipaaca.OutputBuffer(('' if component_name is None else component_name)+'TimesyncSlave') + self.ib = ipaaca.InputBuffer(('' if component_name is None else component_name)+'TimesyncSlave', ['timesyncRequest']) + # component name to report (None => use buffer name) + self.component_name = component_name if component_name is not None else self.ib.unique_name + self.ob.register_handler(self.handle_timesync_slave) + self.ib.register_handler(self.handle_timesync_slave) + #self.master_t1 = None + #self.master_t2 = None + #self.master = None + self.latency = None + self.my_iu = None + # + self.debug_offset = debug_offset + # + self.timing_handler = timing_handler + + def set_timing_handler(self, timing_handler): + self.timing_handler = timing_handler + + def handle_timesync_slave(self, iu, event_type, own): + master = iu.payload['master'] + stage = iu.payload['stage'] + if self.component_name != master: + if not own: + # reply only to IUs from others + if stage=='0': + #print('Received stage 0 from master '+master) + # initial reply to master + self.my_iu = ipaaca.IU('timesyncReply') + # TODO: add grounded_in link too? + t1 = self.get_time() + self.my_iu.payload = iu.payload + self.my_iu.payload['slave'] = self.component_name + self.my_iu.payload['slave_t1'] = str(t1) + self.my_iu.payload['stage'] = '1' + + #self.my_iu.payload.merge({ + # 'slave':self.component_name, + # 'slave_t1':str(t1), + # 'stage':'1', + # }) + self.ob.add(self.my_iu) + else: + if stage=='2': + #print('Received stage 2 from master '+master) + t2 = self.get_time() + self.my_iu.payload.merge({ + 'slave_t2':str(t2), + 'stage':'3', + }) + elif stage=='4': + latency = float(iu.payload['latency']) + offset = float(iu.payload['offset']) + if self.timing_handler is None: + print('Timesync master '+master+' determined our timing: ') + print(' Avg round-trip latency: %.3f s'%latency) + print(' Offset of our clock: %.3f s'%offset) + else: + self.timing_handler(master, self.component_name, latency, offset) + def get_time(self): + return time.time() + self.debug_offset + + diff --git a/ipaacatools/scripts/ipaaca-iu-sniffer.py b/ipaacatools/scripts/ipaaca-iu-sniffer.py index 9d9d1ba6ed61a9a5fd27b02f3b33a65bdbcaa5cc..c7034c1d7a49e872946efd13848bf271f0e9fbe0 100755 --- a/ipaacatools/scripts/ipaaca-iu-sniffer.py +++ b/ipaacatools/scripts/ipaaca-iu-sniffer.py @@ -33,23 +33,70 @@ import logging import sys import time - import ipaaca +color = False +max_size = 2048 + +def highlight_if_color(s, c='1'): + return s if not color else '['+c+'m'+s+'[m' + +def pretty_printed_iu_payload(iu): + s='{ ' + for k,v in iu.payload.items(): + v2 = (('\''+v+'\'') if len(v)<=max_size else ('\''+v[:max_size]+'\'<excess length omitted>')).replace('\\','\\\\').replace('\n',highlight_if_color('\\n')) + s += '\n' + '\t\t\'' + highlight_if_color(unicode(k),'1')+'\': '+unicode(v2)+', ' + s+=' }' + return s + +def event_type_color(typ): + colors={'ADDED':'32;1', 'RETRACTED':'31;1', 'UPDATED':'33;1', 'MESSAGE':'34;1'} + return '1' if typ not in colors else colors[typ] + +def pretty_printed_iu_event(iu, event_type, local): + s='' + t=time.time() + lt=time.localtime(t) + s += highlight_if_color('%.3f'%t, '1')+' '+"%02d:%02d:%02d"%(lt.tm_hour, lt.tm_min, lt.tm_sec) + s += ' '+highlight_if_color('%-9s'%event_type,event_type_color(event_type))+' category='+highlight_if_color(iu.category,event_type_color(event_type))+' uid='+iu.uid+' owner='+iu.owner_name+' payload='+pretty_printed_iu_payload(iu) + return s + def my_update_handler(iu, event_type, local): t=time.localtime() - print "%02d:%02d:%02d"%(t.tm_hour, t.tm_min,t.tm_sec), - print time.time(), - print(event_type+': '+unicode(iu)) + print pretty_printed_iu_event(iu, event_type, local) cats = [] -if len(sys.argv)>1: - cats = sys.argv[1:] + +keep_going=True +idx = 1 +while keep_going: + opt = sys.argv[idx] if idx<len(sys.argv) else None + if opt=='--help': + print('IU sniffer - usage:') + print(' '+sys.argv[0]+' [--options] [<category1> [<category2 ...]]') + print(' Listen to specified categories (default: all)') + print(' Option --color : colorize output') + print(' Option --size-limit <size> : limit payload display, chars (def: 2048)') + sys.exit(0) + elif opt=='--color': + color = True + idx += 1 + elif opt=='--size-limit': + if len(sys.argv)<idx+2: + print('Please specify a max size') + sys.exit(1) + max_size = int(sys.argv[idx+1]) + idx += 2 + else: + cats = sys.argv[idx:] + keep_going = False ib = ipaaca.InputBuffer('SnifferIn', [''] if len(cats)==0 else cats) ib.register_handler(my_update_handler) -print("Listening for IU events of "+("any category..." if len(cats)==0 else "categories: "+' '.join(cats))) +print('') +print('Ipaaca IU Sniffer - run with --help to see options') +print('Listening for IU events of '+('any category...' if len(cats)==0 else 'categories: '+' '.join(cats))) print('') while True: time.sleep(1)