diff --git a/ipaacalib/java/src/ipaaca/AbstractIU.java b/ipaacalib/java/src/ipaaca/AbstractIU.java index e521b813161b39a8b5c05b0882ddb350db6a84e0..d79f1622871ec4f960781e6630604e9fe3054510 100644 --- a/ipaacalib/java/src/ipaaca/AbstractIU.java +++ b/ipaacalib/java/src/ipaaca/AbstractIU.java @@ -231,7 +231,7 @@ public abstract class AbstractIU List<PayloadItem> items = new ArrayList<PayloadItem>(); for (Entry<String, String> entry : newPayload.entrySet()) { - PayloadItem item = PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("str") // TODO:default type? + PayloadItem item = PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("STR") .build(); items.add(item); } diff --git a/ipaacalib/java/src/ipaaca/IUConverter.java b/ipaacalib/java/src/ipaaca/IUConverter.java index a185992e1d9204819e622202a37f52825ae6f01c..d2138d6945bafbaff9a797f3b5b9fee5412ef635 100644 --- a/ipaacalib/java/src/ipaaca/IUConverter.java +++ b/ipaacalib/java/src/ipaaca/IUConverter.java @@ -79,7 +79,7 @@ public class IUConverter implements Converter<ByteBuffer> List<PayloadItem> payloadItems = new ArrayList<PayloadItem>(); for (Entry<String, String> entry : iua.getPayload().entrySet()) { - payloadItems.add(PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("str").build()); + payloadItems.add(PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("STR").build()); } List<LinkSet> links = new ArrayList<LinkSet>(); @@ -95,7 +95,7 @@ public class IUConverter implements Converter<ByteBuffer> } IU iu = IU.newBuilder().setUid(iua.getUid()).setRevision(iua.getRevision()).setCategory(iua.getCategory()) .setOwnerName(iua.getOwnerName()).setCommitted(iua.isCommitted()).setAccessMode(accessMode) - .setReadOnly(iua.isReadOnly()).setPayloadType("MAP").addAllPayload(payloadItems).addAllLinks(links).build(); + .setReadOnly(iua.isReadOnly()).setPayloadType("STR").addAllPayload(payloadItems).addAllLinks(links).build(); return new WireContents<ByteBuffer>(ByteBuffer.wrap(iu.toByteArray()), "ipaaca-iu"); } diff --git a/ipaacalib/java/src/ipaaca/LocalIU.java b/ipaacalib/java/src/ipaaca/LocalIU.java index 56de4f6242548c9cf61ac083f8823764786cbaff..6e87a82484f882a25dddda7e314dfd76ebdb3060 100644 --- a/ipaacalib/java/src/ipaaca/LocalIU.java +++ b/ipaacalib/java/src/ipaaca/LocalIU.java @@ -230,7 +230,7 @@ public class LocalIU extends AbstractIU if (isPublished()) { // send update to remote holders - PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("str") // TODO: fix this, default in .proto? + PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR") .build(); IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true) .setWriterName(writer == null ? getOwnerName() : writer).addNewItems(newItem).build(); @@ -256,7 +256,7 @@ public class LocalIU extends AbstractIU .setWriterName(writer == null ? getOwnerName() : writer); for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet()) { - PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("str") // TODO: fix this, default in .proto? + PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("STR") .build(); builder.addNewItems(newItem); diff --git a/ipaacalib/java/src/ipaaca/Payload.java b/ipaacalib/java/src/ipaaca/Payload.java index 74c840bee548b1aa55935c9a06b147766a6146ec..315d6867878e9f720633f78ecc11e91a4d4f4eb1 100644 --- a/ipaacalib/java/src/ipaaca/Payload.java +++ b/ipaacalib/java/src/ipaaca/Payload.java @@ -95,8 +95,7 @@ public class Payload implements Map<String, String> } public String pseudoConvertFromJSON(String value, String type) { - if (type.equals("json")) { - System.out.println("Received JSON IU"); + if (type.equals("JSON")) { if (value.startsWith("\"")) { return value.replaceAll("\\\"", ""); } else if (value.startsWith("{") || value.startsWith("[") || value.matches("true") || value.matches("false") || value.matches("-?[0-9]*[.,]?[0-9][0-9]*.*")) { diff --git a/ipaacalib/java/src/ipaaca/RemotePushIU.java b/ipaacalib/java/src/ipaaca/RemotePushIU.java index 5b5cc2f733a3c63d76cf53564e030a7cde2420bf..bfb8060403c42031dfbf70c7b8c55eadaf72b334 100644 --- a/ipaacalib/java/src/ipaaca/RemotePushIU.java +++ b/ipaacalib/java/src/ipaaca/RemotePushIU.java @@ -99,7 +99,7 @@ public class RemotePushIU extends AbstractIU { throw new IUReadOnlyException(this); } - PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("str").build();// TODO use default type in .proto + PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR").build(); IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision(getRevision()) .setWriterName(getBuffer().getUniqueName()).addNewItems(newItem).build(); @@ -144,7 +144,7 @@ public class RemotePushIU extends AbstractIU .setWriterName(getBuffer().getUniqueName()); for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet()) { - PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("str") // TODO: fix this, default in .proto? + PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("STR") .build(); builder.addNewItems(newItem); @@ -301,9 +301,9 @@ public class RemotePushIU extends AbstractIU payload.enforcedRemoveItem(key); } for (PayloadItem item : update.getNewItemsList()) { - if (item.getType().equals("str")) { + if (item.getType().equals("STR")) { payload.enforcedSetItem(item.getKey(), item.getValue()); - } else if (item.getType().equals("json")) { + } else if (item.getType().equals("JSON")) { String value = item.getValue(); if (value.startsWith("\"")) { payload.enforcedSetItem(item.getKey(), value.replaceAll("\\\"", "")); diff --git a/ipaacalib/python/src/ipaaca/__init__.py b/ipaacalib/python/src/ipaaca/__init__.py index bbf51e3bde5b2dd1e176f81c91ac22f6e24ca989..09321c0bd26070239ee9a44f354e62d5f40eeaff 100755 --- a/ipaacalib/python/src/ipaaca/__init__.py +++ b/ipaacalib/python/src/ipaaca/__init__.py @@ -35,17 +35,17 @@ from __future__ import division, print_function import rsb import rsb.converter -from ipaaca.misc import logger, IpaacaArgumentParser - import ipaaca_pb2 import ipaaca.converter from ipaaca.buffer import InputBuffer, OutputBuffer from ipaaca.exception import * from ipaaca.iu import IU, Message, IUAccessMode, IUEventType +from ipaaca.misc import enable_logging, IpaacaArgumentParser from ipaaca.payload import Payload def initialize_ipaaca_rsb(): + ''''Register own RSB Converters and initialise RSB from default config file.''' rsb.converter.registerGlobalConverter( ipaaca.converter.IntConverter( wireSchema="int32", @@ -85,8 +85,5 @@ def initialize_ipaaca_rsb(): rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources() - -## --- Module initialisation ------------------------------------------------- - -# register our own RSB Converters +# Initialise module initialize_ipaaca_rsb() diff --git a/ipaacalib/python/src/ipaaca/buffer.py b/ipaacalib/python/src/ipaaca/buffer.py index 704f754b9abb1bfb0fc8838b1c303ca35a72bd3a..b1496f7a6e7c8029af2828a76e83b7a05362bd4f 100644 --- a/ipaacalib/python/src/ipaaca/buffer.py +++ b/ipaacalib/python/src/ipaaca/buffer.py @@ -44,7 +44,6 @@ import ipaaca.exception import ipaaca.converter import ipaaca.iu -from ipaaca.misc import logger __all__ = [ @@ -52,6 +51,7 @@ __all__ = [ 'OutputBuffer', ] +LOGGER = ipaaca.misc.get_library_logger() class IUStore(dict): """A dictionary storing IUs.""" @@ -155,6 +155,8 @@ class Buffer(object): for_categories -- a list of category names or None if handler should be called for all categories """ + if handler_function in [h._handler_function for h in self._iu_event_handlers]: + LOGGER.warn("The handler function '" + handler_function.__name__ + '" has been registered before.') handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories) self._iu_event_handlers.append(handler) return handler @@ -196,9 +198,7 @@ class InputBuffer(Buffer): # add own uuid as identifier for hidden category. self._add_category_listener(str(self._uuid)) if category_interests is not None: - for cat in category_interests: - self._add_category_listener(cat) - + self.add_category_interests(category_interests) def _get_remote_server(self, iu): '''Return (or create, store and return) a remote server.''' @@ -216,13 +216,20 @@ class InputBuffer(Buffer): return remote_server def _add_category_listener(self, iu_category): - '''Return (or create, store and return) a category listener on a specific channel.''' + '''Create and store a listener on a specific category.''' if iu_category not in self._listener_store: cat_listener = rsb.createListener(rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), config=self._participant_config) cat_listener.addHandler(self._handle_iu_events) self._listener_store[iu_category] = cat_listener self._category_interests.append(iu_category) - logger.info("Added listener in scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+iu_category) + LOGGER.info("Added listener in scope /ipaaca/channel/" + str(self._channel) + "/category/" + iu_category) + + def _remove_category_listener(self, iu_category): + '''Remove the listener for a specific category.''' + if iu_category in self._listener_store and iu_category in self._category_interests: + del self._listener_store[iu_category] + self._category_interests.remove(iu_category) + LOGGER.info("Removed listener in scope /ipaaca/channel/" + str(self._channel) + "/category/ " + iu_category) def _handle_iu_events(self, event): '''Dispatch incoming IU events. @@ -259,7 +266,7 @@ class InputBuffer(Buffer): # send resend request to remote server self._request_remote_resend(event.data) else: - logger.warning("Received an update for an IU which we did not receive before.") + LOGGER.warning("Received an update for an IU which we did not receive before.") return # an update to an existing IU if type_ is ipaaca_pb2.IURetraction: @@ -294,7 +301,7 @@ class InputBuffer(Buffer): iu._apply_link_update(event.data) self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.LINKSUPDATED, category=iu.category) else: - logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_)) + LOGGER.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_)) def add_category_interests(self, category_interests): if hasattr(category_interests, '__iter__'): @@ -303,6 +310,13 @@ class InputBuffer(Buffer): else: self._add_category_listener(category_interests) + def remove_category_interests(self, category_interests): + if hasattr(category_interests, '__iter__'): + for interest in category_interests: + self._remove_category_listener(interest) + else: + self._remove_category_listener(category_interests) + def _request_remote_resend(self, iu): remote_server = self._get_remote_server(iu) resend_request = ipaaca_pb2.IUResendRequest() @@ -363,13 +377,13 @@ class OutputBuffer(Buffer): def _remote_update_links(self, update): '''Apply a remotely requested update to one of the stored IU's links.''' if update.uid not in self._iu_store: - logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid)) + LOGGER.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid)) return 0 iu = self._iu_store[update.uid] with iu.revision_lock: if (update.revision != 0) and (update.revision != iu.revision): # (0 means "do not pay attention to the revision number" -> "force update") - logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid)) + LOGGER.warning("Remote write operation failed because request was out of date; IU "+str(update.uid)) return 0 if update.is_delta: iu.modify_links(add=update.new_links, remove=update.links_to_remove, writer_name=update.writer_name) @@ -381,16 +395,16 @@ class OutputBuffer(Buffer): def _remote_update_payload(self, update): '''Apply a remotely requested update to one of the stored IU's payload.''' if update.uid not in self._iu_store: - logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid)) + LOGGER.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid)) return 0 iu = self._iu_store[update.uid] with iu.revision_lock: if (update.revision != 0) and (update.revision != iu.revision): # (0 means "do not pay attention to the revision number" -> "force update") - logger.warning(u"Remote update_payload operation failed because request was out of date; IU "+str(update.uid)) - logger.warning(u" Writer was: "+update.writer_name) - logger.warning(u" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove)) - logger.warning(u" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision)) + LOGGER.warning(u"Remote update_payload operation failed because request was out of date; IU "+str(update.uid)) + LOGGER.warning(u" Writer was: "+update.writer_name) + LOGGER.warning(u" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove)) + LOGGER.warning(u" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision)) return 0 if update.is_delta: #print('Writing delta update by '+str(update.writer_name)) @@ -409,7 +423,7 @@ class OutputBuffer(Buffer): def _remote_request_resend(self, iu_resend_request_pack): ''' Resend a requested IU over the specific hidden category.''' if iu_resend_request_pack.uid not in self._iu_store: - logger.warning("Remote side requested resending of non-existent IU "+str(iu_resend_request_pack.uid)) + LOGGER.warning("Remote side requested resending of non-existent IU "+str(iu_resend_request_pack.uid)) return 0 iu = self._iu_store[iu_resend_request_pack.uid] with iu.revision_lock: @@ -423,13 +437,13 @@ class OutputBuffer(Buffer): def _remote_commit(self, iu_commission): '''Apply a remotely requested commit to one of the stored IUs.''' if iu_commission.uid not in self._iu_store: - logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid)) + LOGGER.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid)) return 0 iu = self._iu_store[iu_commission.uid] with iu.revision_lock: if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision): # (0 means "do not pay attention to the revision number" -> "force update") - logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid)) + LOGGER.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid)) return 0 if iu.committed: return 0 @@ -441,14 +455,14 @@ class OutputBuffer(Buffer): def _get_informer(self, iu_category): '''Return (or create, store and return) an informer object for IUs of the specified category.''' if iu_category in self._informer_store: - logger.info("Returning informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)) + LOGGER.info("Returning informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)) return self._informer_store[iu_category] informer_iu = rsb.createInformer( rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), config=self._participant_config, dataType=object) self._informer_store[iu_category] = informer_iu #new_tuple - logger.info("Returning NEW informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)) + LOGGER.info("Returning NEW informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)) return informer_iu #return new_tuple def add(self, iu): @@ -553,7 +567,11 @@ class OutputBuffer(Buffer): new_items = {} if keys_to_remove is None: keys_to_remove = [] - payload_update = ipaaca.converter.IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision) + payload_update = ipaaca.converter.IUPayloadUpdate( + uid=iu._uid, + revision=revision, + is_delta=is_delta, + payload_type=iu.payload_type) payload_update.new_items = new_items if is_delta: payload_update.keys_to_remove = keys_to_remove diff --git a/ipaacalib/python/src/ipaaca/converter.py b/ipaacalib/python/src/ipaaca/converter.py index 8a9137305bd69b8d6643aa5af6217b5b3929d4d2..2bd3e690e32cf404a94070ed541e0e975e74c513 100644 --- a/ipaacalib/python/src/ipaaca/converter.py +++ b/ipaacalib/python/src/ipaaca/converter.py @@ -37,14 +37,18 @@ import collections import rsb.converter import ipaaca_pb2 +import ipaaca.defaults +import ipaaca.exception import ipaaca.iu -from ipaaca.misc import logger +import ipaaca.misc + +LOGGER = ipaaca.misc.get_library_logger() try: import simplejson as json except ImportError: import json - logger.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.') + LOGGER.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.') __all__ = [ @@ -74,20 +78,25 @@ class IntConverter(rsb.converter.Converter): return pbo.value -def pack_payload_entry(entry, key, value): +def pack_payload_entry(entry, key, value, _type=ipaaca.iu.IUPayloadType.JSON): entry.key = key - entry.value = json.dumps(value) - entry.type = 'json' + if _type == ipaaca.iu.IUPayloadType.JSON: + entry.value = json.dumps(value) + elif _type == ipaaca.iu.IUPayloadType.STR or _type == 'MAP': + entry.value = str(value) + else: + raise ipaaca.exception.IpaacaException('Asked to send payload entry with unsupported type "' + _type + '".') + entry.type = _type def unpack_payload_entry(entry): - # We assume that the only transfer types are 'str' or 'json'. Both are transparently handled by json.loads - if entry.type == 'json': + # We assume that the only transfer types are 'STR' or 'JSON'. Both are transparently handled by json.loads + if entry.type == ipaaca.iu.IUPayloadType.JSON: return json.loads(entry.value) - elif entry.type == 'str': + elif entry.type == ipaaca.iu.IUPayloadType.STR or entry.type == 'str': return entry.value else: - logger.warn('Received payload entry with unsupported type "' + entry.type + '".') + LOGGER.warn('Received payload entry with unsupported type "' + entry.type + '".') return entry.value @@ -113,7 +122,7 @@ class IUConverter(rsb.converter.Converter): pbo.read_only = iu._read_only for k, v in iu._payload.iteritems(): entry = pbo.payload.add() - pack_payload_entry(entry, k, v) + pack_payload_entry(entry, k, v, iu.payload_type) for type_ in iu._links.keys(): linkset = pbo.links.add() linkset.type = type_ @@ -136,7 +145,7 @@ class IUConverter(rsb.converter.Converter): read_only = pbo.read_only, owner_name = pbo.owner_name, category = pbo.category, - payload_type = pbo.payload_type, + payload_type = 'str' if pbo.payload_type is 'MAP' else pbo.payload_type, committed = pbo.committed, payload=_payload, links=_links) @@ -200,7 +209,7 @@ class IULinkUpdateConverter(rsb.converter.Converter): if type == IULinkUpdate: pbo = ipaaca_pb2.IULinkUpdate() pbo.ParseFromString( str(byte_stream) ) - logger.debug('received an IULinkUpdate for revision '+str(pbo.revision)) + LOGGER.debug('received an IULinkUpdate for revision '+str(pbo.revision)) iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta) for entry in pbo.new_links: iu_link_up.new_links[str(entry.type)] = set(entry.targets) @@ -213,10 +222,11 @@ class IULinkUpdateConverter(rsb.converter.Converter): class IUPayloadUpdate(object): - def __init__(self, uid, revision, is_delta, writer_name="undef", new_items=None, keys_to_remove=None): + def __init__(self, uid, revision, is_delta, payload_type, writer_name="undef", new_items=None, keys_to_remove=None): super(IUPayloadUpdate, self).__init__() self.uid = uid self.revision = revision + self.payload_type = payload_type self.writer_name = writer_name self.is_delta = is_delta self.new_items = {} if new_items is None else new_items @@ -226,6 +236,7 @@ class IUPayloadUpdate(object): s = 'PayloadUpdate(' + 'uid=' + self.uid + ', ' s += 'revision='+str(self.revision)+', ' s += 'writer_name='+str(self.writer_name)+', ' + s += 'payload_type='+str(self.payload_type)+', ' s += 'is_delta='+str(self.is_delta)+', ' s += 'new_items = '+str(self.new_items)+', ' s += 'keys_to_remove = '+str(self.keys_to_remove)+')' @@ -241,9 +252,9 @@ class IUPayloadUpdateConverter(rsb.converter.Converter): pbo.uid = iu_payload_update.uid pbo.writer_name = iu_payload_update.writer_name pbo.revision = iu_payload_update.revision - for k,v in iu_payload_update.new_items.items(): + for k, v in iu_payload_update.new_items.items(): entry = pbo.new_items.add() - pack_payload_entry(entry, k, v) + pack_payload_entry(entry, k, v, iu_payload_update.payload_type) pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove) pbo.is_delta = iu_payload_update.is_delta return bytearray(pbo.SerializeToString()), self.wireSchema @@ -253,8 +264,8 @@ class IUPayloadUpdateConverter(rsb.converter.Converter): if type == IUPayloadUpdate: pbo = ipaaca_pb2.IUPayloadUpdate() pbo.ParseFromString( str(byte_stream) ) - logger.debug('received an IUPayloadUpdate for revision '+str(pbo.revision)) - iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta) + LOGGER.debug('received an IUPayloadUpdate for revision '+str(pbo.revision)) + iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, payload_type=None, writer_name=pbo.writer_name, is_delta=pbo.is_delta) for entry in pbo.new_items: iu_up.new_items[entry.key] = unpack_payload_entry(entry) iu_up.keys_to_remove = pbo.keys_to_remove[:] diff --git a/ipaacalib/python/src/ipaaca/defaults.py b/ipaacalib/python/src/ipaaca/defaults.py index 46908a1b92640fb58ef36b486953cab0437d28ca..706c714b1d270838f04a84f3cd20dffe432d747c 100644 --- a/ipaacalib/python/src/ipaaca/defaults.py +++ b/ipaacalib/python/src/ipaaca/defaults.py @@ -31,4 +31,8 @@ # Excellence Initiative. IPAACA_DEFAULT_CHANNEL = 'default' -IPAACA_DEFAULT_LOGGING_LEVEL = 'WARNING' \ No newline at end of file + +IPAACA_LOGGER_NAME = 'ipaaca' +IPAACA_DEFAULT_LOGGING_LEVEL = 'WARNING' + +IPAACA_DEFAULT_IU_PAYLOAD_TYPE = 'JSON' # one of ipaaca.iu.IUPayloadType diff --git a/ipaacalib/python/src/ipaaca/iu.py b/ipaacalib/python/src/ipaaca/iu.py index ec51152c5782cb30b5e89a336e8a11b65aea7dbf..71f20509d1d5d371bbf9e4fb8d191867e4403c9b 100644 --- a/ipaacalib/python/src/ipaaca/iu.py +++ b/ipaacalib/python/src/ipaaca/iu.py @@ -37,8 +37,6 @@ import copy import threading import uuid -from ipaaca.misc import logger - import ipaaca.converter import ipaaca.exception import ipaaca.misc @@ -48,10 +46,12 @@ import ipaaca.payload __all__ = [ 'IUAccessMode', 'IUEventType', + 'IUPayloadType', 'IU', 'Message', ] +LOGGER = ipaaca.misc.get_library_logger() IUAccessMode = ipaaca.misc.enum( PUSH = 'PUSH', @@ -71,11 +71,17 @@ IUEventType = ipaaca.misc.enum( ) +IUPayloadType = ipaaca.misc.enum( + JSON = 'JSON', + STR = 'STR' +) + + class IUInterface(object): """Base class of all specialised IU classes.""" - def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False): + def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False, payload_type=None): """Creates an IU. Keyword arguments: @@ -86,12 +92,12 @@ class IUInterface(object): self._uid = uid self._revision = None self._category = None - self._payload_type = None self._owner_name = None self._committed = False self._retracted = False self._access_mode = access_mode self._read_only = read_only + self._payload_type = payload_type if payload_type is not None else ipaaca.defaults.IPAACA_DEFAULT_IU_PAYLOAD_TYPE self._buffer = None # payload is not present here self._links = collections.defaultdict(set) @@ -162,7 +168,15 @@ class IUInterface(object): def _get_payload_type(self): return self._payload_type - payload_type = property(fget=_get_payload_type, doc='Type of the IU payload') + def _set_payload_type(self, type): + if self._buffer is None: + self._payload_type = type + else: + raise IpaacaException('The IU is already in a buffer, cannot change payload type anymore.') + payload_type = property( + fget=_get_payload_type, + fset=_set_payload_type, + doc='Type of the IU payload') def _get_committed(self): return self._committed @@ -194,7 +208,7 @@ class IUInterface(object): return self._buffer def _set_buffer(self, buffer): if self._buffer is not None: - raise Exception('The IU is already in a buffer, cannot move it.') + raise IpaacaException('The IU is already in a buffer, cannot move it.') self._buffer = buffer buffer = property( fget=_get_buffer, @@ -217,12 +231,11 @@ class IU(IUInterface): """A local IU.""" - def __init__(self, category='undef', access_mode=IUAccessMode.PUSH, read_only=False, _payload_type='MAP'): - super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only) + def __init__(self, category='undef', access_mode=IUAccessMode.PUSH, read_only=False, payload_type=None): + super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only, payload_type=payload_type) self._revision = 1 self.uid = str(uuid.uuid4()) self._category = str(category) - self._payload_type = _payload_type self.revision_lock = threading.RLock() self._payload = ipaaca.payload.Payload(iu=self) @@ -323,23 +336,23 @@ class IU(IUInterface): class Message(IU): """Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls.""" - def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, _payload_type='MAP'): - super(Message, self).__init__(category=str(category), access_mode=access_mode, read_only=read_only, _payload_type=_payload_type) + def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, payload_type=None): + super(Message, self).__init__(category=str(category), access_mode=access_mode, read_only=read_only, payload_type=payload_type) def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): if self.is_published: - logger.info('Info: modifying a Message after sending has no global effects') + LOGGER.info('Info: modifying a Message after sending has no global effects') def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): if self.is_published: - logger.info('Info: modifying a Message after sending has no global effects') + LOGGER.info('Info: modifying a Message after sending has no global effects') def _increase_revision_number(self): self._revision += 1 def _internal_commit(self, writer_name=None): if self.is_published: - logger.info('Info: committing to a Message after sending has no global effects') + LOGGER.info('Info: committing to a Message after sending has no global effects') def commit(self): return self._internal_commit() @@ -348,7 +361,7 @@ class Message(IU): return self._payload def _set_payload(self, new_pl, writer_name=None): if self.is_published: - logger.info('Info: modifying a Message after sending has no global effects') + LOGGER.info('Info: modifying a Message after sending has no global effects') else: if self.committed: raise ipaaca.exception.IUCommittedError(self) @@ -395,11 +408,10 @@ class RemoteMessage(IUInterface): """A remote IU with access mode 'MESSAGE'.""" def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links): - super(RemoteMessage, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only) + super(RemoteMessage, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only, payload_type=payload_type) self._revision = revision self._category = category self.owner_name = owner_name - self._payload_type = payload_type self._committed = committed self._retracted = False # NOTE Since the payload is an already-existant Payload which we didn't modify ourselves, @@ -409,18 +421,18 @@ class RemoteMessage(IUInterface): self._links = links def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): - logger.info('Info: modifying a RemoteMessage only has local effects') + LOGGER.info('Info: modifying a RemoteMessage only has local effects') def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): - logger.info('Info: modifying a RemoteMessage only has local effects') + LOGGER.info('Info: modifying a RemoteMessage only has local effects') def commit(self): - logger.info('Info: committing to a RemoteMessage only has local effects') + LOGGER.info('Info: committing to a RemoteMessage only has local effects') def _get_payload(self): return self._payload def _set_payload(self, new_pl): - logger.info('Info: modifying a RemoteMessage only has local effects') + LOGGER.info('Info: modifying a RemoteMessage only has local effects') self._payload = ipaaca.payload.Payload(iu=self, new_payload=new_pl, omit_init_update_message=True) payload = property( fget=_get_payload, @@ -429,7 +441,7 @@ class RemoteMessage(IUInterface): def _apply_link_update(self, update): """Apply a IULinkUpdate to the IU.""" - logger.warning('Warning: should never be called: RemoteMessage._apply_link_update') + LOGGER.warning('Warning: should never be called: RemoteMessage._apply_link_update') self._revision = update.revision if update.is_delta: self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove) @@ -438,7 +450,7 @@ class RemoteMessage(IUInterface): def _apply_update(self, update): """Apply a IUPayloadUpdate to the IU.""" - logger.warning('Warning: should never be called: RemoteMessage._apply_update') + LOGGER.warning('Warning: should never be called: RemoteMessage._apply_update') self._revision = update.revision if update.is_delta: for k in update.keys_to_remove: self.payload._remotely_enforced_delitem(k) @@ -449,12 +461,12 @@ class RemoteMessage(IUInterface): def _apply_commission(self): """Apply commission to the IU""" - logger.warning('Warning: should never be called: RemoteMessage._apply_commission') + LOGGER.warning('Warning: should never be called: RemoteMessage._apply_commission') self._committed = True def _apply_retraction(self): """Apply retraction to the IU""" - logger.warning('Warning: should never be called: RemoteMessage._apply_retraction') + LOGGER.warning('Warning: should never be called: RemoteMessage._apply_retraction') self._retracted = True @@ -463,11 +475,10 @@ class RemotePushIU(IUInterface): """A remote IU with access mode 'PUSH'.""" def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links): - super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only) + super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only, payload_type=payload_type) self._revision = revision self._category = category self.owner_name = owner_name - self._payload_type = payload_type self._committed = committed self._retracted = False # NOTE Since the payload is an already-existant Payload which we didn't modify ourselves, @@ -505,6 +516,7 @@ class RemotePushIU(IUInterface): requested_update = ipaaca.converter.IUPayloadUpdate( uid=self.uid, revision=self.revision, + payload_type=self.payload_type, is_delta=is_delta, writer_name=self.buffer.unique_name, new_items=new_items, @@ -546,6 +558,7 @@ class RemotePushIU(IUInterface): requested_update = ipaaca.converter.IUPayloadUpdate( uid=self.uid, revision=self.revision, + payload_type=self.payload_type, is_delta=False, writer_name=self.buffer.unique_name, new_items=new_pl, diff --git a/ipaacalib/python/src/ipaaca/misc.py b/ipaacalib/python/src/ipaaca/misc.py index 4894c19016d097b048639d439e67dc94447fc0fa..44932d5f47442df1b4ce44c5399a31433a374431 100644 --- a/ipaacalib/python/src/ipaaca/misc.py +++ b/ipaacalib/python/src/ipaaca/misc.py @@ -41,7 +41,6 @@ import ipaaca.defaults __all__ = [ 'enum', - 'logger', 'IpaacaArgumentParser', ] @@ -54,26 +53,51 @@ def enum(*sequential, **named): whats-the-best-way-to-implement-an-enum-in-python/1695250#1695250 """ enums = dict(zip(sequential, range(len(sequential))), **named) - return type('Enum', (), enums) + enums['_choices'] = enums.keys() + return type('Enum', (object,), enums) -# Create a global logger for ipaaca class IpaacaLoggingHandler(logging.Handler): + '''A logging handler that prints to stdout.''' - def __init__(self, level=logging.NOTSET): + def __init__(self, prefix='IPAACA', level=logging.NOTSET): logging.Handler.__init__(self, level) + self._prefix = prefix def emit(self, record): - meta = '[ipaaca] (%s) ' % str(record.levelname) + meta = '[%s: %s] ' % (self._prefix, str(record.levelname)) msg = str(record.msg.format(record.args)) print(meta + msg) -logger = logging.getLogger('ipaaca') -logger.addHandler(IpaacaLoggingHandler()) -logger.setLevel(level=ipaaca.defaults.IPAACA_DEFAULT_LOGGING_LEVEL) +class GenericNoLoggingHandler(logging.Handler): + '''A logging handler that produces no output''' + def emit(self, record): pass +def get_library_logger(): + '''Get ipaaca's library-wide logger object.''' + return logging.getLogger(ipaaca.defaults.IPAACA_LOGGER_NAME) + + +__IPAACA_LOGGING_HANDLER = IpaacaLoggingHandler('IPAACA') +__GENERIC_NO_LOG_HANDLER = GenericNoLoggingHandler() + +# By default, suppress library logging +# - for IPAACA +get_library_logger().addHandler(__GENERIC_NO_LOG_HANDLER) +# - for RSB +logging.getLogger('rsb').addHandler(__GENERIC_NO_LOG_HANDLER) + + +def enable_logging(level=None): + '''Enable ipaaca's 'library-wide logging.''' + ipaaca_logger = get_library_logger() + ipaaca_logger.addHandler(__IPAACA_LOGGING_HANDLER) + ipaaca_logger.removeHandler(__GENERIC_NO_LOG_HANDLER) + ipaaca_logger.setLevel(level=level if level is not None else + ipaaca.defaults.IPAACA_DEFAULT_LOGGING_LEVEL) + class IpaacaArgumentParser(argparse.ArgumentParser): @@ -82,33 +106,71 @@ class IpaacaArgumentParser(argparse.ArgumentParser): def __call__(self, parser, namespace, values, option_string=None): ipaaca.defaults.IPAACA_DEFAULT_CHANNEL = values + class IpaacaPayloadTypeAction(argparse.Action): + + def __call__(self, parser, namespace, values, option_string=None): + ipaaca.defaults.IPAACA_DEFAULT_IU_PAYLOAD_TYPE = values + class IpaacaLoggingLevelAction(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): - logger.setLevel(level=values) - - def __init__(self, prog=None, usage=None, description=None, epilog=None, parents=[], formatter_class=argparse.HelpFormatter, prefix_chars='-', fromfile_prefix_chars=None, argument_default=None, conflict_handler='error', add_help=True): - super(IpaacaArgumentParser, self).__init__(prog=prog, usage=usage, description=description, epilog=epilog, parents=parents, formatter_class=formatter_class, prefix_chars=prefix_chars, fromfile_prefix_chars=fromfile_prefix_chars, argument_default=argument_default, conflict_handler=conflict_handler, add_help=add_help) + enable_logging(values) + + class IpaacaRSBLoggingLevelAction(argparse.Action): + + def __call__(self, parser, namespace, values, option_string=None): + rsb_logger = logging.getLogger('rsb') + rsb_logger.addHandler(IpaacaLoggingHandler('RSB')) + rsb_logger.removeHandler(__GENERIC_NO_LOG_HANDLER) + rsb_logger.setLevel(level=values) + + def __init__(self, prog=None, usage=None, description=None, epilog=None, + parents=[], formatter_class=argparse.HelpFormatter, + prefix_chars='-', fromfile_prefix_chars=None, + argument_default=None, conflict_handler='error', add_help=True): + super(IpaacaArgumentParser, self).__init__(prog=prog, usage=usage, + description=description, epilog=epilog, parents=parents, + formatter_class=formatter_class, prefix_chars=prefix_chars, + fromfile_prefix_chars=fromfile_prefix_chars, + argument_default=argument_default, + conflict_handler=conflict_handler, add_help=add_help) def _add_ipaaca_lib_arguments(self): - ipaacalib_group = self.add_argument_group(title='IPAACA library arguments''') + ipaacalib_group = self.add_argument_group('IPAACA library arguments') ipaacalib_group.add_argument( - '--ipaaca-default-channel', action=self.IpaacaDefaultChannelAction, - default='default', metavar='NAME', dest='_ipaaca_default_channel_', - help="IPAACA channel name which is used if a buffer does not define one locally (default: 'default')") + '--ipaaca-payload-type', + action=self.IpaacaPayloadTypeAction, + choices=['JSON', 'STR'], # one of ipaaca.iu.IUPayloadTypes + dest='_ipaaca_payload_type_', + default='JSON', + help="specify payload type (default: 'JSON')") ipaacalib_group.add_argument( - '--ipaaca-logging-level', action=self.IpaacaLoggingLevelAction, + '--ipaaca-default-channel', + action=self.IpaacaDefaultChannelAction, + default='default', + metavar='NAME', + dest='_ipaaca_default_channel_', + help="specify default IPAACA channel name (default: 'default')") + ipaacalib_group.add_argument( + '--ipaaca-enable-logging', + action=self.IpaacaLoggingLevelAction, choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], dest='_ipaaca_logging_level_', - help="IPAACA logging threshold (default: 'WARNING')") + help="enable IPAACA logging with threshold") + rsblib_group = self.add_argument_group('RSB library arguments') + rsblib_group.add_argument( + '--rsb-enable-logging', + action=self.IpaacaRSBLoggingLevelAction, + choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], + dest='_ipaaca_rsb_enable_logging_', + help="enable RSB logging with threshold") def parse_args(self, args=None, namespace=None): - # Add ipaaca-library specific arguments at the very end - self._add_ipaaca_lib_arguments() + self._add_ipaaca_lib_arguments() # Add ipaaca-args just before parsing result = super(IpaacaArgumentParser, self).parse_args(args, namespace) + # Delete ipaaca specific arguments (beginning with '_ipaaca' and + # ending with an underscore) from the resulting Namespace object. for item in dir(result): if item.startswith('_ipaaca') and item.endswith('_'): delattr(result, item) return result - - diff --git a/ipaacatools/scripts/ipaaca-iu-injector b/ipaacatools/scripts/ipaaca-iu-injector index ea47a8bfe762d3fe22fd37605f9a6616ba417f87..1159400744ce399e5df3b79d5f2678a7281d0686 100755 --- a/ipaacatools/scripts/ipaaca-iu-injector +++ b/ipaacatools/scripts/ipaaca-iu-injector @@ -60,6 +60,11 @@ parser.add_argument( metavar='SECONDS', type=float, help='set time in seconds to wait for potential IU updates (default: 3.0)') +parser.add_argument( + '-j', '--json-payload', + dest='json_payload', + action='store_true', + help='allow structured data to be sent (treats payload VALUE arguments as Python expressions)') parser.add_argument( '-c', '--category', dest='category', @@ -81,22 +86,27 @@ if __name__ == '__main__': ob = ipaaca.OutputBuffer('IpaacaIUInjector') ob.register_handler(iu_update_handler) iu = ipaaca.Message(arguments.category) if arguments.iu_type == 'Message' else ipaaca.IU(arguments.category) - iu.payload = {k: v for (k, v) in itertools.izip_longest(arguments.payload[::2], arguments.payload[1::2])} + if arguments.json_payload: + # Treat payload values as Python expressions + iu.payload = {k: eval(v) for (k, v) in itertools.izip_longest(arguments.payload[::2], arguments.payload[1::2])} + else: + iu.payload = {k: v for (k, v) in itertools.izip_longest(arguments.payload[::2], arguments.payload[1::2])} ob.add(iu) + print( 'Sent {iu_type} with category "{category}" and payload {{'.format(**vars(arguments)), end='\n' if len(iu.payload) > 0 else '') for k, v in iu.payload.items(): - print(' "{key}": "{value}",'.format(key=k, value=v)) + print(" '{key}': {value},".format(key=k, value=v)) print('}.') + # Wait for updates to the IU try: if arguments.iu_type == 'IU': print('Waiting %s s for the IU to be updated.' % arguments.keep_alive) time.sleep(arguments.keep_alive) else: time.sleep(0.1) - except KeyboardInterrupt: pass diff --git a/ipaacatools/scripts/ipaaca-iu-sniffer b/ipaacatools/scripts/ipaaca-iu-sniffer index 1fbfc9d4bdc4a8799581080b2d39597ab350c924..3a98cee88183694874b56776dd8312458b5ce43b 100755 --- a/ipaacatools/scripts/ipaaca-iu-sniffer +++ b/ipaacatools/scripts/ipaaca-iu-sniffer @@ -34,12 +34,11 @@ from __future__ import division, print_function import logging +import os +import platform import re import sys import time -import argparse -import os -import platform import ipaaca @@ -65,8 +64,8 @@ def pretty_printed_dict(d): v = str(v) v2 = v if len(v) <= arguments.max_size else v[:arguments.max_size] + '<excess length omitted>' v2.replace('\\','\\\\').replace('\n', highlight_if_color('\\n')) - s += "\t '%s': '%s'," % (highlight_if_color(unicode(k),'1'), unicode(v2)) - s+='\n}' + s += "\t '%s': '%s',\n" % (highlight_if_color(unicode(k),'1'), unicode(v2)) + s+='}' return s def pretty_printed_iu_event(iu, event_type, local): @@ -141,9 +140,6 @@ if __name__ == '__main__': resend_active=True) buffers[channel].register_handler(my_update_handler) - print('') - print('Ipaaca IU Sniffer - run with --help to see options') - channellist = 's ' if len(arguments.channels) > 1 else ' ' channellist += ', '.join(arguments.channels)