Skip to content
Snippets Groups Projects
Commit 873934e7 authored by hvanwelbergen's avatar hvanwelbergen
Browse files

Merge branch 'master' of portb.techfak.uni-bielefeld.de:/vol/soa/repositories/ipaaca

parents f74be66a ea037e20
No related branches found
No related tags found
No related merge requests found
......@@ -1413,10 +1413,10 @@ class OutputBuffer(Buffer):
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
logger.warning(" Writer was: "+update.writer_name)
logger.warning(" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
logger.warning(" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
logger.warning(u"Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
logger.warning(u" Writer was: "+update.writer_name)
logger.warning(u" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
logger.warning(u" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
return 0
if update.is_delta:
#print('Writing delta update by '+str(update.writer_name))
......
......@@ -35,6 +35,7 @@ from __future__ import print_function, with_statement
import threading
import ipaaca
from ipaaca.util.timesync import *
NotificationState = ipaaca.enum(
NEW = 'new',
......@@ -68,6 +69,11 @@ class ComponentNotifier(object):
self.initialize_lock = threading.Lock()
self.notification_handler_lock = threading.Lock()
self.submit_lock = threading.Lock()
# clock sync code, sync slave/master pair will be installed when launched
self.timesync_slave = None
self.timesync_master = None
self.timesync_master_handlers = []
self.timesync_slave_handlers = []
def _submit_notify(self, is_new):
with self.submit_lock:
......@@ -109,11 +115,32 @@ class ComponentNotifier(object):
with self.notification_handler_lock:
self.notification_handlers.append(handler)
def launch_timesync_slave_handlers(self, master, slave, latency, offset):
for h in self.timesync_slave_handlers:
h(master, slave, latency, offset)
def launch_timesync_master_handlers(self, master, slave, latency, offset):
for h in self.timesync_master_handlers:
h(master, slave, latency, offset)
def add_timesync_slave_handler(self, handler):
self.timesync_slave_handlers.append(handler)
def add_timesync_master_handler(self, handler):
self.timesync_master_handlers.append(handler)
def send_master_timesync(self):
#if len(self.timesync_master_handlers)==0:
# print('Warning: Sending a master timesync without a registered result callback.')
self.timesync_master.send_master_timesync()
def initialize(self):
with self.initialize_lock:
if self.terminated:
raise ComponentError('Attempted to reinitialize component '+component_name+' after termination')
if (not self.initialized):
self.timesync_slave = TimesyncSlave(component_name=self.component_name, timing_handler=self.launch_timesync_slave_handlers)
self.timesync_master = TimesyncMaster(component_name=self.component_name, timing_handler=self.launch_timesync_master_handlers)
self.in_buffer.register_handler(self._handle_iu_event, ipaaca.IUEventType.MESSAGE, ComponentNotifier.NOTIFY_CATEGORY)
self._submit_notify(True)
self.initialized = True
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ipaaca
import time
class TimesyncMaster(object):
def __init__(self, component_name=None, timing_handler=None, debug_offset=0):
self.ob = ipaaca.OutputBuffer(('' if component_name is None else component_name)+'TimesyncMaster')
self.ib = ipaaca.InputBuffer(('' if component_name is None else component_name)+'TimesyncMaster', ['timesyncReply'])
# component name to report (None => use buffer name)
self.component_name = component_name if component_name is not None else self.ob.unique_name
#
self.ob.register_handler(self.handle_timesync_master)
self.ib.register_handler(self.handle_timesync_master)
# master_t1 is identical for all slaves
self.master_t1 = None
self.slave_t1s = {}
self.master_t2s = {}
self.slave_t2s = {}
self.latencies = {}
self.time_offsets = {}
#
self.debug_offset = debug_offset
#
self.timing_handler = timing_handler
def set_timing_handler(self, timing_handler):
self.timing_handler = timing_handler
def send_master_timesync(self):
iu = ipaaca.IU('timesyncRequest')
self.master_t1 = self.get_time()
iu.payload = {
'stage':'0',
'master_t1':str(self.master_t1),
'master':self.component_name,
}
self.ob.add(iu)
print('Sent a stage 0 timesync as master '+self.component_name)
def handle_timesync_master(self, iu, event_type, own):
master = iu.payload['master']
if not own and master == self.component_name:
if self.component_name == master:
# reply to our own initial IU
slave = iu.payload['slave']
stage = iu.payload['stage']
if stage=='1':
print('Received stage 1 from slave '+slave)
# initial reply by slave
t1 = iu.payload['slave_t1']
self.slave_t1s[slave] = float(t1)
t2 = self.master_t2s[slave] = self.get_time()
iu.payload.merge({'master_t2': str(t2), 'stage':'2'})
latency1 = t2 - self.master_t1
self.latencies[slave] = latency1
#print('Latency of round-trip 1: %.3f' % latency1)
elif stage=='3':
print('Received stage 3 from slave '+slave)
# second reply by slave
t2 = iu.payload['slave_t2']
self.slave_t2s[slave] = float(t2)
t_final = self.get_time()
latency1 = self.latencies[slave]
latency2 = t_final - self.master_t2s[slave]
latency = self.latencies[slave] = (latency1+latency2)/2.0
offset1 = (self.slave_t1s[slave]-self.master_t1)-latency/2.0
offset2 = (self.slave_t2s[slave]-self.master_t2s[slave])-latency/2.0
offset = (offset1+offset2)/2.0
iu.payload.merge({'stage':'4', 'latency': str(latency), 'offset':str(offset)})
if self.timing_handler is None:
print('Determined timing of timesync slave '+slave)
print(' Avg round-trip latency: %.3f s'%latency)
print(' Offset of their clock: %.3f s'%offset)
else:
self.timing_handler(self.component_name, slave, latency, offset)
else:
# other stages are handled by time slave handler
pass
def get_time(self):
return time.time() + self.debug_offset
class TimesyncSlave(object):
def __init__(self, component_name=None, timing_handler=None, debug_offset=0):
self.ob = ipaaca.OutputBuffer(('' if component_name is None else component_name)+'TimesyncSlave')
self.ib = ipaaca.InputBuffer(('' if component_name is None else component_name)+'TimesyncSlave', ['timesyncRequest'])
# component name to report (None => use buffer name)
self.component_name = component_name if component_name is not None else self.ib.unique_name
self.ob.register_handler(self.handle_timesync_slave)
self.ib.register_handler(self.handle_timesync_slave)
#self.master_t1 = None
#self.master_t2 = None
#self.master = None
self.latency = None
self.my_iu = None
#
self.debug_offset = debug_offset
#
self.timing_handler = timing_handler
def set_timing_handler(self, timing_handler):
self.timing_handler = timing_handler
def handle_timesync_slave(self, iu, event_type, own):
master = iu.payload['master']
stage = iu.payload['stage']
if self.component_name != master:
if not own:
# reply only to IUs from others
if stage=='0':
#print('Received stage 0 from master '+master)
# initial reply to master
self.my_iu = ipaaca.IU('timesyncReply')
# TODO: add grounded_in link too?
t1 = self.get_time()
self.my_iu.payload = iu.payload
self.my_iu.payload['slave'] = self.component_name
self.my_iu.payload['slave_t1'] = str(t1)
self.my_iu.payload['stage'] = '1'
#self.my_iu.payload.merge({
# 'slave':self.component_name,
# 'slave_t1':str(t1),
# 'stage':'1',
# })
self.ob.add(self.my_iu)
else:
if stage=='2':
#print('Received stage 2 from master '+master)
t2 = self.get_time()
self.my_iu.payload.merge({
'slave_t2':str(t2),
'stage':'3',
})
elif stage=='4':
latency = float(iu.payload['latency'])
offset = float(iu.payload['offset'])
if self.timing_handler is None:
print('Timesync master '+master+' determined our timing: ')
print(' Avg round-trip latency: %.3f s'%latency)
print(' Offset of our clock: %.3f s'%offset)
else:
self.timing_handler(master, self.component_name, latency, offset)
def get_time(self):
return time.time() + self.debug_offset
......@@ -33,23 +33,70 @@
import logging
import sys
import time
import ipaaca
color = False
max_size = 2048
def highlight_if_color(s, c='1'):
return s if not color else '['+c+'m'+s+''
def pretty_printed_iu_payload(iu):
s='{ '
for k,v in iu.payload.items():
v2 = (('\''+v+'\'') if len(v)<=max_size else ('\''+v[:max_size]+'\'<excess length omitted>')).replace('\\','\\\\').replace('\n',highlight_if_color('\\n'))
s += '\n' + '\t\t\'' + highlight_if_color(unicode(k),'1')+'\': '+unicode(v2)+', '
s+=' }'
return s
def event_type_color(typ):
colors={'ADDED':'32;1', 'RETRACTED':'31;1', 'UPDATED':'33;1', 'MESSAGE':'34;1'}
return '1' if typ not in colors else colors[typ]
def pretty_printed_iu_event(iu, event_type, local):
s=''
t=time.time()
lt=time.localtime(t)
s += highlight_if_color('%.3f'%t, '1')+' '+"%02d:%02d:%02d"%(lt.tm_hour, lt.tm_min, lt.tm_sec)
s += ' '+highlight_if_color('%-9s'%event_type,event_type_color(event_type))+' category='+highlight_if_color(iu.category,event_type_color(event_type))+' uid='+iu.uid+' owner='+iu.owner_name+' payload='+pretty_printed_iu_payload(iu)
return s
def my_update_handler(iu, event_type, local):
t=time.localtime()
print "%02d:%02d:%02d"%(t.tm_hour, t.tm_min,t.tm_sec),
print time.time(),
print(event_type+': '+unicode(iu))
print pretty_printed_iu_event(iu, event_type, local)
cats = []
if len(sys.argv)>1:
cats = sys.argv[1:]
keep_going=True
idx = 1
while keep_going:
opt = sys.argv[idx] if idx<len(sys.argv) else None
if opt=='--help':
print('IU sniffer - usage:')
print(' '+sys.argv[0]+' [--options] [<category1> [<category2 ...]]')
print(' Listen to specified categories (default: all)')
print(' Option --color : colorize output')
print(' Option --size-limit <size> : limit payload display, chars (def: 2048)')
sys.exit(0)
elif opt=='--color':
color = True
idx += 1
elif opt=='--size-limit':
if len(sys.argv)<idx+2:
print('Please specify a max size')
sys.exit(1)
max_size = int(sys.argv[idx+1])
idx += 2
else:
cats = sys.argv[idx:]
keep_going = False
ib = ipaaca.InputBuffer('SnifferIn', [''] if len(cats)==0 else cats)
ib.register_handler(my_update_handler)
print("Listening for IU events of "+("any category..." if len(cats)==0 else "categories: "+' '.join(cats)))
print('')
print('Ipaaca IU Sniffer - run with --help to see options')
print('Listening for IU events of '+('any category...' if len(cats)==0 else 'categories: '+' '.join(cats)))
print('')
while True:
time.sleep(1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment