Skip to content
Snippets Groups Projects
Commit 6e73576b authored by Ramin Yaghoubzadeh Torky's avatar Ramin Yaghoubzadeh Torky
Browse files

Merging python changes since Jan 22

parents a67669be b2442f8b
No related branches found
No related tags found
No related merge requests found
Showing with 226 additions and 116 deletions
......@@ -231,7 +231,7 @@ public abstract class AbstractIU
List<PayloadItem> items = new ArrayList<PayloadItem>();
for (Entry<String, String> entry : newPayload.entrySet())
{
PayloadItem item = PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("str") // TODO:default type?
PayloadItem item = PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("STR")
.build();
items.add(item);
}
......
......@@ -79,7 +79,7 @@ public class IUConverter implements Converter<ByteBuffer>
List<PayloadItem> payloadItems = new ArrayList<PayloadItem>();
for (Entry<String, String> entry : iua.getPayload().entrySet())
{
payloadItems.add(PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("str").build());
payloadItems.add(PayloadItem.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).setType("STR").build());
}
List<LinkSet> links = new ArrayList<LinkSet>();
......@@ -95,7 +95,7 @@ public class IUConverter implements Converter<ByteBuffer>
}
IU iu = IU.newBuilder().setUid(iua.getUid()).setRevision(iua.getRevision()).setCategory(iua.getCategory())
.setOwnerName(iua.getOwnerName()).setCommitted(iua.isCommitted()).setAccessMode(accessMode)
.setReadOnly(iua.isReadOnly()).setPayloadType("MAP").addAllPayload(payloadItems).addAllLinks(links).build();
.setReadOnly(iua.isReadOnly()).setPayloadType("STR").addAllPayload(payloadItems).addAllLinks(links).build();
return new WireContents<ByteBuffer>(ByteBuffer.wrap(iu.toByteArray()), "ipaaca-iu");
}
......
......@@ -230,7 +230,7 @@ public class LocalIU extends AbstractIU
if (isPublished())
{
// send update to remote holders
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("str") // TODO: fix this, default in .proto?
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR")
.build();
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setUid(getUid()).setRevision(getRevision()).setIsDelta(true)
.setWriterName(writer == null ? getOwnerName() : writer).addNewItems(newItem).build();
......@@ -256,7 +256,7 @@ public class LocalIU extends AbstractIU
.setWriterName(writer == null ? getOwnerName() : writer);
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("str") // TODO: fix this, default in .proto?
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("STR")
.build();
builder.addNewItems(newItem);
......
......@@ -95,8 +95,7 @@ public class Payload implements Map<String, String>
}
public String pseudoConvertFromJSON(String value, String type) {
if (type.equals("json")) {
System.out.println("Received JSON IU");
if (type.equals("JSON")) {
if (value.startsWith("\"")) {
return value.replaceAll("\\\"", "");
} else if (value.startsWith("{") || value.startsWith("[") || value.matches("true") || value.matches("false") || value.matches("-?[0-9]*[.,]?[0-9][0-9]*.*")) {
......
......@@ -99,7 +99,7 @@ public class RemotePushIU extends AbstractIU
{
throw new IUReadOnlyException(this);
}
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("str").build();// TODO use default type in .proto
PayloadItem newItem = PayloadItem.newBuilder().setKey(key).setValue(value).setType("STR").build();
IUPayloadUpdate update = IUPayloadUpdate.newBuilder().setIsDelta(true).setUid(getUid()).setRevision(getRevision())
.setWriterName(getBuffer().getUniqueName()).addNewItems(newItem).build();
......@@ -144,7 +144,7 @@ public class RemotePushIU extends AbstractIU
.setWriterName(getBuffer().getUniqueName());
for (Map.Entry<? extends String, ? extends String> item : newItems.entrySet())
{
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("str") // TODO: fix this, default in .proto?
PayloadItem newItem = PayloadItem.newBuilder().setKey(item.getKey()).setValue(item.getValue()).setType("STR")
.build();
builder.addNewItems(newItem);
......@@ -301,9 +301,9 @@ public class RemotePushIU extends AbstractIU
payload.enforcedRemoveItem(key);
}
for (PayloadItem item : update.getNewItemsList()) {
if (item.getType().equals("str")) {
if (item.getType().equals("STR")) {
payload.enforcedSetItem(item.getKey(), item.getValue());
} else if (item.getType().equals("json")) {
} else if (item.getType().equals("JSON")) {
String value = item.getValue();
if (value.startsWith("\"")) {
payload.enforcedSetItem(item.getKey(), value.replaceAll("\\\"", ""));
......
......@@ -35,17 +35,17 @@ from __future__ import division, print_function
import rsb
import rsb.converter
from ipaaca.misc import logger, IpaacaArgumentParser
import ipaaca_pb2
import ipaaca.converter
from ipaaca.buffer import InputBuffer, OutputBuffer
from ipaaca.exception import *
from ipaaca.iu import IU, Message, IUAccessMode, IUEventType
from ipaaca.misc import enable_logging, IpaacaArgumentParser
from ipaaca.payload import Payload
def initialize_ipaaca_rsb():
''''Register own RSB Converters and initialise RSB from default config file.'''
rsb.converter.registerGlobalConverter(
ipaaca.converter.IntConverter(
wireSchema="int32",
......@@ -85,8 +85,5 @@ def initialize_ipaaca_rsb():
rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
## --- Module initialisation -------------------------------------------------
# register our own RSB Converters
# Initialise module
initialize_ipaaca_rsb()
......@@ -44,7 +44,6 @@ import ipaaca.exception
import ipaaca.converter
import ipaaca.iu
from ipaaca.misc import logger
__all__ = [
......@@ -52,6 +51,7 @@ __all__ = [
'OutputBuffer',
]
LOGGER = ipaaca.misc.get_library_logger()
class IUStore(dict):
"""A dictionary storing IUs."""
......@@ -155,6 +155,8 @@ class Buffer(object):
for_categories -- a list of category names or None if handler should
be called for all categories
"""
if handler_function in [h._handler_function for h in self._iu_event_handlers]:
LOGGER.warn("The handler function '" + handler_function.__name__ + '" has been registered before.')
handler = IUEventHandler(handler_function=handler_function, for_event_types=for_event_types, for_categories=for_categories)
self._iu_event_handlers.append(handler)
return handler
......@@ -196,9 +198,7 @@ class InputBuffer(Buffer):
# add own uuid as identifier for hidden category.
self._add_category_listener(str(self._uuid))
if category_interests is not None:
for cat in category_interests:
self._add_category_listener(cat)
self.add_category_interests(category_interests)
def _get_remote_server(self, iu):
'''Return (or create, store and return) a remote server.'''
......@@ -216,13 +216,20 @@ class InputBuffer(Buffer):
return remote_server
def _add_category_listener(self, iu_category):
'''Return (or create, store and return) a category listener on a specific channel.'''
'''Create and store a listener on a specific category.'''
if iu_category not in self._listener_store:
cat_listener = rsb.createListener(rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)), config=self._participant_config)
cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category)
logger.info("Added listener in scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+iu_category)
LOGGER.info("Added listener in scope /ipaaca/channel/" + str(self._channel) + "/category/" + iu_category)
def _remove_category_listener(self, iu_category):
'''Remove the listener for a specific category.'''
if iu_category in self._listener_store and iu_category in self._category_interests:
del self._listener_store[iu_category]
self._category_interests.remove(iu_category)
LOGGER.info("Removed listener in scope /ipaaca/channel/" + str(self._channel) + "/category/ " + iu_category)
def _handle_iu_events(self, event):
'''Dispatch incoming IU events.
......@@ -259,7 +266,7 @@ class InputBuffer(Buffer):
# send resend request to remote server
self._request_remote_resend(event.data)
else:
logger.warning("Received an update for an IU which we did not receive before.")
LOGGER.warning("Received an update for an IU which we did not receive before.")
return
# an update to an existing IU
if type_ is ipaaca_pb2.IURetraction:
......@@ -294,7 +301,7 @@ class InputBuffer(Buffer):
iu._apply_link_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=ipaaca.iu.IUEventType.LINKSUPDATED, category=iu.category)
else:
logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
LOGGER.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
def add_category_interests(self, category_interests):
if hasattr(category_interests, '__iter__'):
......@@ -303,6 +310,13 @@ class InputBuffer(Buffer):
else:
self._add_category_listener(category_interests)
def remove_category_interests(self, category_interests):
if hasattr(category_interests, '__iter__'):
for interest in category_interests:
self._remove_category_listener(interest)
else:
self._remove_category_listener(category_interests)
def _request_remote_resend(self, iu):
remote_server = self._get_remote_server(iu)
resend_request = ipaaca_pb2.IUResendRequest()
......@@ -363,13 +377,13 @@ class OutputBuffer(Buffer):
def _remote_update_links(self, update):
'''Apply a remotely requested update to one of the stored IU's links.'''
if update.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
LOGGER.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
LOGGER.warning("Remote write operation failed because request was out of date; IU "+str(update.uid))
return 0
if update.is_delta:
iu.modify_links(add=update.new_links, remove=update.links_to_remove, writer_name=update.writer_name)
......@@ -381,16 +395,16 @@ class OutputBuffer(Buffer):
def _remote_update_payload(self, update):
'''Apply a remotely requested update to one of the stored IU's payload.'''
if update.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
LOGGER.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(update.uid))
return 0
iu = self._iu_store[update.uid]
with iu.revision_lock:
if (update.revision != 0) and (update.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning(u"Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
logger.warning(u" Writer was: "+update.writer_name)
logger.warning(u" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
logger.warning(u" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
LOGGER.warning(u"Remote update_payload operation failed because request was out of date; IU "+str(update.uid))
LOGGER.warning(u" Writer was: "+update.writer_name)
LOGGER.warning(u" Requested update was: (New keys:) "+','.join(update.new_items.keys())+' (Removed keys:) '+','.join(update.keys_to_remove))
LOGGER.warning(u" Referred-to revision was "+str(update.revision)+' while local revision is '+str(iu.revision))
return 0
if update.is_delta:
#print('Writing delta update by '+str(update.writer_name))
......@@ -409,7 +423,7 @@ class OutputBuffer(Buffer):
def _remote_request_resend(self, iu_resend_request_pack):
''' Resend a requested IU over the specific hidden category.'''
if iu_resend_request_pack.uid not in self._iu_store:
logger.warning("Remote side requested resending of non-existent IU "+str(iu_resend_request_pack.uid))
LOGGER.warning("Remote side requested resending of non-existent IU "+str(iu_resend_request_pack.uid))
return 0
iu = self._iu_store[iu_resend_request_pack.uid]
with iu.revision_lock:
......@@ -423,13 +437,13 @@ class OutputBuffer(Buffer):
def _remote_commit(self, iu_commission):
'''Apply a remotely requested commit to one of the stored IUs.'''
if iu_commission.uid not in self._iu_store:
logger.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
LOGGER.warning("Remote InBuffer tried to spuriously write non-existent IU "+str(iu_commission.uid))
return 0
iu = self._iu_store[iu_commission.uid]
with iu.revision_lock:
if (iu_commission.revision != 0) and (iu_commission.revision != iu.revision):
# (0 means "do not pay attention to the revision number" -> "force update")
logger.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
LOGGER.warning("Remote write operation failed because request was out of date; IU "+str(iu_commission.uid))
return 0
if iu.committed:
return 0
......@@ -441,14 +455,14 @@ class OutputBuffer(Buffer):
def _get_informer(self, iu_category):
'''Return (or create, store and return) an informer object for IUs of the specified category.'''
if iu_category in self._informer_store:
logger.info("Returning informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
LOGGER.info("Returning informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
return self._informer_store[iu_category]
informer_iu = rsb.createInformer(
rsb.Scope("/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category)),
config=self._participant_config,
dataType=object)
self._informer_store[iu_category] = informer_iu #new_tuple
logger.info("Returning NEW informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
LOGGER.info("Returning NEW informer on scope "+"/ipaaca/channel/"+str(self._channel)+"/category/"+str(iu_category))
return informer_iu #return new_tuple
def add(self, iu):
......@@ -553,7 +567,11 @@ class OutputBuffer(Buffer):
new_items = {}
if keys_to_remove is None:
keys_to_remove = []
payload_update = ipaaca.converter.IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision)
payload_update = ipaaca.converter.IUPayloadUpdate(
uid=iu._uid,
revision=revision,
is_delta=is_delta,
payload_type=iu.payload_type)
payload_update.new_items = new_items
if is_delta:
payload_update.keys_to_remove = keys_to_remove
......
......@@ -37,14 +37,18 @@ import collections
import rsb.converter
import ipaaca_pb2
import ipaaca.defaults
import ipaaca.exception
import ipaaca.iu
from ipaaca.misc import logger
import ipaaca.misc
LOGGER = ipaaca.misc.get_library_logger()
try:
import simplejson as json
except ImportError:
import json
logger.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
LOGGER.warn('INFO: Using module "json" instead of "simplejson". Install "simplejson" for better performance.')
__all__ = [
......@@ -74,20 +78,25 @@ class IntConverter(rsb.converter.Converter):
return pbo.value
def pack_payload_entry(entry, key, value):
def pack_payload_entry(entry, key, value, _type=ipaaca.iu.IUPayloadType.JSON):
entry.key = key
entry.value = json.dumps(value)
entry.type = 'json'
if _type == ipaaca.iu.IUPayloadType.JSON:
entry.value = json.dumps(value)
elif _type == ipaaca.iu.IUPayloadType.STR or _type == 'MAP':
entry.value = str(value)
else:
raise ipaaca.exception.IpaacaException('Asked to send payload entry with unsupported type "' + _type + '".')
entry.type = _type
def unpack_payload_entry(entry):
# We assume that the only transfer types are 'str' or 'json'. Both are transparently handled by json.loads
if entry.type == 'json':
# We assume that the only transfer types are 'STR' or 'JSON'. Both are transparently handled by json.loads
if entry.type == ipaaca.iu.IUPayloadType.JSON:
return json.loads(entry.value)
elif entry.type == 'str':
elif entry.type == ipaaca.iu.IUPayloadType.STR or entry.type == 'str':
return entry.value
else:
logger.warn('Received payload entry with unsupported type "' + entry.type + '".')
LOGGER.warn('Received payload entry with unsupported type "' + entry.type + '".')
return entry.value
......@@ -113,7 +122,7 @@ class IUConverter(rsb.converter.Converter):
pbo.read_only = iu._read_only
for k, v in iu._payload.iteritems():
entry = pbo.payload.add()
pack_payload_entry(entry, k, v)
pack_payload_entry(entry, k, v, iu.payload_type)
for type_ in iu._links.keys():
linkset = pbo.links.add()
linkset.type = type_
......@@ -136,7 +145,7 @@ class IUConverter(rsb.converter.Converter):
read_only = pbo.read_only,
owner_name = pbo.owner_name,
category = pbo.category,
payload_type = pbo.payload_type,
payload_type = 'str' if pbo.payload_type is 'MAP' else pbo.payload_type,
committed = pbo.committed,
payload=_payload,
links=_links)
......@@ -200,7 +209,7 @@ class IULinkUpdateConverter(rsb.converter.Converter):
if type == IULinkUpdate:
pbo = ipaaca_pb2.IULinkUpdate()
pbo.ParseFromString( str(byte_stream) )
logger.debug('received an IULinkUpdate for revision '+str(pbo.revision))
LOGGER.debug('received an IULinkUpdate for revision '+str(pbo.revision))
iu_link_up = IULinkUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_links:
iu_link_up.new_links[str(entry.type)] = set(entry.targets)
......@@ -213,10 +222,11 @@ class IULinkUpdateConverter(rsb.converter.Converter):
class IUPayloadUpdate(object):
def __init__(self, uid, revision, is_delta, writer_name="undef", new_items=None, keys_to_remove=None):
def __init__(self, uid, revision, is_delta, payload_type, writer_name="undef", new_items=None, keys_to_remove=None):
super(IUPayloadUpdate, self).__init__()
self.uid = uid
self.revision = revision
self.payload_type = payload_type
self.writer_name = writer_name
self.is_delta = is_delta
self.new_items = {} if new_items is None else new_items
......@@ -226,6 +236,7 @@ class IUPayloadUpdate(object):
s = 'PayloadUpdate(' + 'uid=' + self.uid + ', '
s += 'revision='+str(self.revision)+', '
s += 'writer_name='+str(self.writer_name)+', '
s += 'payload_type='+str(self.payload_type)+', '
s += 'is_delta='+str(self.is_delta)+', '
s += 'new_items = '+str(self.new_items)+', '
s += 'keys_to_remove = '+str(self.keys_to_remove)+')'
......@@ -241,9 +252,9 @@ class IUPayloadUpdateConverter(rsb.converter.Converter):
pbo.uid = iu_payload_update.uid
pbo.writer_name = iu_payload_update.writer_name
pbo.revision = iu_payload_update.revision
for k,v in iu_payload_update.new_items.items():
for k, v in iu_payload_update.new_items.items():
entry = pbo.new_items.add()
pack_payload_entry(entry, k, v)
pack_payload_entry(entry, k, v, iu_payload_update.payload_type)
pbo.keys_to_remove.extend(iu_payload_update.keys_to_remove)
pbo.is_delta = iu_payload_update.is_delta
return bytearray(pbo.SerializeToString()), self.wireSchema
......@@ -253,8 +264,8 @@ class IUPayloadUpdateConverter(rsb.converter.Converter):
if type == IUPayloadUpdate:
pbo = ipaaca_pb2.IUPayloadUpdate()
pbo.ParseFromString( str(byte_stream) )
logger.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
LOGGER.debug('received an IUPayloadUpdate for revision '+str(pbo.revision))
iu_up = IUPayloadUpdate( uid=pbo.uid, revision=pbo.revision, payload_type=None, writer_name=pbo.writer_name, is_delta=pbo.is_delta)
for entry in pbo.new_items:
iu_up.new_items[entry.key] = unpack_payload_entry(entry)
iu_up.keys_to_remove = pbo.keys_to_remove[:]
......
......@@ -31,4 +31,8 @@
# Excellence Initiative.
IPAACA_DEFAULT_CHANNEL = 'default'
IPAACA_DEFAULT_LOGGING_LEVEL = 'WARNING'
\ No newline at end of file
IPAACA_LOGGER_NAME = 'ipaaca'
IPAACA_DEFAULT_LOGGING_LEVEL = 'WARNING'
IPAACA_DEFAULT_IU_PAYLOAD_TYPE = 'JSON' # one of ipaaca.iu.IUPayloadType
......@@ -37,8 +37,6 @@ import copy
import threading
import uuid
from ipaaca.misc import logger
import ipaaca.converter
import ipaaca.exception
import ipaaca.misc
......@@ -48,10 +46,12 @@ import ipaaca.payload
__all__ = [
'IUAccessMode',
'IUEventType',
'IUPayloadType',
'IU',
'Message',
]
LOGGER = ipaaca.misc.get_library_logger()
IUAccessMode = ipaaca.misc.enum(
PUSH = 'PUSH',
......@@ -71,11 +71,17 @@ IUEventType = ipaaca.misc.enum(
)
IUPayloadType = ipaaca.misc.enum(
JSON = 'JSON',
STR = 'STR'
)
class IUInterface(object):
"""Base class of all specialised IU classes."""
def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False):
def __init__(self, uid, access_mode=IUAccessMode.PUSH, read_only=False, payload_type=None):
"""Creates an IU.
Keyword arguments:
......@@ -86,12 +92,12 @@ class IUInterface(object):
self._uid = uid
self._revision = None
self._category = None
self._payload_type = None
self._owner_name = None
self._committed = False
self._retracted = False
self._access_mode = access_mode
self._read_only = read_only
self._payload_type = payload_type if payload_type is not None else ipaaca.defaults.IPAACA_DEFAULT_IU_PAYLOAD_TYPE
self._buffer = None
# payload is not present here
self._links = collections.defaultdict(set)
......@@ -162,7 +168,15 @@ class IUInterface(object):
def _get_payload_type(self):
return self._payload_type
payload_type = property(fget=_get_payload_type, doc='Type of the IU payload')
def _set_payload_type(self, type):
if self._buffer is None:
self._payload_type = type
else:
raise IpaacaException('The IU is already in a buffer, cannot change payload type anymore.')
payload_type = property(
fget=_get_payload_type,
fset=_set_payload_type,
doc='Type of the IU payload')
def _get_committed(self):
return self._committed
......@@ -194,7 +208,7 @@ class IUInterface(object):
return self._buffer
def _set_buffer(self, buffer):
if self._buffer is not None:
raise Exception('The IU is already in a buffer, cannot move it.')
raise IpaacaException('The IU is already in a buffer, cannot move it.')
self._buffer = buffer
buffer = property(
fget=_get_buffer,
......@@ -217,12 +231,11 @@ class IU(IUInterface):
"""A local IU."""
def __init__(self, category='undef', access_mode=IUAccessMode.PUSH, read_only=False, _payload_type='MAP'):
super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only)
def __init__(self, category='undef', access_mode=IUAccessMode.PUSH, read_only=False, payload_type=None):
super(IU, self).__init__(uid=None, access_mode=access_mode, read_only=read_only, payload_type=payload_type)
self._revision = 1
self.uid = str(uuid.uuid4())
self._category = str(category)
self._payload_type = _payload_type
self.revision_lock = threading.RLock()
self._payload = ipaaca.payload.Payload(iu=self)
......@@ -323,23 +336,23 @@ class IU(IUInterface):
class Message(IU):
"""Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls."""
def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, _payload_type='MAP'):
super(Message, self).__init__(category=str(category), access_mode=access_mode, read_only=read_only, _payload_type=_payload_type)
def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, payload_type=None):
super(Message, self).__init__(category=str(category), access_mode=access_mode, read_only=read_only, payload_type=payload_type)
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
if self.is_published:
logger.info('Info: modifying a Message after sending has no global effects')
LOGGER.info('Info: modifying a Message after sending has no global effects')
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
if self.is_published:
logger.info('Info: modifying a Message after sending has no global effects')
LOGGER.info('Info: modifying a Message after sending has no global effects')
def _increase_revision_number(self):
self._revision += 1
def _internal_commit(self, writer_name=None):
if self.is_published:
logger.info('Info: committing to a Message after sending has no global effects')
LOGGER.info('Info: committing to a Message after sending has no global effects')
def commit(self):
return self._internal_commit()
......@@ -348,7 +361,7 @@ class Message(IU):
return self._payload
def _set_payload(self, new_pl, writer_name=None):
if self.is_published:
logger.info('Info: modifying a Message after sending has no global effects')
LOGGER.info('Info: modifying a Message after sending has no global effects')
else:
if self.committed:
raise ipaaca.exception.IUCommittedError(self)
......@@ -395,11 +408,10 @@ class RemoteMessage(IUInterface):
"""A remote IU with access mode 'MESSAGE'."""
def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links):
super(RemoteMessage, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
super(RemoteMessage, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only, payload_type=payload_type)
self._revision = revision
self._category = category
self.owner_name = owner_name
self._payload_type = payload_type
self._committed = committed
self._retracted = False
# NOTE Since the payload is an already-existant Payload which we didn't modify ourselves,
......@@ -409,18 +421,18 @@ class RemoteMessage(IUInterface):
self._links = links
def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None):
logger.info('Info: modifying a RemoteMessage only has local effects')
LOGGER.info('Info: modifying a RemoteMessage only has local effects')
def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None):
logger.info('Info: modifying a RemoteMessage only has local effects')
LOGGER.info('Info: modifying a RemoteMessage only has local effects')
def commit(self):
logger.info('Info: committing to a RemoteMessage only has local effects')
LOGGER.info('Info: committing to a RemoteMessage only has local effects')
def _get_payload(self):
return self._payload
def _set_payload(self, new_pl):
logger.info('Info: modifying a RemoteMessage only has local effects')
LOGGER.info('Info: modifying a RemoteMessage only has local effects')
self._payload = ipaaca.payload.Payload(iu=self, new_payload=new_pl, omit_init_update_message=True)
payload = property(
fget=_get_payload,
......@@ -429,7 +441,7 @@ class RemoteMessage(IUInterface):
def _apply_link_update(self, update):
"""Apply a IULinkUpdate to the IU."""
logger.warning('Warning: should never be called: RemoteMessage._apply_link_update')
LOGGER.warning('Warning: should never be called: RemoteMessage._apply_link_update')
self._revision = update.revision
if update.is_delta:
self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove)
......@@ -438,7 +450,7 @@ class RemoteMessage(IUInterface):
def _apply_update(self, update):
"""Apply a IUPayloadUpdate to the IU."""
logger.warning('Warning: should never be called: RemoteMessage._apply_update')
LOGGER.warning('Warning: should never be called: RemoteMessage._apply_update')
self._revision = update.revision
if update.is_delta:
for k in update.keys_to_remove: self.payload._remotely_enforced_delitem(k)
......@@ -449,12 +461,12 @@ class RemoteMessage(IUInterface):
def _apply_commission(self):
"""Apply commission to the IU"""
logger.warning('Warning: should never be called: RemoteMessage._apply_commission')
LOGGER.warning('Warning: should never be called: RemoteMessage._apply_commission')
self._committed = True
def _apply_retraction(self):
"""Apply retraction to the IU"""
logger.warning('Warning: should never be called: RemoteMessage._apply_retraction')
LOGGER.warning('Warning: should never be called: RemoteMessage._apply_retraction')
self._retracted = True
......@@ -463,11 +475,10 @@ class RemotePushIU(IUInterface):
"""A remote IU with access mode 'PUSH'."""
def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links):
super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only)
super(RemotePushIU, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only, payload_type=payload_type)
self._revision = revision
self._category = category
self.owner_name = owner_name
self._payload_type = payload_type
self._committed = committed
self._retracted = False
# NOTE Since the payload is an already-existant Payload which we didn't modify ourselves,
......@@ -505,6 +516,7 @@ class RemotePushIU(IUInterface):
requested_update = ipaaca.converter.IUPayloadUpdate(
uid=self.uid,
revision=self.revision,
payload_type=self.payload_type,
is_delta=is_delta,
writer_name=self.buffer.unique_name,
new_items=new_items,
......@@ -546,6 +558,7 @@ class RemotePushIU(IUInterface):
requested_update = ipaaca.converter.IUPayloadUpdate(
uid=self.uid,
revision=self.revision,
payload_type=self.payload_type,
is_delta=False,
writer_name=self.buffer.unique_name,
new_items=new_pl,
......
......@@ -41,7 +41,6 @@ import ipaaca.defaults
__all__ = [
'enum',
'logger',
'IpaacaArgumentParser',
]
......@@ -54,26 +53,51 @@ def enum(*sequential, **named):
whats-the-best-way-to-implement-an-enum-in-python/1695250#1695250
"""
enums = dict(zip(sequential, range(len(sequential))), **named)
return type('Enum', (), enums)
enums['_choices'] = enums.keys()
return type('Enum', (object,), enums)
# Create a global logger for ipaaca
class IpaacaLoggingHandler(logging.Handler):
'''A logging handler that prints to stdout.'''
def __init__(self, level=logging.NOTSET):
def __init__(self, prefix='IPAACA', level=logging.NOTSET):
logging.Handler.__init__(self, level)
self._prefix = prefix
def emit(self, record):
meta = '[ipaaca] (%s) ' % str(record.levelname)
meta = '[%s: %s] ' % (self._prefix, str(record.levelname))
msg = str(record.msg.format(record.args))
print(meta + msg)
logger = logging.getLogger('ipaaca')
logger.addHandler(IpaacaLoggingHandler())
logger.setLevel(level=ipaaca.defaults.IPAACA_DEFAULT_LOGGING_LEVEL)
class GenericNoLoggingHandler(logging.Handler):
'''A logging handler that produces no output'''
def emit(self, record): pass
def get_library_logger():
'''Get ipaaca's library-wide logger object.'''
return logging.getLogger(ipaaca.defaults.IPAACA_LOGGER_NAME)
__IPAACA_LOGGING_HANDLER = IpaacaLoggingHandler('IPAACA')
__GENERIC_NO_LOG_HANDLER = GenericNoLoggingHandler()
# By default, suppress library logging
# - for IPAACA
get_library_logger().addHandler(__GENERIC_NO_LOG_HANDLER)
# - for RSB
logging.getLogger('rsb').addHandler(__GENERIC_NO_LOG_HANDLER)
def enable_logging(level=None):
'''Enable ipaaca's 'library-wide logging.'''
ipaaca_logger = get_library_logger()
ipaaca_logger.addHandler(__IPAACA_LOGGING_HANDLER)
ipaaca_logger.removeHandler(__GENERIC_NO_LOG_HANDLER)
ipaaca_logger.setLevel(level=level if level is not None else
ipaaca.defaults.IPAACA_DEFAULT_LOGGING_LEVEL)
class IpaacaArgumentParser(argparse.ArgumentParser):
......@@ -82,33 +106,71 @@ class IpaacaArgumentParser(argparse.ArgumentParser):
def __call__(self, parser, namespace, values, option_string=None):
ipaaca.defaults.IPAACA_DEFAULT_CHANNEL = values
class IpaacaPayloadTypeAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
ipaaca.defaults.IPAACA_DEFAULT_IU_PAYLOAD_TYPE = values
class IpaacaLoggingLevelAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
logger.setLevel(level=values)
def __init__(self, prog=None, usage=None, description=None, epilog=None, parents=[], formatter_class=argparse.HelpFormatter, prefix_chars='-', fromfile_prefix_chars=None, argument_default=None, conflict_handler='error', add_help=True):
super(IpaacaArgumentParser, self).__init__(prog=prog, usage=usage, description=description, epilog=epilog, parents=parents, formatter_class=formatter_class, prefix_chars=prefix_chars, fromfile_prefix_chars=fromfile_prefix_chars, argument_default=argument_default, conflict_handler=conflict_handler, add_help=add_help)
enable_logging(values)
class IpaacaRSBLoggingLevelAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
rsb_logger = logging.getLogger('rsb')
rsb_logger.addHandler(IpaacaLoggingHandler('RSB'))
rsb_logger.removeHandler(__GENERIC_NO_LOG_HANDLER)
rsb_logger.setLevel(level=values)
def __init__(self, prog=None, usage=None, description=None, epilog=None,
parents=[], formatter_class=argparse.HelpFormatter,
prefix_chars='-', fromfile_prefix_chars=None,
argument_default=None, conflict_handler='error', add_help=True):
super(IpaacaArgumentParser, self).__init__(prog=prog, usage=usage,
description=description, epilog=epilog, parents=parents,
formatter_class=formatter_class, prefix_chars=prefix_chars,
fromfile_prefix_chars=fromfile_prefix_chars,
argument_default=argument_default,
conflict_handler=conflict_handler, add_help=add_help)
def _add_ipaaca_lib_arguments(self):
ipaacalib_group = self.add_argument_group(title='IPAACA library arguments''')
ipaacalib_group = self.add_argument_group('IPAACA library arguments')
ipaacalib_group.add_argument(
'--ipaaca-default-channel', action=self.IpaacaDefaultChannelAction,
default='default', metavar='NAME', dest='_ipaaca_default_channel_',
help="IPAACA channel name which is used if a buffer does not define one locally (default: 'default')")
'--ipaaca-payload-type',
action=self.IpaacaPayloadTypeAction,
choices=['JSON', 'STR'], # one of ipaaca.iu.IUPayloadTypes
dest='_ipaaca_payload_type_',
default='JSON',
help="specify payload type (default: 'JSON')")
ipaacalib_group.add_argument(
'--ipaaca-logging-level', action=self.IpaacaLoggingLevelAction,
'--ipaaca-default-channel',
action=self.IpaacaDefaultChannelAction,
default='default',
metavar='NAME',
dest='_ipaaca_default_channel_',
help="specify default IPAACA channel name (default: 'default')")
ipaacalib_group.add_argument(
'--ipaaca-enable-logging',
action=self.IpaacaLoggingLevelAction,
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'],
dest='_ipaaca_logging_level_',
help="IPAACA logging threshold (default: 'WARNING')")
help="enable IPAACA logging with threshold")
rsblib_group = self.add_argument_group('RSB library arguments')
rsblib_group.add_argument(
'--rsb-enable-logging',
action=self.IpaacaRSBLoggingLevelAction,
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'],
dest='_ipaaca_rsb_enable_logging_',
help="enable RSB logging with threshold")
def parse_args(self, args=None, namespace=None):
# Add ipaaca-library specific arguments at the very end
self._add_ipaaca_lib_arguments()
self._add_ipaaca_lib_arguments() # Add ipaaca-args just before parsing
result = super(IpaacaArgumentParser, self).parse_args(args, namespace)
# Delete ipaaca specific arguments (beginning with '_ipaaca' and
# ending with an underscore) from the resulting Namespace object.
for item in dir(result):
if item.startswith('_ipaaca') and item.endswith('_'):
delattr(result, item)
return result
......@@ -60,6 +60,11 @@ parser.add_argument(
metavar='SECONDS',
type=float,
help='set time in seconds to wait for potential IU updates (default: 3.0)')
parser.add_argument(
'-j', '--json-payload',
dest='json_payload',
action='store_true',
help='allow structured data to be sent (treats payload VALUE arguments as Python expressions)')
parser.add_argument(
'-c', '--category',
dest='category',
......@@ -81,22 +86,27 @@ if __name__ == '__main__':
ob = ipaaca.OutputBuffer('IpaacaIUInjector')
ob.register_handler(iu_update_handler)
iu = ipaaca.Message(arguments.category) if arguments.iu_type == 'Message' else ipaaca.IU(arguments.category)
iu.payload = {k: v for (k, v) in itertools.izip_longest(arguments.payload[::2], arguments.payload[1::2])}
if arguments.json_payload:
# Treat payload values as Python expressions
iu.payload = {k: eval(v) for (k, v) in itertools.izip_longest(arguments.payload[::2], arguments.payload[1::2])}
else:
iu.payload = {k: v for (k, v) in itertools.izip_longest(arguments.payload[::2], arguments.payload[1::2])}
ob.add(iu)
print(
'Sent {iu_type} with category "{category}" and payload {{'.format(**vars(arguments)),
end='\n' if len(iu.payload) > 0 else '')
for k, v in iu.payload.items():
print(' "{key}": "{value}",'.format(key=k, value=v))
print(" '{key}': {value},".format(key=k, value=v))
print('}.')
# Wait for updates to the IU
try:
if arguments.iu_type == 'IU':
print('Waiting %s s for the IU to be updated.' % arguments.keep_alive)
time.sleep(arguments.keep_alive)
else:
time.sleep(0.1)
except KeyboardInterrupt:
pass
......
......@@ -34,12 +34,11 @@
from __future__ import division, print_function
import logging
import os
import platform
import re
import sys
import time
import argparse
import os
import platform
import ipaaca
......@@ -65,8 +64,8 @@ def pretty_printed_dict(d):
v = str(v)
v2 = v if len(v) <= arguments.max_size else v[:arguments.max_size] + '<excess length omitted>'
v2.replace('\\','\\\\').replace('\n', highlight_if_color('\\n'))
s += "\t '%s': '%s'," % (highlight_if_color(unicode(k),'1'), unicode(v2))
s+='\n}'
s += "\t '%s': '%s',\n" % (highlight_if_color(unicode(k),'1'), unicode(v2))
s+='}'
return s
def pretty_printed_iu_event(iu, event_type, local):
......@@ -141,9 +140,6 @@ if __name__ == '__main__':
resend_active=True)
buffers[channel].register_handler(my_update_handler)
print('')
print('Ipaaca IU Sniffer - run with --help to see options')
channellist = 's ' if len(arguments.channels) > 1 else ' '
channellist += ', '.join(arguments.channels)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment