diff --git a/.gitignore b/.gitignore index 23a90d4d6976065ee331050dc4c87ea56ea94b42..1e5fb5e9ccc3e5abd10bf2422338d5a10dcae5a6 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ */*/generatedsrc */build */*/build +*/*/*/build */lib */test/lib */test/report @@ -12,7 +13,7 @@ */.project */.classpath *.pyc -**/.*.swp +*/.*.swp .*.sw[a-z] *.un~ Session.vim @@ -20,7 +21,7 @@ Session.vim */*/manifest.mf */*/*/manifest.mf */*/*/*/manifest.mf -**/*.*~ +*/*.*~ deps dist diff --git a/ipaacalib/cpp/examples/CMakeLists.txt b/ipaacalib/cpp/examples/CMakeLists.txt index f1ea481581f75beb25136949ad733034898e5565..ab17baa7b104b4f8bd34521959949ee01f468c63 100644 --- a/ipaacalib/cpp/examples/CMakeLists.txt +++ b/ipaacalib/cpp/examples/CMakeLists.txt @@ -42,8 +42,8 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_OLD_CODE_CONVENIENCE_FLAGS} ${CXX_ include_directories( ${PROJECT_SOURCE_DIR}/include ) # add lib and include directory from pulled dependencies -include_directories( ${PROJECT_SOURCE_DIR}/../../../dist/include ) -link_directories( ${PROJECT_SOURCE_DIR}/../../../dist/lib ) +include_directories( ${PROJECT_SOURCE_DIR}/../../../dist/include ${PROJECT_SOURCE_DIR}/../../../deps/include ) +link_directories( ${PROJECT_SOURCE_DIR}/../../../dist/lib ${PROJECT_SOURCE_DIR}/../../../deps/lib ) # specify source files for ipaaca (auto-generated ones are in build/ ) set (SOURCE diff --git a/ipaacalib/cpp/examples/src/example-component.cc b/ipaacalib/cpp/examples/src/example-component.cc index 36c8c4a08772dd7bec868d7a2d6f0bfa85891a24..23bb04ee9be44db791c209da03bf782a553ed11f 100644 --- a/ipaacalib/cpp/examples/src/example-component.cc +++ b/ipaacalib/cpp/examples/src/example-component.cc @@ -12,7 +12,7 @@ // 'Component' (the concept does not exist anymore in ipaaca2). // -#include <ipaaca.h> +#include <ipaaca/ipaaca.h> #include <typeinfo> using namespace ipaaca; @@ -73,7 +73,12 @@ void LegacyComponent::handle_iu_event(IUInterface::ptr iu, IUEventType event_typ } else { // event on a remote IU - if (event_type == IU_ADDED) { + if (event_type == IU_MESSAGE) { + std::cout << "[Received new Message!]" << std::endl; + + std::string description = iu->payload()["description"]; + std::cout << "[ Current description: " << description << "]" << std::endl; + } else if (event_type == IU_ADDED) { std::cout << "[Received new IU!]" << std::endl; /// new Payload class enables dynamic typing to some degree (numeric default 0) @@ -125,7 +130,7 @@ void LegacyComponent::publish_reply_iu(const std::string& text, const std::strin } void LegacyComponent::publish_hello_world() { - IU::ptr iu = IU::create( "myCategoryInterest"); //helloWorld" ); + IU::ptr iu = Message::create( "myCategoryInterest"); //helloWorld" ); iu->payload()["description"] = "Hello world"; _out_buf->add(iu); } diff --git a/ipaacalib/cpp/include/ipaaca/ipaaca.h b/ipaacalib/cpp/include/ipaaca/ipaaca.h index 5f36ad50011d857fb59920f254d8373620715ccb..16f2fb500a0c5f7c390307816c77cb776f56e3d0 100644 --- a/ipaacalib/cpp/include/ipaaca/ipaaca.h +++ b/ipaacalib/cpp/include/ipaaca/ipaaca.h @@ -9,7 +9,7 @@ /// running release number of ipaaca-c++ #define IPAACA_CPP_RELEASE_NUMBER 1 /// date of last release number increment -#define IPAACA_CPP_RELEASE_DATE "2012-04-13" +#define IPAACA_CPP_RELEASE_DATE "2012-09-08" #ifdef IPAACA_DEBUG_MESSAGES #define IPAACA_INFO(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- " << i << std::endl; @@ -73,8 +73,9 @@ typedef uint32_t IUEventType; #define IU_RETRACTED 8 #define IU_UPDATED 16 #define IU_LINKSUPDATED 32 +#define IU_MESSAGE 64 /// Bit mask for receiving all events -#define IU_ALL_EVENTS 63 +#define IU_ALL_EVENTS 127 /// Convert an int event type to a human-readable string inline std::string iu_event_type_to_str(IUEventType type) @@ -86,6 +87,7 @@ inline std::string iu_event_type_to_str(IUEventType type) case IU_RETRACTED: return "RETRACTED"; case IU_UPDATED: return "UPDATED"; case IU_LINKSUPDATED: return "LINKSUPDATED"; + case IU_MESSAGE: return "MESSAGE"; default: return "(NOT A KNOWN SINGLE IU EVENT TYPE)"; } } @@ -429,7 +431,9 @@ class Payload//{{{ friend std::ostream& operator<<(std::ostream& os, const Payload& obj); friend class IUInterface; friend class IU; + friend class Message; friend class RemotePushIU; + friend class RemoteMessage; friend class IUConverter; friend class CallbackIUPayloadUpdate; protected: @@ -544,13 +548,35 @@ class IU: public IUInterface {//{{{ inline Payload& payload() { return _payload; } inline const Payload& const_payload() const { return _payload; } void commit(); + protected: + virtual void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = ""); + virtual void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = ""); + protected: + virtual void _internal_commit(const std::string& writer_name = ""); + public: + typedef boost::shared_ptr<IU> ptr; +};//}}} +class Message: public IU {//{{{ + friend class Buffer; + friend class InputBuffer; + friend class OutputBuffer; + friend class CallbackIUPayloadUpdate; + friend class CallbackIULinkUpdate; + friend class CallbackIUCommission; + protected: + Message(const std::string& category, IUAccessMode access_mode=IU_ACCESS_MESSAGE, bool read_only=true, const std::string& payload_type="MAP" ); + public: + inline ~Message() { + IPAACA_IMPLEMENT_ME + } + static boost::shared_ptr<Message> create(const std::string& category, IUAccessMode access_mode=IU_ACCESS_MESSAGE, bool read_only=true, const std::string& payload_type="MAP" ); protected: void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = ""); void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = ""); protected: void _internal_commit(const std::string& writer_name = ""); public: - typedef boost::shared_ptr<IU> ptr; + typedef boost::shared_ptr<Message> ptr; };//}}} class RemotePushIU: public IUInterface {//{{{ @@ -580,6 +606,33 @@ class RemotePushIU: public IUInterface {//{{{ void _apply_retraction(); typedef boost::shared_ptr<RemotePushIU> ptr; };//}}} +class RemoteMessage: public IUInterface {//{{{ + friend class Buffer; + friend class InputBuffer; + friend class OutputBuffer; + friend class IUConverter; + public: + Payload _payload; + protected: + RemoteMessage(); + static boost::shared_ptr<RemoteMessage> create(); + public: + inline ~RemoteMessage() { + IPAACA_IMPLEMENT_ME + } + inline Payload& payload() { return _payload; } + inline const Payload& const_payload() const { return _payload; } + void commit(); + protected: + void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = ""); + void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = ""); + protected: + void _apply_update(IUPayloadUpdate::ptr update); + void _apply_link_update(IULinkUpdate::ptr update); + void _apply_commission(); + void _apply_retraction(); + typedef boost::shared_ptr<RemoteMessage> ptr; +};//}}} class Exception: public std::exception//{{{ { diff --git a/ipaacalib/cpp/src/ipaaca.cc b/ipaacalib/cpp/src/ipaaca.cc index 26513538f1fbe40b9e1fbdde8d2690dbe8c0d149..75166fda52655dea1e2e70c322cff589eddcf858 100644 --- a/ipaacalib/cpp/src/ipaaca.cc +++ b/ipaacalib/cpp/src/ipaaca.cc @@ -434,7 +434,13 @@ void OutputBuffer::add(IU::ptr iu) if (_iu_store.count(iu->uid()) > 0) { throw IUPublishedError(); } - _iu_store[iu->uid()] = iu; + if (iu->is_published()) { + throw IUPublishedError(); + } + if (iu->access_mode() != IU_ACCESS_MESSAGE) { + // (for Message-type IUs: do not actually store them) + _iu_store[iu->uid()] = iu; + } iu->_associate_with_buffer(this); //shared_from_this()); _publish_iu(iu); } @@ -620,6 +626,12 @@ void InputBuffer::_handle_iu_events(EventPtr event) call_iu_event_handlers(iu, false, IU_ADDED, iu->category() ); } //IPAACA_INFO( "New RemotePushIU state: " << (*iu) ) + } else if (type == "ipaaca::RemoteMessage") { + boost::shared_ptr<RemoteMessage> iu = boost::static_pointer_cast<RemoteMessage>(event->getData()); + //_iu_store[iu->uid()] = iu; + //iu->_set_buffer(this); + call_iu_event_handlers(iu, false, IU_MESSAGE, iu->category() ); + //_iu_store.erase(iu->uid()); } else { RemotePushIUStore::iterator it; if (type == "ipaaca::IUPayloadUpdate") { @@ -851,6 +863,40 @@ void IU::_internal_commit(const std::string& writer_name) _revision_lock.unlock(); } //}}} +// Message//{{{ +Message::ptr Message::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type) +{ + Message::ptr iu = Message::ptr(new Message(category, access_mode, read_only, payload_type)); /* params */ //)); + iu->_payload.initialize(iu); + return iu; +} + +Message::Message(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type) +: IU(category, access_mode, read_only, payload_type) +{ +} + +void Message::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) +{ + if (is_published()) { + IPAACA_INFO("Info: modifying a Message after sending has no global effects") + } +} +void Message::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) +{ + if (is_published()) { + IPAACA_INFO("Info: modifying a Message after sending has no global effects") + } +} + +void Message::_internal_commit(const std::string& writer_name) +{ + if (is_published()) { + IPAACA_INFO("Info: committing to a Message after sending has no global effects") + } + +} +//}}} // RemotePushIU//{{{ @@ -967,21 +1013,73 @@ void RemotePushIU::_apply_retraction() { _retracted = true; } -void Payload::_remotely_enforced_wipe() +//}}} + +// RemoteMessage//{{{ + +RemoteMessage::ptr RemoteMessage::create() { - _store.clear(); + RemoteMessage::ptr iu = RemoteMessage::ptr(new RemoteMessage(/* params */)); + iu->_payload.initialize(iu); + return iu; } -void Payload::_remotely_enforced_delitem(const std::string& k) +RemoteMessage::RemoteMessage() { - _store.erase(k); + // nothing } -void Payload::_remotely_enforced_setitem(const std::string& k, const std::string& v) +void RemoteMessage::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) { - _store[k] = v; + IPAACA_INFO("Info: modifying a RemoteMessage only has local effects") +} +void RemoteMessage::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) +{ + IPAACA_INFO("Info: modifying a RemoteMessage only has local effects") +} +void RemoteMessage::commit() +{ + IPAACA_INFO("Info: committing to a RemoteMessage only has local effects") } -//}}} +void RemoteMessage::_apply_link_update(IULinkUpdate::ptr update) +{ + IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_link_update") + _revision = update->revision; + if (update->is_delta) { + _add_and_remove_links(update->new_links, update->links_to_remove); + } else { + _replace_links(update->new_links); + } +} +void RemoteMessage::_apply_update(IUPayloadUpdate::ptr update) +{ + IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_update") + _revision = update->revision; + if (update->is_delta) { + for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) { + _payload._remotely_enforced_delitem(*it); + } + for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) { + _payload._remotely_enforced_setitem(it->first, it->second); + } + } else { + _payload._remotely_enforced_wipe(); + for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) { + _payload._remotely_enforced_setitem(it->first, it->second); + } + } +} +void RemoteMessage::_apply_commission() +{ + IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_commission") + _committed = true; +} +void RemoteMessage::_apply_retraction() +{ + IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_retraction") + _retracted = true; +} +//}}} @@ -993,25 +1091,25 @@ PayloadEntryProxy::PayloadEntryProxy(Payload* payload, const std::string& key) } PayloadEntryProxy& PayloadEntryProxy::operator=(const std::string& value) { - std::cout << "operator=(string)" << std::endl; + //std::cout << "operator=(string)" << std::endl; _payload->set(_key, value); return *this; } PayloadEntryProxy& PayloadEntryProxy::operator=(const char* value) { - std::cout << "operator=(const char*)" << std::endl; + //std::cout << "operator=(const char*)" << std::endl; _payload->set(_key, value); return *this; } PayloadEntryProxy& PayloadEntryProxy::operator=(double value) { - std::cout << "operator=(double)" << std::endl; + //std::cout << "operator=(double)" << std::endl; _payload->set(_key, boost::lexical_cast<std::string>(value)); return *this; } PayloadEntryProxy& PayloadEntryProxy::operator=(bool value) { - std::cout << "operator=(bool)" << std::endl; + //std::cout << "operator=(bool)" << std::endl; _payload->set(_key, boost::lexical_cast<std::string>(value)); return *this; } @@ -1026,11 +1124,13 @@ PayloadEntryProxy::operator bool() } PayloadEntryProxy::operator long() { - return boost::lexical_cast<long>(operator std::string().c_str()); + //return boost::lexical_cast<long>(operator std::string().c_str()); + return atof(operator std::string().c_str()); } PayloadEntryProxy::operator double() { - return boost::lexical_cast<double>(operator std::string().c_str()); + //return boost::lexical_cast<double>(operator std::string().c_str()); + return atol(operator std::string().c_str()); } //}}} @@ -1075,6 +1175,19 @@ inline std::string Payload::get(const std::string& k) { if (_store.count(k)>0) return _store[k]; else return IPAACA_PAYLOAD_DEFAULT_STRING_VALUE; } +void Payload::_remotely_enforced_wipe() +{ + _store.clear(); +} +void Payload::_remotely_enforced_delitem(const std::string& k) +{ + _store.erase(k); +} +void Payload::_remotely_enforced_setitem(const std::string& k, const std::string& v) +{ + _store[k] = v; +} + //}}} // IUConverter//{{{ @@ -1098,7 +1211,19 @@ std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire) pbo->set_payload_type(obj->payload_type()); pbo->set_owner_name(obj->owner_name()); pbo->set_committed(obj->committed()); - pbo->set_access_mode(ipaaca::protobuf::IU::PUSH); // TODO + ipaaca::protobuf::IU_AccessMode a_m; + switch(obj->access_mode()) { + case IU_ACCESS_PUSH: + a_m = ipaaca::protobuf::IU_AccessMode_PUSH; + break; + case IU_ACCESS_REMOTE: + a_m = ipaaca::protobuf::IU_AccessMode_REMOTE; + break; + case IU_ACCESS_MESSAGE: + a_m = ipaaca::protobuf::IU_AccessMode_MESSAGE; + break; + } + pbo->set_access_mode(a_m); pbo->set_read_only(obj->read_only()); for (std::map<std::string, std::string>::const_iterator it=obj->_payload._store.begin(); it!=obj->_payload._store.end(); ++it) { protobuf::PayloadItem* item = pbo->add_payload(); @@ -1152,6 +1277,34 @@ AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std: return std::make_pair("ipaaca::RemotePushIU", obj); break; } + case IU_ACCESS_MESSAGE: + { + // Create a "Message-type IU" + boost::shared_ptr<RemoteMessage> obj = RemoteMessage::create(); + // transfer pbo data to obj + obj->_uid = pbo->uid(); + obj->_revision = pbo->revision(); + obj->_category = pbo->category(); + obj->_payload_type = pbo->payload_type(); + obj->_owner_name = pbo->owner_name(); + obj->_committed = pbo->committed(); + obj->_read_only = pbo->read_only(); + obj->_access_mode = IU_ACCESS_MESSAGE; + for (int i=0; i<pbo->payload_size(); i++) { + const protobuf::PayloadItem& it = pbo->payload(i); + obj->_payload._store[it.key()] = it.value(); + } + for (int i=0; i<pbo->links_size(); i++) { + const protobuf::LinkSet& pls = pbo->links(i); + LinkSet& ls = obj->_links._links[pls.type()]; + for (int j=0; j<pls.targets_size(); j++) { + ls.insert(pls.targets(j)); + } + } + //return std::make_pair(getDataType(), obj); + return std::make_pair("ipaaca::RemoteMessage", obj); + break; + } default: // other cases not handled yet! ( TODO ) throw NotImplementedError(); diff --git a/ipaacalib/cpp/test/src/Makefile b/ipaacalib/cpp/test/src/Makefile index e255f9ddfb51feaeddfeedf1495b340a13e5c5a8..11a2336bdded24ce11d4832b37928f1b9ec3beb3 100644 --- a/ipaacalib/cpp/test/src/Makefile +++ b/ipaacalib/cpp/test/src/Makefile @@ -2,11 +2,11 @@ CONFIG = -DIPAACA_DEBUG_MESSAGES #IPAACASOURCES = ../../src/ipaaca.cc ipaaca.pb.cc #TEXTSOURCES = ${IPAACASOURCES} testipaaca.cc TEXTSOURCES = testipaaca.cc -CCFLAGS=-I../../../../dist/include -I. -I../../src -I/usr/local/include -I/opt/local/include ${CONFIG} -BOOSTLIBS = -L/opt/local/lib -lboost_regex-mt -lboost_date_time-mt -lboost_program_options-mt -lboost_thread-mt -lboost_filesystem-mt -lboost_signals-mt -lboost_system-mt +CCFLAGS=-I../../../../deps/include -I../../../../dist/include -I. -I../../src -I/usr/local/include -I/opt/local/include ${CONFIG} +BOOSTLIBS = -L/opt/local/lib -lboost_regex-mt -lboost_date_time-mt -lboost_thread-mt PROTOLIBS = -L/opt/local/lib -lprotobuf #LIBS = ${BOOSTLIBS} ${PROTOLIBS} -L/usr/local/lib -lrsc -lrsbcore -LIBS = -L../../../../dist/lib -lipaaca +LIBS = -L../../../../deps/lib -L../../../../dist/lib -lipaaca COMPILER = gfilt diff --git a/ipaacalib/cpp/test/src/testipaaca.cc b/ipaacalib/cpp/test/src/testipaaca.cc index 4ce27cf811122ce6984e73a72547d944df38f37b..84d6f04a90aafef655bacb77130cca947c25f017 100644 --- a/ipaacalib/cpp/test/src/testipaaca.cc +++ b/ipaacalib/cpp/test/src/testipaaca.cc @@ -1,4 +1,4 @@ -#include <ipaaca.h> +#include <ipaaca/ipaaca.h> #include <typeinfo> using namespace ipaaca; diff --git a/ipaacalib/python/src/ipaaca.py b/ipaacalib/python/src/ipaaca.py index 40289b3c1b3de5ae9a879cf8a062257bb8a093d3..e097cb72588092a2094edfd714adb8a61b9b45b6 100755 --- a/ipaacalib/python/src/ipaaca.py +++ b/ipaacalib/python/src/ipaaca.py @@ -85,7 +85,8 @@ IUEventType = enum( DELETED = 'DELETED', RETRACTED = 'RETRACTED', UPDATED = 'UPDATED', - LINKSUPDATED = 'LINKSUPDATED' + LINKSUPDATED = 'LINKSUPDATED', + MESSAGE = 'MESSAGE' ) @@ -416,6 +417,143 @@ class IU(IUInterface):#{{{ #}}} +class Message(IU):#{{{ + """Local IU of Message sub-type. Can be handled like a normal IU, but on the remote side it is only existent during the handler calls.""" + def __init__(self, category='undef', access_mode=IUAccessMode.MESSAGE, read_only=True, _payload_type='MAP'): + super(Message, self).__init__(category=category, access_mode=access_mode, read_only=read_only, _payload_type=_payload_type) + + def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): + if self.is_published: + logger.info('Info: modifying a Message after sending has no global effects') + + def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): + if self.is_published: + logger.info('Info: modifying a Message after sending has no global effects') + + def _increase_revision_number(self): + self._revision += 1 + + def _internal_commit(self, writer_name=None): + if self.is_published: + logger.info('Info: committing to a Message after sending has no global effects') + + def commit(self): + return self._internal_commit() + + def _get_payload(self): + return self._payload + def _set_payload(self, new_pl, writer_name=None): + if self.is_published: + logger.info('Info: modifying a Message after sending has no global effects') + else: + if self.committed: + raise IUCommittedError(self) + with self.revision_lock: + self._increase_revision_number() + self._payload = Payload( + iu=self, + writer_name=None if self.buffer is None else (self.buffer.unique_name if writer_name is None else writer_name), + new_payload=new_pl) + payload = property( + fget=_get_payload, + fset=_set_payload, + doc='Payload dictionary of this IU.') + + def _get_is_published(self): + return self.buffer is not None + is_published = property( + fget=_get_is_published, + doc='Flag indicating whether this IU has been published or not.') + + def _set_buffer(self, buffer): + if self._buffer is not None: + raise Exception('The IU is already in a buffer, cannot move it.') + self._buffer = buffer + self.owner_name = buffer.unique_name + self._payload.owner_name = buffer.unique_name + buffer = property( + fget=IUInterface._get_buffer, + fset=_set_buffer, + doc='Buffer this IU is held in.') + + def _set_uid(self, uid): + if self._uid is not None: + raise AttributeError('The uid of IU ' + self.uid + ' has already been set, cannot change it.') + self._uid = uid + uid = property( + fget=IUInterface._get_uid, + fset=_set_uid, + doc='Unique ID of the IU.') +#}}} + +class RemoteMessage(IUInterface):#{{{ + + """A remote IU with access mode 'MESSAGE'.""" + + def __init__(self, uid, revision, read_only, owner_name, category, payload_type, committed, payload, links): + super(RemoteMessage, self).__init__(uid=uid, access_mode=IUAccessMode.PUSH, read_only=read_only) + self._revision = revision + self._category = category + self.owner_name = owner_name + self._payload_type = payload_type + self._committed = committed + self._retracted = False + # NOTE Since the payload is an already-existant Payload which we didn't modify ourselves, + # don't try to invoke any modification checks or network updates ourselves either. + # We are just receiving it here and applying the new data. + self._payload = Payload(iu=self, new_payload=payload, omit_init_update_message=True) + self._links = links + + def _modify_links(self, is_delta=False, new_links={}, links_to_remove={}, writer_name=None): + logger.info('Info: modifying a RemoteMessage only has local effects') + + def _modify_payload(self, is_delta=True, new_items={}, keys_to_remove=[], writer_name=None): + logger.info('Info: modifying a RemoteMessage only has local effects') + + def commit(self): + logger.info('Info: committing to a RemoteMessage only has local effects') + + def _get_payload(self): + return self._payload + def _set_payload(self, new_pl): + logger.info('Info: modifying a RemoteMessage only has local effects') + self._payload = Payload(iu=self, new_payload=new_pl, omit_init_update_message=True) + payload = property( + fget=_get_payload, + fset=_set_payload, + doc='Payload dictionary of the IU.') + + def _apply_link_update(self, update): + """Apply a IULinkUpdate to the IU.""" + logger.warning('Warning: should never be called: RemoteMessage._apply_link_update') + self._revision = update.revision + if update.is_delta: + self._add_and_remove_links(add=update.new_links, remove=update.links_to_remove) + else: + self._replace_links(links=update.new_links) + + def _apply_update(self, update): + """Apply a IUPayloadUpdate to the IU.""" + logger.warning('Warning: should never be called: RemoteMessage._apply_update') + self._revision = update.revision + if update.is_delta: + for k in update.keys_to_remove: self.payload._remotely_enforced_delitem(k) + for k, v in update.new_items.items(): self.payload._remotely_enforced_setitem(k, v) + else: + # NOTE Please read the comment in the constructor + self._payload = Payload(iu=self, new_payload=update.new_items, omit_init_update_message=True) + + def _apply_commission(self): + """Apply commission to the IU""" + logger.warning('Warning: should never be called: RemoteMessage._apply_commission') + self._committed = True + + def _apply_retraction(self): + """Apply retraction to the IU""" + logger.warning('Warning: should never be called: RemoteMessage._apply_retraction') + self._retracted = True +#}}} + class RemotePushIU(IUInterface):#{{{ """A remote IU with access mode 'PUSH'.""" @@ -582,7 +720,7 @@ class IUConverter(rsb.converter.Converter):#{{{ pbo.payload_type = iu._payload_type pbo.owner_name = iu._owner_name pbo.committed = iu._committed - pbo.access_mode = ipaaca_pb2.IU.PUSH # TODO + pbo.access_mode = iu._access_mode #ipaaca_pb2.IU.PUSH # TODO pbo.read_only = iu._read_only for k,v in iu._payload.items(): entry = pbo.payload.add() @@ -619,8 +757,29 @@ class IUConverter(rsb.converter.Converter):#{{{ links=_links ) return remote_push_iu + elif pbo.access_mode == ipaaca_pb2.IU.MESSAGE: + _payload = {} + for entry in pbo.payload: + k, v = unpack_typed_payload_item(entry) + _payload[k] = v + _links = collections.defaultdict(set) + for linkset in pbo.links: + for target_uid in linkset.targets: + _links[linkset.type].add(target_uid) + remote_message = RemoteMessage( + uid=pbo.uid, + revision=pbo.revision, + read_only = pbo.read_only, + owner_name = pbo.owner_name, + category = pbo.category, + payload_type = pbo.payload_type, + committed = pbo.committed, + payload=_payload, + links=_links + ) + return remote_message else: - raise Exception("We can only handle IUs with access mode 'PUSH' for now!") + raise Exception("We can only handle IUs with access mode 'PUSH' or 'MESSAGE' for now!") else: raise ValueError("Inacceptable dataType %s" % type) #}}} @@ -915,6 +1074,12 @@ class InputBuffer(Buffer): self._iu_store[ event.data.uid ] = event.data event.data.buffer = self self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category) + elif type_ is RemoteMessage: + # a new Message, an ephemeral IU that is removed after calling handlers + self._iu_store[ event.data.uid ] = event.data + event.data.buffer = self + self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.MESSAGE, category=event.data.category) + del self._iu_store[ event.data.uid ] else: # an update to an existing IU if event.data.uid not in self._iu_store: @@ -1080,7 +1245,11 @@ class OutputBuffer(Buffer): #iu.uid = self._generate_iu_uid() if iu.uid in self._iu_store: raise IUPublishedError(iu) - self._iu_store[iu.uid] = iu + if iu.buffer is not None: + raise IUPublishedError(iu) + if iu.access_mode != IUAccessMode.MESSAGE: + # Messages are not really stored in the OutputBuffer + self._iu_store[iu.uid] = iu iu.buffer = self self._publish_iu(iu)