diff --git a/ipaacalib/python/src/ipaaca/__init__.py b/ipaacalib/python/src/ipaaca/__init__.py index fe1cc3ac7ff6e49a3479fd9479b6573176b69acb..41b42b87106c1feecef5a3fa164ea1f48ef3d9f8 100755 --- a/ipaacalib/python/src/ipaaca/__init__.py +++ b/ipaacalib/python/src/ipaaca/__init__.py @@ -50,11 +50,9 @@ except ImportError: import rsb import rsb.converter -from payload import * import ipaaca_pb2 - -_DEFAULT_PAYLOAD_UPDATE_TIMEOUT = 0.1 +from payload import * # IDEAS # We should think about relaying the update event (or at least the diff --git a/ipaacalib/python/src/ipaaca/payload.py b/ipaacalib/python/src/ipaaca/payload.py index b374d424ec55ee8b638d7d7ce58dfcf7d99390e2..24b1297dcfb5d1b8c1324347f5879a9c292a693a 100644 --- a/ipaacalib/python/src/ipaaca/payload.py +++ b/ipaacalib/python/src/ipaaca/payload.py @@ -30,55 +30,47 @@ # Forschungsgemeinschaft (DFG) in the context of the German # Excellence Initiative. +from __future__ import division, print_function + +import threading + + __all__ = [ - 'Payload' - 'PayloadDictItemProxy', - 'PayloadItemListProxy' + 'Payload', + 'PayloadItemDictProxy', + 'PayloadItemListProxy', ] + +_DEFAULT_PAYLOAD_UPDATE_TIMEOUT = 0.1 + + class Payload(dict): + def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False, update_timeout=_DEFAULT_PAYLOAD_UPDATE_TIMEOUT): self.iu = iu - pl1 = {} if new_payload is None else new_payload - pl = {} - for k,v in pl1.items(): - if type(k)==str: - k=unicode(k,'utf8') - if type(v)==str: - v=unicode(v,'utf8') - pl[k] = v + _pl = {} + for k, v in ({} if new_payload is None else new_payload).iteritems(): + _pl[unicode(k, 'utf8') if type(k) == str else k] = unicode(v, 'utf8') if type(v) == str else v # NOTE omit_init_update_message is necessary to prevent checking for # exceptions and sending updates in the case where we just receive # a whole new payload from the remote side and overwrite it locally. - for k, v in pl.items(): + for k, v in _pl.iteritems(): dict.__setitem__(self, k, v) if (not omit_init_update_message) and (self.iu.buffer is not None): - self.iu._modify_payload(is_delta=False, new_items=pl, keys_to_remove=[], writer_name=writer_name) + self.iu._modify_payload( + is_delta=False, + new_items=_pl, + keys_to_remove=[], + writer_name=writer_name) self._update_on_every_change = True + self._update_timeout = update_timeout self._collected_modifications = {} self._collected_removals = [] - self._update_timeout = update_timeout self._batch_update_writer_name = None # name of remote buffer or None self._batch_update_lock = threading.RLock() self._batch_update_cond = threading.Condition(threading.RLock()) - def merge(self, payload, writer_name=None): - self._batch_update_lock.acquire(True) - #if not self._batch_update_lock.acquire(False): - # print('Someone failed a lock trying to merge '+str(payload.keys())) - # raise IUPayloadLockedError(self.iu) - #print("Payload.merge() IN, Merging "+str(payload.keys())) - for k, v in payload.items(): - if type(k)==str: - k=unicode(k,'utf8') - if type(v)==str: - v=unicode(v,'utf8') - self.iu._modify_payload(is_delta=True, new_items=payload, keys_to_remove=[], writer_name=writer_name) - r = dict.update(self, payload) # batch update - #print("Payload.merge() OUT") - self._batch_update_lock.release() - return r - def __getitem__(self, k): value = dict.__getitem__(self, k) if isinstance(value, dict): @@ -89,60 +81,65 @@ class Payload(dict): return value def __setitem__(self, k, v, writer_name=None): - self._batch_update_lock.acquire(True) - #if not self._batch_update_lock.acquire(False): - # print('Someone failed a lock trying to set '+k+' to '+v) - # raise IUPayloadLockedError(self.iu) - #print("Payload.__setitem__() IN, Setting "+k+' to '+v) - #print(" by writer "+str(writer_name)) - if type(k)==str: - k=unicode(k,'utf8') - if type(v)==str: - v=unicode(v,'utf8') - if self._update_on_every_change: - #print(" running _modify_payload with writer name "+str(writer_name)) - self.iu._modify_payload(is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name) - else: # Collect additions/modifications - self._batch_update_writer_name = writer_name - self._collected_modifications[k] = v - r = dict.__setitem__(self, k, v) - #print("Payload.__setitem__() OUT") - self._batch_update_lock.release() - return r + with self._batch_update_lock.acquire: + for k, v in payload.iteritems(): + k = unicode(k, 'utf8') if type(k) == str else k + v = unicode(v, 'utf8') if type(v) == str else v + if self._update_on_every_change: + self.iu._modify_payload( + is_delta=True, + new_items={k:v}, + keys_to_remove=[], + writer_name=writer_name) + else: # Collect additions/modifications + self._batch_update_writer_name = writer_name + self._collected_modifications[k] = v + return dict.__setitem__(self, k, v) def __delitem__(self, k, writer_name=None): - self._batch_update_lock.acquire(True) - #if not self._batch_update_lock.acquire(False): - # print('Someone failed a lock trying to del '+k) - # raise IUPayloadLockedError(self.iu) - #print("Payload.__delitem__() IN, Deleting "+k) - if type(k)==str: - k=unicode(k,'utf8') - if self._update_on_every_change: - self.iu._modify_payload(is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name) - else: # Collect additions/modifications - self._batch_update_writer_name = writer_name - self._collected_removals.append(k) - r = dict.__delitem__(self, k) - #print("Payload.__delitem__() OUT") - self._batch_update_lock.release() - return r - - # Context-manager based batch updates, not yet thread-safe (on remote updates) + with self._batch_update_lock.acquire: + k = unicode(k, 'utf8') if type(k) == str else k + if self._update_on_every_change: + self.iu._modify_payload( + is_delta=True, + new_items={}, + keys_to_remove=[k], + writer_name=writer_name) + else: # Collect additions/modifications + self._batch_update_writer_name = writer_name + self._collected_removals.append(k) + return dict.__delitem__(self, k) + + # Context-manager based batch updates, not thread-safe (on remote updates) def __enter__(self): - #print('running Payload.__enter__()') self._wait_batch_update_lock(self._update_timeout) self._update_on_every_change = False + # Context-manager based batch updates, not thread-safe (on remote updates) def __exit__(self, type, value, traceback): - #print('running Payload.__exit__()') - self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=self._batch_update_writer_name) + self.iu._modify_payload( + is_delta=True, + new_items=self._collected_modifications, + keys_to_remove=self._collected_removals, + writer_name=self._batch_update_writer_name) self._collected_modifications = {} self._collected_removals = [] self._update_on_every_change = True self._batch_update_writer_name = None self._batch_update_lock.release() + def merge(self, payload, writer_name=None): + with self._batch_update_lock.acquire: + for k, v in payload.iteritems(): + k = unicode(k, 'utf8') if type(k) == str else k + v = unicode(v, 'utf8') if type(v) == str else v + self.iu._modify_payload( + is_delta=True, + new_items=payload, + keys_to_remove=[], + writer_name=writer_name) + return dict.update(self, payload) # batch update + def _remotely_enforced_setitem(self, k, v): """Sets an item when requested remotely.""" return dict.__setitem__(self, k, v) @@ -275,11 +272,11 @@ class PayloadItemListProxy(PayloadItemProxy, list): x = self.content.pop(*args, **kwargs) self._notify_payload() return x - + def sort(self, cmp=None, key=None, reverse=False): self.content.sort(cmp, key, reverse) self._notify_payload() def reverse(self): self.content.reverse() - self._notify_payload() \ No newline at end of file + self._notify_payload()