Skip to content
Snippets Groups Projects
Commit 8cd1eff5 authored by Ramin Yaghoubzadeh Torky's avatar Ramin Yaghoubzadeh Torky
Browse files

python: bugfix: Payload enter/exit always sent owner name instead of writer name, fixed.

parent c6feeb9e
Branches
No related tags found
No related merge requests found
......@@ -193,60 +193,80 @@ class Payload(dict):
self._collected_modifications = {}
self._collected_removals = []
self._update_timeout = update_timeout
self._batch_update_writer_name = None # name of remote buffer or None
self._batch_update_lock = threading.RLock()
self._batch_update_cond = threading.Condition(threading.RLock())
def merge(self, payload, writer_name=None):
if not self._batch_update_lock.acquire(False):
raise IUPayloadLockedError(self.iu)
for k, v in payload:
self._batch_update_lock.acquire(True)
#if not self._batch_update_lock.acquire(False):
# print('Someone failed a lock trying to merge '+str(payload.keys()))
# raise IUPayloadLockedError(self.iu)
#print("Payload.merge() IN, Merging "+str(payload.keys()))
for k, v in payload.items():
if type(k)==str:
k=unicode(k,'utf8')
if type(v)==str:
v=unicode(v,'utf8')
self.iu._modify_payload(is_delta=True, new_items=payload, keys_to_remove=[], writer_name=writer_name)
r = dict.update(payload) # batch update
#print("Payload.merge() OUT")
self._batch_update_lock.release()
return r
def __setitem__(self, k, v, writer_name=None):
if not self._batch_update_lock.acquire(False):
raise IUPayloadLockedError(self.iu)
self._batch_update_lock.acquire(True)
#if not self._batch_update_lock.acquire(False):
# print('Someone failed a lock trying to set '+k+' to '+v)
# raise IUPayloadLockedError(self.iu)
#print("Payload.__setitem__() IN, Setting "+k+' to '+v)
#print(" by writer "+str(writer_name))
if type(k)==str:
k=unicode(k,'utf8')
if type(v)==str:
v=unicode(v,'utf8')
if self._update_on_every_change:
#print(" running _modify_payload with writer name "+str(writer_name))
self.iu._modify_payload(is_delta=True, new_items={k:v}, keys_to_remove=[], writer_name=writer_name)
else: # Collect additions/modifications
self._batch_update_writer_name = writer_name
self._collected_modifications[k] = v
r = dict.__setitem__(self, k, v)
#print("Payload.__setitem__() OUT")
self._batch_update_lock.release()
return r
def __delitem__(self, k, writer_name=None):
if not self._batch_update_lock.acquire(False):
raise IUPayloadLockedError(self.iu)
self._batch_update_lock.acquire(True)
#if not self._batch_update_lock.acquire(False):
# print('Someone failed a lock trying to del '+k)
# raise IUPayloadLockedError(self.iu)
#print("Payload.__delitem__() IN, Deleting "+k)
if type(k)==str:
k=unicode(k,'utf8')
if self._update_on_every_change:
self.iu._modify_payload(is_delta=True, new_items={}, keys_to_remove=[k], writer_name=writer_name)
else: # Collect additions/modifications
self._batch_update_writer_name = writer_name
self._collected_removals.append(k)
r = dict.__delitem__(self, k)
#print("Payload.__delitem__() OUT")
self._batch_update_lock.release()
return r
# Context-manager based batch updates, not yet thread-safe (on remote updates)
def __enter__(self):
#print('running Payload.__enter__()')
self._wait_batch_update_lock(self._update_timeout)
self._update_on_every_change = False
def __exit__(self, type, value, traceback):
self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=None)
#print('running Payload.__exit__()')
self.iu._modify_payload(is_delta=True, new_items=self._collected_modifications, keys_to_remove=self._collected_removals, writer_name=self._batch_update_writer_name)
self._collected_modifications = {}
self._collected_removals = []
self._update_on_every_change = True
self._batch_update_writer_name = None
self._batch_update_lock.release()
def _remotely_enforced_setitem(self, k, v):
......@@ -450,6 +470,7 @@ class IU(IUInterface):#{{{
# FIXME: Is it actually set locally?
self._increase_revision_number()
if self.is_published:
#print(' _modify_payload: running send_iu_pl_upd with writer name '+str(writer_name))
# send update to remote holders
self.buffer._send_iu_payload_update(
self,
......@@ -1296,6 +1317,9 @@ class InputBuffer(Buffer):
# Notify only for remotely triggered events;
# Discard updates that originate from this buffer
return
#else:
# print('Got update written by buffer '+str(event.data.writer_name))
if type_ is ipaaca_pb2.IUCommission:
# IU commit
iu = self._iu_store[event.data.uid]
......@@ -1389,16 +1413,20 @@ class OutputBuffer(Buffer):
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
logger.warning("Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
logger.warning(" Writer was: "+update.writer_name)
logger.warning(" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
logger.warning(" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
return 0
if update.is_delta:
#print('Writing delta update by '+str(update.writer_name))
with iu.payload:
for k in update.keys_to_remove:
iu.payload.__delitem__(k, writer_name=update.writer_name)
for k,v in update.new_items.items():
iu.payload.__setitem__(k, v, writer_name=update.writer_name)
else:
#print('Writing non-incr update by '+str(update.writer_name))
iu._set_payload(update.new_items, writer_name=update.writer_name)
# _set_payload etc. have also incremented the revision number
self.call_iu_event_handlers(update.uid, local=True, event_type=IUEventType.UPDATED, category=iu.category)
......@@ -1441,7 +1469,7 @@ class OutputBuffer(Buffer):
#if iu._uid is not None:
# raise IUPublishedError(iu)
#iu.uid = self._generate_iu_uid()
if iu.uid in self._iu_store:
if iu.uid in self._iu_store:
raise IUPublishedError(iu)
if iu.buffer is not None:
raise IUPublishedError(iu)
......@@ -1548,6 +1576,7 @@ class OutputBuffer(Buffer):
payload_update.writer_name = writer_name
informer = self._get_informer(iu._category)
informer.publishData(payload_update)
#print(" -- Sent update with writer name "+str(writer_name))
## --- RSB -------------------------------------------------------------------
......
......@@ -37,64 +37,91 @@ import threading
import ipaaca
NotificationState = ipaaca.enum(
NEW = 'new',
OLD = 'old',
DOWN = 'down'
)
NEW = 'new',
OLD = 'old',
DOWN = 'down'
)
class ComponentError(Exception):
def __init__(self, msg):
super(ComponentError, self).__init__(msg)
class ComponentNotifier(object):
NOTIFY_CATEGORY = "componentNotify"
SEND_CATEGORIES = "send_categories"
RECEIVE_CATEGORIES = "recv_categories"
STATE = "state"
NAME = "name"
FUNCTION = "function"
def __init__(self, componentName, componentFunction, sendCategories, receiveCategories, outBuffer=None, inBuffer=None):
self.componentName = componentName
self.componentFunction = componentFunction
self.sendCategories = frozenset(sendCategories)
self.receiveCategories = frozenset(receiveCategories)
self.inBuffer = inBuffer if inBuffer is not None else ipaaca.InputBuffer(componentName + 'Notifier')
self.outBuffer = outBuffer if outBuffer is not None else ipaaca.OutputBuffer(componentName + 'Notifier')
self.initialized = False
self.notificationHandlers = []
self.initializeLock = threading.Lock()
self.notificationHandlerLock = threading.Lock()
self.submitLock = threading.Lock()
def _submit_notify(self, isNew):
with self.submitLock:
notifyIU = ipaaca.Message(ComponentNotifier.NOTIFY_CATEGORY)
notifyIU.payload = {
ComponentNotifier.NAME: self.componentName,
ComponentNotifier.FUNCTION: self.componentFunction,
ComponentNotifier.SEND_CATEGORIES: ",".join(self.sendCategories),
ComponentNotifier.RECEIVE_CATEGORIES: ",".join(self.receiveCategories),
ComponentNotifier.STATE: NotificationState.NEW if isNew else NotificationState.OLD,
}
self.outBuffer.add(notifyIU)
def _handle_iu_event(self, iu, event_type, local):
if iu.payload[ComponentNotifier.NAME] == self.componentName:
return
with self.notificationHandlerLock:
for h in self.notificationHandlers:
h(iu, event_type, local)
if iu.payload[ComponentNotifier.STATE] == "new":
#print("submitting")
self._submit_notify(False)
NOTIFY_CATEGORY = "componentNotify"
SEND_CATEGORIES = "send_categories"
RECEIVE_CATEGORIES = "recv_categories"
STATE = "state"
NAME = "name"
FUNCTION = "function"
def __init__(self, component_name, component_function, send_categories, receive_categories, out_buffer=None, in_buffer=None):
self.component_name = component_name
self.component_function = component_function
self.send_categories = frozenset(send_categories)
self.receive_categories = frozenset(receive_categories)
self.in_buffer = in_buffer if in_buffer is not None else ipaaca.InputBuffer(component_name + 'Notifier')
self.out_buffer = out_buffer if out_buffer is not None else ipaaca.OutputBuffer(component_name + 'Notifier')
self.terminated = False
self.initialized = False
self.notification_handlers = []
self.initialize_lock = threading.Lock()
self.notification_handler_lock = threading.Lock()
self.submit_lock = threading.Lock()
def _submit_notify(self, is_new):
with self.submit_lock:
notify_iu = ipaaca.Message(ComponentNotifier.NOTIFY_CATEGORY)
notify_iu.payload = {
ComponentNotifier.NAME: self.component_name,
ComponentNotifier.FUNCTION: self.component_function,
ComponentNotifier.SEND_CATEGORIES: ",".join(self.send_categories),
ComponentNotifier.RECEIVE_CATEGORIES: ",".join(self.receive_categories),
ComponentNotifier.STATE: NotificationState.NEW if is_new else NotificationState.OLD,
}
self.out_buffer.add(notify_iu)
def terminate(self):
with self.submit_lock:
if self.terminated: return
self.terminated = True
notify_iu = ipaaca.Message(ComponentNotifier.NOTIFY_CATEGORY)
notify_iu.payload = {
ComponentNotifier.NAME: self.component_name,
ComponentNotifier.FUNCTION: self.component_function,
ComponentNotifier.SEND_CATEGORIES: ",".join(self.send_categories),
ComponentNotifier.RECEIVE_CATEGORIES: ",".join(self.receive_categories),
ComponentNotifier.STATE: NotificationState.DOWN,
}
self.out_buffer.add(notify_iu)
def _handle_iu_event(self, iu, event_type, local):
if iu.payload[ComponentNotifier.NAME] == self.component_name:
return
with self.notification_handler_lock:
for h in self.notification_handlers:
h(iu, event_type, local)
if iu.payload[ComponentNotifier.STATE] == "new":
#print("submitting")
self._submit_notify(False)
def add_notification_handler(self, handler):
with self.notification_handler_lock:
self.notification_handlers.append(handler)
def add_notification_handler(self, handler):
with self.notificationHandlerLock:
self.notificationHandlers.append(handler)
def initialize(self):
with self.initializeLock:
if (not self.initialized):
self.inBuffer.register_handler(self._handle_iu_event, ipaaca.IUEventType.MESSAGE, ComponentNotifier.NOTIFY_CATEGORY)
self._submit_notify(isNew=True)
self.initialized = True
def initialize(self):
with self.initialize_lock:
if self.terminated:
raise ComponentError('Attempted to reinitialize component '+component_name+' after termination')
if (not self.initialized):
self.in_buffer.register_handler(self._handle_iu_event, ipaaca.IUEventType.MESSAGE, ComponentNotifier.NOTIFY_CATEGORY)
self._submit_notify(True)
self.initialized = True
def __enter__(self):
self.initialize()
def __exit__(self, t, v, tb):
self.terminate()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment