Skip to content
Snippets Groups Projects
Commit 50a61931 authored by Hendrik Buschmeier's avatar Hendrik Buschmeier
Browse files

ipaaca-python: Added thread-safe batch-updates of IU payload.

parent 494bd47a
No related branches found
No related tags found
No related merge requests found
......@@ -44,7 +44,7 @@ import rsb.converter
import ipaaca_pb2
OMIT_REVISION_CHECKS = False
_DEFAULT_PAYLOAD_UPDATE_TIMEOUT = 0.1
# IDEAS
# We should think about relaying the update event (or at least the
......@@ -157,10 +157,16 @@ class IUNotFoundError(Exception):
def __init__(self, iu_uid):
super(IUNotFoundError, self).__init__('Lookup of IU ' + str(iu_uid) + ' failed.')
class IUPayloadLockTimeoutError(Exception)
"""Error indicating that exclusive access to the Payload could not be obtained in time."""
def __init__(self, iu):
super(IUPayloadLockTimeoutError, self).__init__('Timeout while accessing payload of IU' + str(iu.uid) + '.')
## --- Generation Architecture -----------------------------------------------
class Payload(dict):
def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False):
def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False, update_timeout=_DEFAULT_PAYLOAD_UPDATE_TIMEOUT):
self.iu = iu
pl1 = {} if new_payload is None else new_payload
pl = {}
......@@ -180,18 +186,26 @@ class Payload(dict):
self._update_on_every_change = True
self._collected_modifications = {}
self._collected_removals = []
self._update_timeout = update_timeout
self._batch_update_lock = threading.RLock()
self._batch_update_cond = threading.Condition(threading.RLock())
def merge(self, payload, writer_name=None):
if not self._batch_update_lock.acquire(False):
raise IUPayloadLockedError(self.iu)
for k, v in payload:
if type(k)==str:
k=unicode(k,'utf8')
if type(v)==str:
v=unicode(v,'utf8')
self.iu._modify_payload(is_delta=True, new_items=payload, keys_to_remove=[], writer_name=writer_name)
return dict.update(payload) # batch update
r = dict.update(payload) # batch update
self._batch_update_lock.release()
return r
def __setitem__(self, k, v, writer_name=None):
if not self._batch_update_lock.acquire(False):
raise IUPayloadLockedError(self.iu)
if type(k)==str:
k=unicode(k,'utf8')
if type(v)==str:
......@@ -200,26 +214,34 @@ class Payload(dict):
self.iu._modify_payload(is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name)
else: # Collect additions/modifications
self._collected_modifications[k] = v
return dict.__setitem__(self, k, v)
r = dict.__setitem__(self, k, v)
self._batch_update_lock.release()
return r
def __delitem__(self, k, writer_name=None):
if not self._batch_update_lock.acquire(False):
raise IUPayloadLockedError(self.iu)
if type(k)==str:
k=unicode(k,'utf8')
if self._update_on_every_change:
self.iu._modify_payload(is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name)
else: # Collect additions/modifications
self._collected_removals.append(k)
return dict.__delitem__(self, k)
r = dict.__delitem__(self, k)
self._batch_update_lock.release()
return r
# Context-manager based batch updates, not yet thread-safe (on remote updates)
#def __enter__(self):
# self._update_on_every_change = False
#
#def __exit__(self, type, value, traceback):
# self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=None)
# self._collected_modifications = {}
# self._collected_removals = []
# self._update_on_every_change = True
def __enter__(self):
self._wait_batch_update_lock(self._update_timeout)
self._update_on_every_change = False
def __exit__(self, type, value, traceback):
self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=None)
self._collected_modifications = {}
self._collected_removals = []
self._update_on_every_change = True
self.batch_update_lock.release()
def _remotely_enforced_setitem(self, k, v):
"""Sets an item when requested remotely."""
......@@ -229,6 +251,18 @@ class Payload(dict):
"""Deletes an item when requested remotely."""
return dict.__delitem__(self, k)
def _wait_batch_update_lock(self, timeout):
# wait lock with time-out http://stackoverflow.com/a/8393033
with self._batch_update_cond:
current_time = start_time = time.time()
while current_time < start_time + timeout:
if self._batch_update_lock.acquire(False):
return True
else:
self._batch_update_cond.wait(timeout - current_time + start_time)
current_time = time.time()
raise IUPayloadLockTimeoutError(self.iu)
class IUInterface(object): #{{{
......@@ -1329,7 +1363,7 @@ class OutputBuffer(Buffer):
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if not OMIT_REVISION_CHECKS and (update.revision != 0) and (update.revision != iu.revision):
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
return 0
......@@ -1347,15 +1381,17 @@ class OutputBuffer(Buffer):
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if not OMIT_REVISION_CHECKS and (update.revision != 0) and (update.revision != iu.revision):
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
return 0
if update.is_delta:
for k in update.keys_to_remove:
iu.payload.__delitem__(k, writer_name=update.writer_name)
for k,v in update.new_items.items():
iu.payload.__setitem__(k, v, writer_name=update.writer_name)
with iu.payload:
for k in update.keys_to_remove:
iu.payload.__delitem__(k, writer_name=update.writer_name)
for k,v in update.new_items.items():
iu.payload.__setitem__(k, v, writer_name=update.writer_name)
else:
iu._set_payload(update.new_items, writer_name=update.writer_name)
# _set_payload etc. have also incremented the revision number
......@@ -1369,7 +1405,7 @@ class OutputBuffer(Buffer):
return 0
iu = self._iu_store[iu_commission.uid]
with iu.revision_lock:
if not OMIT_REVISION_CHECKS and (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
return 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment