From 8cd1eff5b6ce62bf1bb9d30d311e45f3197ddd8d Mon Sep 17 00:00:00 2001 From: Ramin Yaghoubzadeh <ryaghoubzadeh@uni-bielefeld.de> Date: Mon, 6 Jan 2014 15:34:01 +0100 Subject: [PATCH] python: bugfix: Payload enter/exit always sent owner name instead of writer name, fixed. --- ipaacalib/python/src/ipaaca/__init__.py | 51 +++++-- ipaacalib/python/src/ipaaca/util/notifier.py | 137 +++++++++++-------- 2 files changed, 122 insertions(+), 66 deletions(-) diff --git a/ipaacalib/python/src/ipaaca/__init__.py b/ipaacalib/python/src/ipaaca/__init__.py index 1aeff84..41c3748 100755 --- a/ipaacalib/python/src/ipaaca/__init__.py +++ b/ipaacalib/python/src/ipaaca/__init__.py @@ -193,60 +193,80 @@ class Payload(dict): self._collected_modifications = {} self._collected_removals = [] self._update_timeout = update_timeout + self._batch_update_writer_name = None # name of remote buffer or None self._batch_update_lock = threading.RLock() self._batch_update_cond = threading.Condition(threading.RLock()) def merge(self, payload, writer_name=None): - if not self._batch_update_lock.acquire(False): - raise IUPayloadLockedError(self.iu) - for k, v in payload: + self._batch_update_lock.acquire(True) + #if not self._batch_update_lock.acquire(False): + # print('Someone failed a lock trying to merge '+str(payload.keys())) + # raise IUPayloadLockedError(self.iu) + #print("Payload.merge() IN, Merging "+str(payload.keys())) + for k, v in payload.items(): if type(k)==str: k=unicode(k,'utf8') if type(v)==str: v=unicode(v,'utf8') self.iu._modify_payload(is_delta=True, new_items=payload, keys_to_remove=[], writer_name=writer_name) r = dict.update(payload) # batch update + #print("Payload.merge() OUT") self._batch_update_lock.release() return r def __setitem__(self, k, v, writer_name=None): - if not self._batch_update_lock.acquire(False): - raise IUPayloadLockedError(self.iu) + self._batch_update_lock.acquire(True) + #if not self._batch_update_lock.acquire(False): + # print('Someone failed a lock trying to set '+k+' to '+v) + # raise IUPayloadLockedError(self.iu) + #print("Payload.__setitem__() IN, Setting "+k+' to '+v) + #print(" by writer "+str(writer_name)) if type(k)==str: k=unicode(k,'utf8') if type(v)==str: v=unicode(v,'utf8') if self._update_on_every_change: + #print(" running _modify_payload with writer name "+str(writer_name)) self.iu._modify_payload(is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name) else: # Collect additions/modifications + self._batch_update_writer_name = writer_name self._collected_modifications[k] = v r = dict.__setitem__(self, k, v) + #print("Payload.__setitem__() OUT") self._batch_update_lock.release() return r def __delitem__(self, k, writer_name=None): - if not self._batch_update_lock.acquire(False): - raise IUPayloadLockedError(self.iu) + self._batch_update_lock.acquire(True) + #if not self._batch_update_lock.acquire(False): + # print('Someone failed a lock trying to del '+k) + # raise IUPayloadLockedError(self.iu) + #print("Payload.__delitem__() IN, Deleting "+k) if type(k)==str: k=unicode(k,'utf8') if self._update_on_every_change: self.iu._modify_payload(is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name) else: # Collect additions/modifications + self._batch_update_writer_name = writer_name self._collected_removals.append(k) r = dict.__delitem__(self, k) + #print("Payload.__delitem__() OUT") self._batch_update_lock.release() return r # Context-manager based batch updates, not yet thread-safe (on remote updates) def __enter__(self): + #print('running Payload.__enter__()') self._wait_batch_update_lock(self._update_timeout) self._update_on_every_change = False def __exit__(self, type, value, traceback): - self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=None) + #print('running Payload.__exit__()') + self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=self._batch_update_writer_name) self._collected_modifications = {} self._collected_removals = [] self._update_on_every_change = True + self._batch_update_writer_name = None self._batch_update_lock.release() def _remotely_enforced_setitem(self, k, v): @@ -450,6 +470,7 @@ class IU(IUInterface):#{{{ # FIXME: Is it actually set locally? self._increase_revision_number() if self.is_published: + #print(' _modify_payload: running send_iu_pl_upd with writer name '+str(writer_name)) # send update to remote holders self.buffer._send_iu_payload_update( self, @@ -1296,6 +1317,9 @@ class InputBuffer(Buffer): # Notify only for remotely triggered events; # Discard updates that originate from this buffer return + #else: + # print('Got update written by buffer '+str(event.data.writer_name)) + if type_ is ipaaca_pb2.IUCommission: # IU commit iu = self._iu_store[event.data.uid] @@ -1389,16 +1413,20 @@ class OutputBuffer(Buffer): with iu.revision_lock: if (update.revision != 0) and (update.revision != iu.revision): # (0 means "do not pay attention to the revision number" -> "force update") - logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid)) + logger.warning("Remote update_payload operation failed because request was out of date; IU "+str(update.uid)) + logger.warning(" Writer was: "+update.writer_name) + logger.warning(" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove)) + logger.warning(" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision)) return 0 if update.is_delta: - + #print('Writing delta update by '+str(update.writer_name)) with iu.payload: for k in update.keys_to_remove: iu.payload.__delitem__(k, writer_name=update.writer_name) for k,v in update.new_items.items(): iu.payload.__setitem__(k, v, writer_name=update.writer_name) else: + #print('Writing non-incr update by '+str(update.writer_name)) iu._set_payload(update.new_items, writer_name=update.writer_name) # _set_payload etc. have also incremented the revision number self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category) @@ -1441,7 +1469,7 @@ class OutputBuffer(Buffer): #if iu._uid is not None: # raise IUPublishedError(iu) #iu.uid = self._generate_iu_uid() - if iu.uid in self._iu_store: + if iu.uid in self._iu_store: raise IUPublishedError(iu) if iu.buffer is not None: raise IUPublishedError(iu) @@ -1548,6 +1576,7 @@ class OutputBuffer(Buffer): payload_update.writer_name = writer_name informer = self._get_informer(iu._category) informer.publishData(payload_update) + #print(" -- Sent update with writer name "+str(writer_name)) ## --- RSB ------------------------------------------------------------------- diff --git a/ipaacalib/python/src/ipaaca/util/notifier.py b/ipaacalib/python/src/ipaaca/util/notifier.py index d864cf0..438ad76 100644 --- a/ipaacalib/python/src/ipaaca/util/notifier.py +++ b/ipaacalib/python/src/ipaaca/util/notifier.py @@ -37,64 +37,91 @@ import threading import ipaaca NotificationState = ipaaca.enum( - NEW = 'new', - OLD = 'old', - DOWN = 'down' -) + NEW = 'new', + OLD = 'old', + DOWN = 'down' + ) + +class ComponentError(Exception): + def __init__(self, msg): + super(ComponentError, self).__init__(msg) class ComponentNotifier(object): - NOTIFY_CATEGORY = "componentNotify" - SEND_CATEGORIES = "send_categories" - RECEIVE_CATEGORIES = "recv_categories" - STATE = "state" - NAME = "name" - FUNCTION = "function" - - def __init__(self, componentName, componentFunction, sendCategories, receiveCategories, outBuffer=None, inBuffer=None): - self.componentName = componentName - self.componentFunction = componentFunction - self.sendCategories = frozenset(sendCategories) - self.receiveCategories = frozenset(receiveCategories) - self.inBuffer = inBuffer if inBuffer is not None else ipaaca.InputBuffer(componentName + 'Notifier') - self.outBuffer = outBuffer if outBuffer is not None else ipaaca.OutputBuffer(componentName + 'Notifier') - self.initialized = False - self.notificationHandlers = [] - self.initializeLock = threading.Lock() - self.notificationHandlerLock = threading.Lock() - self.submitLock = threading.Lock() - - def _submit_notify(self, isNew): - with self.submitLock: - notifyIU = ipaaca.Message(ComponentNotifier.NOTIFY_CATEGORY) - notifyIU.payload = { - ComponentNotifier.NAME: self.componentName, - ComponentNotifier.FUNCTION: self.componentFunction, - ComponentNotifier.SEND_CATEGORIES: ",".join(self.sendCategories), - ComponentNotifier.RECEIVE_CATEGORIES: ",".join(self.receiveCategories), - ComponentNotifier.STATE: NotificationState.NEW if isNew else NotificationState.OLD, - } - self.outBuffer.add(notifyIU) - - def _handle_iu_event(self, iu, event_type, local): - if iu.payload[ComponentNotifier.NAME] == self.componentName: - return - with self.notificationHandlerLock: - for h in self.notificationHandlers: - h(iu, event_type, local) - if iu.payload[ComponentNotifier.STATE] == "new": - #print("submitting") - self._submit_notify(False) + NOTIFY_CATEGORY = "componentNotify" + SEND_CATEGORIES = "send_categories" + RECEIVE_CATEGORIES = "recv_categories" + STATE = "state" + NAME = "name" + FUNCTION = "function" + + def __init__(self, component_name, component_function, send_categories, receive_categories, out_buffer=None, in_buffer=None): + self.component_name = component_name + self.component_function = component_function + self.send_categories = frozenset(send_categories) + self.receive_categories = frozenset(receive_categories) + self.in_buffer = in_buffer if in_buffer is not None else ipaaca.InputBuffer(component_name + 'Notifier') + self.out_buffer = out_buffer if out_buffer is not None else ipaaca.OutputBuffer(component_name + 'Notifier') + self.terminated = False + self.initialized = False + self.notification_handlers = [] + self.initialize_lock = threading.Lock() + self.notification_handler_lock = threading.Lock() + self.submit_lock = threading.Lock() + + def _submit_notify(self, is_new): + with self.submit_lock: + notify_iu = ipaaca.Message(ComponentNotifier.NOTIFY_CATEGORY) + notify_iu.payload = { + ComponentNotifier.NAME: self.component_name, + ComponentNotifier.FUNCTION: self.component_function, + ComponentNotifier.SEND_CATEGORIES: ",".join(self.send_categories), + ComponentNotifier.RECEIVE_CATEGORIES: ",".join(self.receive_categories), + ComponentNotifier.STATE: NotificationState.NEW if is_new else NotificationState.OLD, + } + self.out_buffer.add(notify_iu) + + def terminate(self): + with self.submit_lock: + if self.terminated: return + self.terminated = True + notify_iu = ipaaca.Message(ComponentNotifier.NOTIFY_CATEGORY) + notify_iu.payload = { + ComponentNotifier.NAME: self.component_name, + ComponentNotifier.FUNCTION: self.component_function, + ComponentNotifier.SEND_CATEGORIES: ",".join(self.send_categories), + ComponentNotifier.RECEIVE_CATEGORIES: ",".join(self.receive_categories), + ComponentNotifier.STATE: NotificationState.DOWN, + } + self.out_buffer.add(notify_iu) + + def _handle_iu_event(self, iu, event_type, local): + if iu.payload[ComponentNotifier.NAME] == self.component_name: + return + with self.notification_handler_lock: + for h in self.notification_handlers: + h(iu, event_type, local) + if iu.payload[ComponentNotifier.STATE] == "new": + #print("submitting") + self._submit_notify(False) + + def add_notification_handler(self, handler): + with self.notification_handler_lock: + self.notification_handlers.append(handler) - def add_notification_handler(self, handler): - with self.notificationHandlerLock: - self.notificationHandlers.append(handler) - - def initialize(self): - with self.initializeLock: - if (not self.initialized): - self.inBuffer.register_handler(self._handle_iu_event, ipaaca.IUEventType.MESSAGE, ComponentNotifier.NOTIFY_CATEGORY) - self._submit_notify(isNew=True) - self.initialized = True + def initialize(self): + with self.initialize_lock: + if self.terminated: + raise ComponentError('Attempted to reinitialize component '+component_name+' after termination') + if (not self.initialized): + self.in_buffer.register_handler(self._handle_iu_event, ipaaca.IUEventType.MESSAGE, ComponentNotifier.NOTIFY_CATEGORY) + self._submit_notify(True) + self.initialized = True + + def __enter__(self): + self.initialize() + + def __exit__(self, t, v, tb): + self.terminate() -- GitLab