Skip to content
Snippets Groups Projects
Commit 59a7cb65 authored by Hendrik Buschmeier's avatar Hendrik Buschmeier
Browse files

ipaaca-python: Restructured logging infrastructure.

parent 0568ed21
Branches
No related tags found
No related merge requests found
......@@ -35,17 +35,17 @@ from __future__ import division, print_function
import rsb
import rsb.converter
from ipaaca.misc import logger, IpaacaArgumentParser
import ipaaca_pb2
import ipaaca.converter
from ipaaca.buffer import InputBuffer, OutputBuffer
from ipaaca.exception import *
from ipaaca.iu import IU, Message, IUAccessMode, IUEventType
from ipaaca.misc import enable_logging, IpaacaArgumentParser
from ipaaca.payload import Payload
def initialize_ipaaca_rsb():
''''Register own RSB Converters and initialise RSB from default config file.'''
rsb.converter.registerGlobalConverter(
ipaaca.converter.IntConverter(
wireSchema="int32",
......@@ -85,8 +85,5 @@ def initialize_ipaaca_rsb():
rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
## --- Module initialisation -------------------------------------------------
# register our own RSB Converters
# Initialise module
initialize_ipaaca_rsb()
......@@ -44,7 +44,6 @@ import ipaaca.exception
import ipaaca.converter
import ipaaca.iu
from ipaaca.misc import logger
__all__ = [
......@@ -52,6 +51,7 @@ __all__ = [
'OutputBuffer',
]
LOGGER = ipaaca.misc.get_library_logger()
class IUStore(dict):
"""A dictionary storing IUs."""
......@@ -222,7 +222,7 @@ class InputBuffer(Buffer):
cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category)
logger.info("Added listener in scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+iu_category)
LOGGER.info("Added listener in scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+iu_category)
def _handle_iu_events(self, event):
'''Dispatch incoming IU events.
......@@ -259,7 +259,7 @@ class InputBuffer(Buffer):
# send resend request to remote server
self._request_remote_resend(event.data)
else:
logger.warning("Received an update for an IU which we did not receive before.")
LOGGER.warning("Received an update for an IU which we did not receive before.")
return
# an update to an existing IU
if type_ is ipaaca_pb2.IURetraction:
......@@ -294,7 +294,7 @@ class InputBuffer(Buffer):
iu._apply_link_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.LINKSUPDATED, category=iu.category)
else:
logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
LOGGER.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
def add_category_interests(self, category_interests):
if hasattr(category_interests, '__iter__'):
......@@ -363,13 +363,13 @@ class OutputBuffer(Buffer):
def _remote_update_links(self, update):
'''Apply a remotely requested update to one of the stored IU's links.'''
if update.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
LOGGER.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
LOGGER.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
return 0
if update.is_delta:
iu.modify_links(add=update.new_links, remove=update.links_to_remove, writer_name=update.writer_name)
......@@ -381,16 +381,16 @@ class OutputBuffer(Buffer):
def _remote_update_payload(self, update):
'''Apply a remotely requested update to one of the stored IU's payload.'''
if update.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
LOGGER.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning(u"Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
logger.warning(u" Writer was: "+update.writer_name)
logger.warning(u" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
logger.warning(u" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
LOGGER.warning(u"Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
LOGGER.warning(u" Writer was: "+update.writer_name)
LOGGER.warning(u" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
LOGGER.warning(u" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
return 0
if update.is_delta:
#print('Writing delta update by '+str(update.writer_name))
......@@ -409,7 +409,7 @@ class OutputBuffer(Buffer):
def _remote_request_resend(self, iu_resend_request_pack):
''' Resend a requested IU over the specific hidden category.'''
if iu_resend_request_pack.uid not in self._iu_store:
logger.warning("Remote side requested resending of non-existent IU "+str(iu_resend_request_pack.uid))
LOGGER.warning("Remote side requested resending of non-existent IU "+str(iu_resend_request_pack.uid))
return 0
iu = self._iu_store[iu_resend_request_pack.uid]
with iu.revision_lock:
......@@ -423,13 +423,13 @@ class OutputBuffer(Buffer):
def _remote_commit(self, iu_commission):
'''Apply a remotely requested commit to one of the stored IUs.'''
if iu_commission.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
LOGGER.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
return 0
iu = self._iu_store[iu_commission.uid]
with iu.revision_lock:
if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
LOGGER.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
return 0
if iu.committed:
return 0
......@@ -441,14 +441,14 @@ class OutputBuffer(Buffer):
def _get_informer(self, iu_category):
'''Return (or create, store and return) an informer object for IUs of the specified category.'''
if iu_category in self._informer_store:
logger.info("Returning informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
LOGGER.info("Returning informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
return self._informer_store[iu_category]
informer_iu = rsb.createInformer(
rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)),
config=self._participant_config,
dataType=object)
self._informer_store[iu_category] = informer_iu #new_tuple
logger.info("Returning NEW informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
LOGGER.info("Returning NEW informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
return informer_iu #return new_tuple
def add(self, iu):
......
......@@ -38,13 +38,15 @@ import rsb.converter
import ipaaca_pb2
import ipaaca.iu
from ipaaca.misc import logger
import ipaaca.misc
LOGGER = ipaaca.misc.get_library_logger()
try:
import simplejson as json
except ImportError:
import json
logger.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
LOGGER.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
__all__ = [
......@@ -87,7 +89,7 @@ def unpack_payload_entry(entry):
elif entry.type == 'str':
return entry.value
else:
logger.warn('Received payload entry with unsupported type "' + entry.type + '".')
LOGGER.warn('Received payload entry with unsupported type "' + entry.type + '".')
return entry.value
......@@ -200,7 +202,7 @@ class IULinkUpdateConverter(rsb.converter.Converter):
if type == IULinkUpdate:
pbo = ipaaca_pb2.IULinkUpdate()
pbo.ParseFromString( str(byte_stream) )
logger.debug('received an IULinkUpdate for revision '+str(pbo.revision))
LOGGER.debug('received an IULinkUpdate for revision '+str(pbo.revision))
iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_links:
iu_link_up.new_links[str(entry.type)] = set(entry.targets)
......@@ -253,7 +255,7 @@ class IUPayloadUpdateConverter(rsb.converter.Converter):
if type == IUPayloadUpdate:
pbo = ipaaca_pb2.IUPayloadUpdate()
pbo.ParseFromString( str(byte_stream) )
logger.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
LOGGER.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_items:
iu_up.new_items[entry.key] = unpack_payload_entry(entry)
......
......@@ -31,4 +31,5 @@
# Excellence Initiative.
IPAACA_DEFAULT_CHANNEL = 'default'
IPAACA_DEFAULT_LOGGING_LEVEL = 'WARNING'
\ No newline at end of file
IPAACA_LOGGER_NAME = 'ipaaca'
IPAACA_DEFAULT_LOGGING_LEVEL = 'WARNING'
......@@ -37,8 +37,6 @@ import copy
import threading
import uuid
from ipaaca.misc import logger
import ipaaca.converter
import ipaaca.exception
import ipaaca.misc
......@@ -52,6 +50,7 @@ __all__ = [
'Message',
]
LOGGER = ipaaca.misc.get_library_logger()
IUAccessMode = ipaaca.misc.enum(
PUSH = 'PUSH',
......@@ -328,18 +327,18 @@ class Message(IU):
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
if self.is_published:
logger.info('Info: modifying a Message after sending has no global effects')
LOGGER.info('Info: modifying a Message after sending has no global effects')
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
if self.is_published:
logger.info('Info: modifying a Message after sending has no global effects')
LOGGER.info('Info: modifying a Message after sending has no global effects')
def _increase_revision_number(self):
self._revision += 1
def _internal_commit(self, writer_name=None):
if self.is_published:
logger.info('Info: committing to a Message after sending has no global effects')
LOGGER.info('Info: committing to a Message after sending has no global effects')
def commit(self):
return self._internal_commit()
......@@ -348,7 +347,7 @@ class Message(IU):
return self._payload
def _set_payload(self, new_pl, writer_name=None):
if self.is_published:
logger.info('Info: modifying a Message after sending has no global effects')
LOGGER.info('Info: modifying a Message after sending has no global effects')
else:
if self.committed:
raise ipaaca.exception.IUCommittedError(self)
......@@ -409,18 +408,18 @@ class RemoteMessage(IUInterface):
self._links = links
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
logger.info('Info: modifying a RemoteMessage only has local effects')
LOGGER.info('Info: modifying a RemoteMessage only has local effects')
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
logger.info('Info: modifying a RemoteMessage only has local effects')
LOGGER.info('Info: modifying a RemoteMessage only has local effects')
def commit(self):
logger.info('Info: committing to a RemoteMessage only has local effects')
LOGGER.info('Info: committing to a RemoteMessage only has local effects')
def _get_payload(self):
return self._payload
def _set_payload(self, new_pl):
logger.info('Info: modifying a RemoteMessage only has local effects')
LOGGER.info('Info: modifying a RemoteMessage only has local effects')
self._payload = ipaaca.payload.Payload(iu=self, new_payload=new_pl, omit_init_update_message=True)
payload = property(
fget=_get_payload,
......@@ -429,7 +428,7 @@ class RemoteMessage(IUInterface):
def _apply_link_update(self, update):
"""Apply a IULinkUpdate to the IU."""
logger.warning('Warning: should never be called: RemoteMessage._apply_link_update')
LOGGER.warning('Warning: should never be called: RemoteMessage._apply_link_update')
self._revision = update.revision
if update.is_delta:
self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove)
......@@ -438,7 +437,7 @@ class RemoteMessage(IUInterface):
def _apply_update(self, update):
"""Apply a IUPayloadUpdate to the IU."""
logger.warning('Warning: should never be called: RemoteMessage._apply_update')
LOGGER.warning('Warning: should never be called: RemoteMessage._apply_update')
self._revision = update.revision
if update.is_delta:
for k in update.keys_to_remove: self.payload._remotely_enforced_delitem(k)
......@@ -449,12 +448,12 @@ class RemoteMessage(IUInterface):
def _apply_commission(self):
"""Apply commission to the IU"""
logger.warning('Warning: should never be called: RemoteMessage._apply_commission')
LOGGER.warning('Warning: should never be called: RemoteMessage._apply_commission')
self._committed = True
def _apply_retraction(self):
"""Apply retraction to the IU"""
logger.warning('Warning: should never be called: RemoteMessage._apply_retraction')
LOGGER.warning('Warning: should never be called: RemoteMessage._apply_retraction')
self._retracted = True
......
......@@ -41,7 +41,6 @@ import ipaaca.defaults
__all__ = [
'enum',
'logger',
'IpaacaArgumentParser',
]
......@@ -57,22 +56,47 @@ def enum(*sequential, **named):
return type('Enum', (), enums)
# Create a global logger for ipaaca
class IpaacaLoggingHandler(logging.Handler):
'''A logging handler that prints to stdout.'''
def __init__(self, level=logging.NOTSET):
def __init__(self, prefix='IPAACA', level=logging.NOTSET):
logging.Handler.__init__(self, level)
self._prefix = prefix
def emit(self, record):
meta = '[ipaaca] (%s) ' % str(record.levelname)
meta = '[%s: %s] ' % (self._prefix, str(record.levelname))
msg = str(record.msg.format(record.args))
print(meta + msg)
logger = logging.getLogger('ipaaca')
logger.addHandler(IpaacaLoggingHandler())
logger.setLevel(level=ipaaca.defaults.IPAACA_DEFAULT_LOGGING_LEVEL)
class GenericNoLoggingHandler(logging.Handler):
'''A logging handler that produces no output'''
def emit(self, record): pass
def get_library_logger():
'''Get ipaaca's library-wide logger object.'''
return logging.getLogger(ipaaca.defaults.IPAACA_LOGGER_NAME)
__IPAACA_LOGGING_HANDLER = IpaacaLoggingHandler('IPAACA')
__GENERIC_NO_LOG_HANDLER = GenericNoLoggingHandler()
# By default, suppress library logging
# - for IPAACA
get_library_logger().addHandler(__GENERIC_NO_LOG_HANDLER)
# - for RSB
logging.getLogger('rsb').addHandler(__GENERIC_NO_LOG_HANDLER)
def enable_logging(level=None):
'''Enable ipaaca's 'library-wide logging.'''
ipaaca_logger = get_library_logger()
ipaaca_logger.addHandler(__IPAACA_LOGGING_HANDLER)
ipaaca_logger.removeHandler(__GENERIC_NO_LOG_HANDLER)
ipaaca_logger.setLevel(level=level if level is not None else
ipaaca.defaults.IPAACA_DEFAULT_LOGGING_LEVEL)
class IpaacaArgumentParser(argparse.ArgumentParser):
......@@ -85,30 +109,56 @@ class IpaacaArgumentParser(argparse.ArgumentParser):
class IpaacaLoggingLevelAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
logger.setLevel(level=values)
def __init__(self, prog=None, usage=None, description=None, epilog=None, parents=[], formatter_class=argparse.HelpFormatter, prefix_chars='-', fromfile_prefix_chars=None, argument_default=None, conflict_handler='error', add_help=True):
super(IpaacaArgumentParser, self).__init__(prog=prog, usage=usage, description=description, epilog=epilog, parents=parents, formatter_class=formatter_class, prefix_chars=prefix_chars, fromfile_prefix_chars=fromfile_prefix_chars, argument_default=argument_default, conflict_handler=conflict_handler, add_help=add_help)
enable_logging(values)
class IpaacaRSBLoggingLevelAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
rsb_logger = logging.getLogger('rsb')
rsb_logger.addHandler(IpaacaLoggingHandler('RSB'))
rsb_logger.removeHandler(__GENERIC_NO_LOG_HANDLER)
rsb_logger.setLevel(level=values)
def __init__(self, prog=None, usage=None, description=None, epilog=None,
parents=[], formatter_class=argparse.HelpFormatter,
prefix_chars='-', fromfile_prefix_chars=None,
argument_default=None, conflict_handler='error', add_help=True):
super(IpaacaArgumentParser, self).__init__(prog=prog, usage=usage,
description=description, epilog=epilog, parents=parents,
formatter_class=formatter_class, prefix_chars=prefix_chars,
fromfile_prefix_chars=fromfile_prefix_chars,
argument_default=argument_default,
conflict_handler=conflict_handler, add_help=add_help)
def _add_ipaaca_lib_arguments(self):
ipaacalib_group = self.add_argument_group(title='IPAACA library arguments''')
ipaacalib_group = self.add_argument_group('IPAACA library arguments')
ipaacalib_group.add_argument(
'--ipaaca-default-channel', action=self.IpaacaDefaultChannelAction,
default='default', metavar='NAME', dest='_ipaaca_default_channel_',
help="IPAACA channel name which is used if a buffer does not define one locally (default: 'default')")
'--ipaaca-default-channel',
action=self.IpaacaDefaultChannelAction,
default='default',
metavar='NAME',
dest='_ipaaca_default_channel_',
help="specify default IPAACA channel name (default: 'default')")
ipaacalib_group.add_argument(
'--ipaaca-logging-level', action=self.IpaacaLoggingLevelAction,
'--ipaaca-enable-logging',
action=self.IpaacaLoggingLevelAction,
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'],
dest='_ipaaca_logging_level_',
help="IPAACA logging threshold (default: 'WARNING')")
help="enable IPAACA logging with threshold")
rsblib_group = self.add_argument_group('RSB library arguments')
rsblib_group.add_argument(
'--rsb-enable-logging',
action=self.IpaacaRSBLoggingLevelAction,
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'],
dest='_ipaaca_rsb_enable_logging_',
help="enable RSB logging with threshold")
def parse_args(self, args=None, namespace=None):
# Add ipaaca-library specific arguments at the very end
self._add_ipaaca_lib_arguments()
self._add_ipaaca_lib_arguments() # Add ipaaca-args just before parsing
result = super(IpaacaArgumentParser, self).parse_args(args, namespace)
# Delete ipaaca specific arguments (beginning with '_ipaaca' and
# ending with an underscore) from the resulting Namespace object.
for item in dir(result):
if item.startswith('_ipaaca') and item.endswith('_'):
delattr(result, item)
return result
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment