diff --git a/ipaacalib/python/src/ipaaca/__init__.py b/ipaacalib/python/src/ipaaca/__init__.py index 8196d2d2203931a7ed3e58468374627d5fe6cb49..fe1cc3ac7ff6e49a3479fd9479b6573176b69acb 100755 --- a/ipaacalib/python/src/ipaaca/__init__.py +++ b/ipaacalib/python/src/ipaaca/__init__.py @@ -44,12 +44,14 @@ try: import simplejson as json except ImportError: import json - print('INFO: Using json instead of simplejson.') - print(' Install simplejson for better performance.') + print('INFO: Using module "json" instead of "simplejson".') + print(' Install "simplejson" for better performance.') import rsb import rsb.converter +from payload import * + import ipaaca_pb2 _DEFAULT_PAYLOAD_UPDATE_TIMEOUT = 0.1 @@ -167,254 +169,9 @@ class IUPayloadLockedError(Exception): super(IUPayloadLockedError, self).__init__('IU '+str(iu.uid)+' was locked during access attempt.') -## --- Generation Architecture ----------------------------------------------- - -class Payload(dict): - def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False, update_timeout=_DEFAULT_PAYLOAD_UPDATE_TIMEOUT): - self.iu = iu - pl1 = {} if new_payload is None else new_payload - pl = {} - for k,v in pl1.items(): - if type(k)==str: - k=unicode(k,'utf8') - if type(v)==str: - v=unicode(v,'utf8') - pl[k] = v - # NOTE omit_init_update_message is necessary to prevent checking for - # exceptions and sending updates in the case where we just receive - # a whole new payload from the remote side and overwrite it locally. - for k, v in pl.items(): - dict.__setitem__(self, k, v) - if (not omit_init_update_message) and (self.iu.buffer is not None): - self.iu._modify_payload(is_delta=False, new_items=pl, keys_to_remove=[], writer_name=writer_name) - self._update_on_every_change = True - self._collected_modifications = {} - self._collected_removals = [] - self._update_timeout = update_timeout - self._batch_update_writer_name = None # name of remote buffer or None - self._batch_update_lock = threading.RLock() - self._batch_update_cond = threading.Condition(threading.RLock()) - - def merge(self, payload, writer_name=None): - self._batch_update_lock.acquire(True) - #if not self._batch_update_lock.acquire(False): - # print('Someone failed a lock trying to merge '+str(payload.keys())) - # raise IUPayloadLockedError(self.iu) - #print("Payload.merge() IN, Merging "+str(payload.keys())) - for k, v in payload.items(): - if type(k)==str: - k=unicode(k,'utf8') - if type(v)==str: - v=unicode(v,'utf8') - self.iu._modify_payload(is_delta=True, new_items=payload, keys_to_remove=[], writer_name=writer_name) - r = dict.update(self, payload) # batch update - #print("Payload.merge() OUT") - self._batch_update_lock.release() - return r - - def __getitem__(self, k): - value = dict.__getitem__(self, k) - if isinstance(value, dict): - return PayloadItemDictProxy(value, self, k) - elif isinstance(value, list): - return PayloadItemListProxy(value, self, k) - else: - return value - - def __setitem__(self, k, v, writer_name=None): - self._batch_update_lock.acquire(True) - #if not self._batch_update_lock.acquire(False): - # print('Someone failed a lock trying to set '+k+' to '+v) - # raise IUPayloadLockedError(self.iu) - #print("Payload.__setitem__() IN, Setting "+k+' to '+v) - #print(" by writer "+str(writer_name)) - if type(k)==str: - k=unicode(k,'utf8') - if type(v)==str: - v=unicode(v,'utf8') - if self._update_on_every_change: - #print(" running _modify_payload with writer name "+str(writer_name)) - self.iu._modify_payload(is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name) - else: # Collect additions/modifications - self._batch_update_writer_name = writer_name - self._collected_modifications[k] = v - r = dict.__setitem__(self, k, v) - #print("Payload.__setitem__() OUT") - self._batch_update_lock.release() - return r - - def __delitem__(self, k, writer_name=None): - self._batch_update_lock.acquire(True) - #if not self._batch_update_lock.acquire(False): - # print('Someone failed a lock trying to del '+k) - # raise IUPayloadLockedError(self.iu) - #print("Payload.__delitem__() IN, Deleting "+k) - if type(k)==str: - k=unicode(k,'utf8') - if self._update_on_every_change: - self.iu._modify_payload(is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name) - else: # Collect additions/modifications - self._batch_update_writer_name = writer_name - self._collected_removals.append(k) - r = dict.__delitem__(self, k) - #print("Payload.__delitem__() OUT") - self._batch_update_lock.release() - return r - - # Context-manager based batch updates, not yet thread-safe (on remote updates) - def __enter__(self): - #print('running Payload.__enter__()') - self._wait_batch_update_lock(self._update_timeout) - self._update_on_every_change = False - - def __exit__(self, type, value, traceback): - #print('running Payload.__exit__()') - self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=self._batch_update_writer_name) - self._collected_modifications = {} - self._collected_removals = [] - self._update_on_every_change = True - self._batch_update_writer_name = None - self._batch_update_lock.release() - - def _remotely_enforced_setitem(self, k, v): - """Sets an item when requested remotely.""" - return dict.__setitem__(self, k, v) - - def _remotely_enforced_delitem(self, k): - """Deletes an item when requested remotely.""" - return dict.__delitem__(self, k) - - def _wait_batch_update_lock(self, timeout): - # wait lock with time-out http://stackoverflow.com/a/8393033 - with self._batch_update_cond: - current_time = start_time = time.time() - while current_time < start_time + timeout: - if self._batch_update_lock.acquire(False): - return True - else: - self._batch_update_cond.wait(timeout - current_time + start_time) - current_time = time.time() - raise IUPayloadLockTimeoutError(self.iu) - -class PayloadItemProxy(object): - - def __init__(self, content, payload, identifier_in_payload): - self.payload = payload - self.content = content - self.identifier_in_payload = identifier_in_payload - - def _notify_payload(self): - self.payload[self.identifier_in_payload] = self.content - - def _create_proxy(self, obj, identifier_in_payload): - if isinstance(obj, dict): - return PayloadItemDictProxy(obj, self.payload, identifier_in_payload) - elif isinstance(obj, list): - return PayloadItemListProxy(obj, self.payload, identifier_in_payload) - else: - return obj - - def __setitem__(self, k, v): - self.content.__setitem__(k,v) - self._notify_payload() - - def __getitem__(self, k): - item = self.content.__getitem__(k) - return self._create_proxy(item, k) - - def __delitem__(self, k): - self.content.__delitem__(k) - self._notify_payload() - - -class PayloadItemDictProxy(PayloadItemProxy, dict): - - def __init__(self, content, payload, identifier_in_payload): - dict.__init__(self, content) - PayloadItemProxy.__init__(self, content, payload, identifier_in_payload) - - def clear(self): - self.content.clear() - self._notify_payload() - - def get(self, key, default=None): - value = self.content.get(key, default) - return self._create_proxy(value, key) - - def items(self): - return [(key, value) for key, value in self.iteritems()] - - def iteritems(self): - for key, value in self.content.iteritems(): - yield key, self._create_proxy(value, key) - - def values(self): - return [value for value in self.itervalues()] - - def itervalues(self): - for key, value in self.content.iteritems(): - yield self._create_proxy(value, key) - - def pop(self, key, *args): - x = self.content.pop(key, *args) - self._notify_payload() - return x - - def popitem(self): - x = self.content.popitem() - self._notify_payload() - return x - - def setdefault(self, key, default=None): - notification_necessary = not key in self.content - x = self.content.setdefault(key, default) - if notification_necessary: - self._notify_payload() - return x - - def update(self, *args, **kwargs): - self.content.update(*args, **kwargs) - self._notify_payload() - - -class PayloadItemListProxy(PayloadItemProxy, list): - - def __init__(self, content, payload, identifier_in_payload): - list.__init__(self, content) - PayloadItemProxy.__init__(self, content, payload, identifier_in_payload) - - def __iter__(self): - for index, item in enumerate(self.content): - yield self._create_proxy(item, index) - - def append(self, x): - self.content.append(x) - self._notify_payload() - - def extend(self, l): - self.content.extend(l) - self._notify_payload() - - def insert(self, i, x): - self.content.insert(i, x) - self._notify_payload() - def remove(self, x): - self.content.remove(x) - self._notify_payload() - def pop(self, *args, **kwargs): - x = self.content.pop(*args, **kwargs) - self._notify_payload() - return x - - def sort(self, cmp=None, key=None, reverse=False): - self.content.sort(cmp, key, reverse) - self._notify_payload() - def reverse(self): - self.content.reverse() - self._notify_payload() class IUInterface(object): #{{{ diff --git a/ipaacalib/python/src/ipaaca/payload.py b/ipaacalib/python/src/ipaaca/payload.py new file mode 100644 index 0000000000000000000000000000000000000000..b374d424ec55ee8b638d7d7ce58dfcf7d99390e2 --- /dev/null +++ b/ipaacalib/python/src/ipaaca/payload.py @@ -0,0 +1,285 @@ +# -*- coding: utf-8 -*- + +# This file is part of IPAACA, the +# "Incremental Processing Architecture +# for Artificial Conversational Agents". +# +# Copyright (c) 2009-2014 Social Cognitive Systems Group +# CITEC, Bielefeld University +# +# http://opensource.cit-ec.de/projects/ipaaca/ +# http://purl.org/net/ipaaca +# +# This file may be licensed under the terms of of the +# GNU Lesser General Public License Version 3 (the ``LGPL''), +# or (at your option) any later version. +# +# Software distributed under the License is distributed +# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either +# express or implied. See the LGPL for the specific language +# governing rights and limitations. +# +# You should have received a copy of the LGPL along with this +# program. If not, go to http://www.gnu.org/licenses/lgpl.html +# or write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# The development of this software was supported by the +# Excellence Cluster EXC 277 Cognitive Interaction Technology. +# The Excellence Cluster EXC 277 is a grant of the Deutsche +# Forschungsgemeinschaft (DFG) in the context of the German +# Excellence Initiative. + +__all__ = [ + 'Payload' + 'PayloadDictItemProxy', + 'PayloadItemListProxy' +] + +class Payload(dict): + def __init__(self, iu, writer_name=None, new_payload=None, omit_init_update_message=False, update_timeout=_DEFAULT_PAYLOAD_UPDATE_TIMEOUT): + self.iu = iu + pl1 = {} if new_payload is None else new_payload + pl = {} + for k,v in pl1.items(): + if type(k)==str: + k=unicode(k,'utf8') + if type(v)==str: + v=unicode(v,'utf8') + pl[k] = v + # NOTE omit_init_update_message is necessary to prevent checking for + # exceptions and sending updates in the case where we just receive + # a whole new payload from the remote side and overwrite it locally. + for k, v in pl.items(): + dict.__setitem__(self, k, v) + if (not omit_init_update_message) and (self.iu.buffer is not None): + self.iu._modify_payload(is_delta=False, new_items=pl, keys_to_remove=[], writer_name=writer_name) + self._update_on_every_change = True + self._collected_modifications = {} + self._collected_removals = [] + self._update_timeout = update_timeout + self._batch_update_writer_name = None # name of remote buffer or None + self._batch_update_lock = threading.RLock() + self._batch_update_cond = threading.Condition(threading.RLock()) + + def merge(self, payload, writer_name=None): + self._batch_update_lock.acquire(True) + #if not self._batch_update_lock.acquire(False): + # print('Someone failed a lock trying to merge '+str(payload.keys())) + # raise IUPayloadLockedError(self.iu) + #print("Payload.merge() IN, Merging "+str(payload.keys())) + for k, v in payload.items(): + if type(k)==str: + k=unicode(k,'utf8') + if type(v)==str: + v=unicode(v,'utf8') + self.iu._modify_payload(is_delta=True, new_items=payload, keys_to_remove=[], writer_name=writer_name) + r = dict.update(self, payload) # batch update + #print("Payload.merge() OUT") + self._batch_update_lock.release() + return r + + def __getitem__(self, k): + value = dict.__getitem__(self, k) + if isinstance(value, dict): + return PayloadItemDictProxy(value, self, k) + elif isinstance(value, list): + return PayloadItemListProxy(value, self, k) + else: + return value + + def __setitem__(self, k, v, writer_name=None): + self._batch_update_lock.acquire(True) + #if not self._batch_update_lock.acquire(False): + # print('Someone failed a lock trying to set '+k+' to '+v) + # raise IUPayloadLockedError(self.iu) + #print("Payload.__setitem__() IN, Setting "+k+' to '+v) + #print(" by writer "+str(writer_name)) + if type(k)==str: + k=unicode(k,'utf8') + if type(v)==str: + v=unicode(v,'utf8') + if self._update_on_every_change: + #print(" running _modify_payload with writer name "+str(writer_name)) + self.iu._modify_payload(is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name) + else: # Collect additions/modifications + self._batch_update_writer_name = writer_name + self._collected_modifications[k] = v + r = dict.__setitem__(self, k, v) + #print("Payload.__setitem__() OUT") + self._batch_update_lock.release() + return r + + def __delitem__(self, k, writer_name=None): + self._batch_update_lock.acquire(True) + #if not self._batch_update_lock.acquire(False): + # print('Someone failed a lock trying to del '+k) + # raise IUPayloadLockedError(self.iu) + #print("Payload.__delitem__() IN, Deleting "+k) + if type(k)==str: + k=unicode(k,'utf8') + if self._update_on_every_change: + self.iu._modify_payload(is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name) + else: # Collect additions/modifications + self._batch_update_writer_name = writer_name + self._collected_removals.append(k) + r = dict.__delitem__(self, k) + #print("Payload.__delitem__() OUT") + self._batch_update_lock.release() + return r + + # Context-manager based batch updates, not yet thread-safe (on remote updates) + def __enter__(self): + #print('running Payload.__enter__()') + self._wait_batch_update_lock(self._update_timeout) + self._update_on_every_change = False + + def __exit__(self, type, value, traceback): + #print('running Payload.__exit__()') + self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=self._batch_update_writer_name) + self._collected_modifications = {} + self._collected_removals = [] + self._update_on_every_change = True + self._batch_update_writer_name = None + self._batch_update_lock.release() + + def _remotely_enforced_setitem(self, k, v): + """Sets an item when requested remotely.""" + return dict.__setitem__(self, k, v) + + def _remotely_enforced_delitem(self, k): + """Deletes an item when requested remotely.""" + return dict.__delitem__(self, k) + + def _wait_batch_update_lock(self, timeout): + # wait lock with time-out http://stackoverflow.com/a/8393033 + with self._batch_update_cond: + current_time = start_time = time.time() + while current_time < start_time + timeout: + if self._batch_update_lock.acquire(False): + return True + else: + self._batch_update_cond.wait(timeout - current_time + start_time) + current_time = time.time() + raise IUPayloadLockTimeoutError(self.iu) + + +class PayloadItemProxy(object): + + def __init__(self, content, payload, identifier_in_payload): + self.payload = payload + self.content = content + self.identifier_in_payload = identifier_in_payload + + def _notify_payload(self): + self.payload[self.identifier_in_payload] = self.content + + def _create_proxy(self, obj, identifier_in_payload): + if isinstance(obj, dict): + return PayloadItemDictProxy(obj, self.payload, identifier_in_payload) + elif isinstance(obj, list): + return PayloadItemListProxy(obj, self.payload, identifier_in_payload) + else: + return obj + + def __setitem__(self, k, v): + self.content.__setitem__(k,v) + self._notify_payload() + + def __getitem__(self, k): + item = self.content.__getitem__(k) + return self._create_proxy(item, k) + + def __delitem__(self, k): + self.content.__delitem__(k) + self._notify_payload() + + +class PayloadItemDictProxy(PayloadItemProxy, dict): + + def __init__(self, content, payload, identifier_in_payload): + dict.__init__(self, content) + PayloadItemProxy.__init__(self, content, payload, identifier_in_payload) + + def clear(self): + self.content.clear() + self._notify_payload() + + def get(self, key, default=None): + value = self.content.get(key, default) + return self._create_proxy(value, key) + + def items(self): + return [(key, value) for key, value in self.iteritems()] + + def iteritems(self): + for key, value in self.content.iteritems(): + yield key, self._create_proxy(value, key) + + def values(self): + return [value for value in self.itervalues()] + + def itervalues(self): + for key, value in self.content.iteritems(): + yield self._create_proxy(value, key) + + def pop(self, key, *args): + x = self.content.pop(key, *args) + self._notify_payload() + return x + + def popitem(self): + x = self.content.popitem() + self._notify_payload() + return x + + def setdefault(self, key, default=None): + notification_necessary = not key in self.content + x = self.content.setdefault(key, default) + if notification_necessary: + self._notify_payload() + return x + + def update(self, *args, **kwargs): + self.content.update(*args, **kwargs) + self._notify_payload() + + +class PayloadItemListProxy(PayloadItemProxy, list): + + def __init__(self, content, payload, identifier_in_payload): + list.__init__(self, content) + PayloadItemProxy.__init__(self, content, payload, identifier_in_payload) + + def __iter__(self): + for index, item in enumerate(self.content): + yield self._create_proxy(item, index) + + def append(self, x): + self.content.append(x) + self._notify_payload() + + def extend(self, l): + self.content.extend(l) + self._notify_payload() + + def insert(self, i, x): + self.content.insert(i, x) + self._notify_payload() + + def remove(self, x): + self.content.remove(x) + self._notify_payload() + + def pop(self, *args, **kwargs): + x = self.content.pop(*args, **kwargs) + self._notify_payload() + return x + + def sort(self, cmp=None, key=None, reverse=False): + self.content.sort(cmp, key, reverse) + self._notify_payload() + + def reverse(self): + self.content.reverse() + self._notify_payload() \ No newline at end of file