diff --git a/ipaacalib/python/src/ipaaca/__init__.py b/ipaacalib/python/src/ipaaca/__init__.py index 0944ed92b1a81e94ec9539e928b057cf8dbf5895..67a11f0a0744cc7f63c69d0e768b8ff6fa1b4ef0 100755 --- a/ipaacalib/python/src/ipaaca/__init__.py +++ b/ipaacalib/python/src/ipaaca/__init__.py @@ -44,7 +44,7 @@ import rsb.converter import ipaaca_pb2 -OMIT_REVISION_CHECKS = False +_DEFAULT_PAYLOAD_UPDATE_TIMEOUT = 0.1 # IDEAS # We should think about relaying the update event (or at least the @@ -157,10 +157,16 @@ class IUNotFoundError(Exception): def __init__(self, iu_uid): super(IUNotFoundError, self).__init__('Lookup of IU ' + str(iu_uid) + ' failed.') +class IUPayloadLockTimeoutError(Exception) + """Error indicating that exclusive access to the Payload could not be obtained in time.""" + def __init__(self, iu): + super(IUPayloadLockTimeoutError, self).__init__('Timeout while accessing payload of IU' + str(iu.uid) + '.') + + ## --- Generation Architecture ----------------------------------------------- class Payload(dict): - def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False): + def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False, update_timeout=_DEFAULT_PAYLOAD_UPDATE_TIMEOUT): self.iu = iu pl1 = {} if new_payload is None else new_payload pl = {} @@ -180,18 +186,26 @@ class Payload(dict): self._update_on_every_change = True self._collected_modifications = {} self._collected_removals = [] - + self._update_timeout = update_timeout + self._batch_update_lock = threading.RLock() + self._batch_update_cond = threading.Condition(threading.RLock()) def merge(self, payload, writer_name=None): + if not self._batch_update_lock.acquire(False): + raise IUPayloadLockedError(self.iu) for k, v in payload: if type(k)==str: k=unicode(k,'utf8') if type(v)==str: v=unicode(v,'utf8') self.iu._modify_payload(is_delta=True, new_items=payload, keys_to_remove=[], writer_name=writer_name) - return dict.update(payload) # batch update + r = dict.update(payload) # batch update + self._batch_update_lock.release() + return r def __setitem__(self, k, v, writer_name=None): + if not self._batch_update_lock.acquire(False): + raise IUPayloadLockedError(self.iu) if type(k)==str: k=unicode(k,'utf8') if type(v)==str: @@ -200,26 +214,34 @@ class Payload(dict): self.iu._modify_payload(is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name) else: # Collect additions/modifications self._collected_modifications[k] = v - return dict.__setitem__(self, k, v) + r = dict.__setitem__(self, k, v) + self._batch_update_lock.release() + return r def __delitem__(self, k, writer_name=None): + if not self._batch_update_lock.acquire(False): + raise IUPayloadLockedError(self.iu) if type(k)==str: k=unicode(k,'utf8') if self._update_on_every_change: self.iu._modify_payload(is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name) else: # Collect additions/modifications self._collected_removals.append(k) - return dict.__delitem__(self, k) + r = dict.__delitem__(self, k) + self._batch_update_lock.release() + return r # Context-manager based batch updates, not yet thread-safe (on remote updates) - #def __enter__(self): - # self._update_on_every_change = False - # - #def __exit__(self, type, value, traceback): - # self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=None) - # self._collected_modifications = {} - # self._collected_removals = [] - # self._update_on_every_change = True + def __enter__(self): + self._wait_batch_update_lock(self._update_timeout) + self._update_on_every_change = False + + def __exit__(self, type, value, traceback): + self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=None) + self._collected_modifications = {} + self._collected_removals = [] + self._update_on_every_change = True + self.batch_update_lock.release() def _remotely_enforced_setitem(self, k, v): """Sets an item when requested remotely.""" @@ -229,6 +251,18 @@ class Payload(dict): """Deletes an item when requested remotely.""" return dict.__delitem__(self, k) + def _wait_batch_update_lock(self, timeout): + # wait lock with time-out http://stackoverflow.com/a/8393033 + with self._batch_update_cond: + current_time = start_time = time.time() + while current_time < start_time + timeout: + if self._batch_update_lock.acquire(False): + return True + else: + self._batch_update_cond.wait(timeout - current_time + start_time) + current_time = time.time() + raise IUPayloadLockTimeoutError(self.iu) + class IUInterface(object): #{{{ @@ -1329,7 +1363,7 @@ class OutputBuffer(Buffer): return 0 iu = self._iu_store[update.uid] with iu.revision_lock: - if not OMIT_REVISION_CHECKS and (update.revision != 0) and (update.revision != iu.revision): + if (update.revision != 0) and (update.revision != iu.revision): # (0 means "do not pay attention to the revision number" -> "force update") logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid)) return 0 @@ -1347,15 +1381,17 @@ class OutputBuffer(Buffer): return 0 iu = self._iu_store[update.uid] with iu.revision_lock: - if not OMIT_REVISION_CHECKS and (update.revision != 0) and (update.revision != iu.revision): + if (update.revision != 0) and (update.revision != iu.revision): # (0 means "do not pay attention to the revision number" -> "force update") logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid)) return 0 if update.is_delta: - for k in update.keys_to_remove: - iu.payload.__delitem__(k, writer_name=update.writer_name) - for k,v in update.new_items.items(): - iu.payload.__setitem__(k, v, writer_name=update.writer_name) + + with iu.payload: + for k in update.keys_to_remove: + iu.payload.__delitem__(k, writer_name=update.writer_name) + for k,v in update.new_items.items(): + iu.payload.__setitem__(k, v, writer_name=update.writer_name) else: iu._set_payload(update.new_items, writer_name=update.writer_name) # _set_payload etc. have also incremented the revision number @@ -1369,7 +1405,7 @@ class OutputBuffer(Buffer): return 0 iu = self._iu_store[iu_commission.uid] with iu.revision_lock: - if not OMIT_REVISION_CHECKS and (iu_commission.revision != 0) and (iu_commission.revision != iu.revision): + if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision): # (0 means "do not pay attention to the revision number" -> "force update") logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid)) return 0