diff --git a/cpp/src/Makefile b/cpp/src/Makefile index ea5197601c9a79e58196e5be0a68487384ab4c66..f553bd1a17c634c6f5896a560923e4b416be2394 100644 --- a/cpp/src/Makefile +++ b/cpp/src/Makefile @@ -8,7 +8,7 @@ LIBS = ${BOOSTLIBS} ${PROTOLIBS} -L/usr/local/lib -lrsc -lrsbcore COMPILER = gfilt all: main - true + receiver: ${COMPILER} ${CCFLAGS} -DMAKE_RECEIVER -o ipaaca-receiver ${SOURCES} ${LIBS} diff --git a/cpp/src/ipaaca-test-main.cc b/cpp/src/ipaaca-test-main.cc index dc2dede393b602158f6b53d11036db4d2ed2723a..5664e5efbb1567a56343ecb6c1c8b377f75ad418 100644 --- a/cpp/src/ipaaca-test-main.cc +++ b/cpp/src/ipaaca-test-main.cc @@ -91,7 +91,8 @@ int main() { initialize_ipaaca_rsb(); - OutputBuffer ob; + OutputBuffer ob("TestOB"); + std::cout << "Buffer: " << ob.unique_name() << std::endl; IU::ref iu = IU::create(); ob.add(iu); diff --git a/cpp/src/ipaaca.cc b/cpp/src/ipaaca.cc index f4525af5dfca72bf88da7d3927f9baf0dff4874f..9bb0cc6be7bc1d424bb64d4814dd6c2935c06407 100644 --- a/cpp/src/ipaaca.cc +++ b/cpp/src/ipaaca.cc @@ -124,9 +124,20 @@ void SmartLinkMap::_replace_links(const LinkMap& links) } //}}} - +// Buffer//{{{ +void Buffer::_allocate_unique_name(const std::string& basename) { + std::string uuid = ipaaca::generate_uuid_string(); + std::string name = basename + "-" + uuid.substr(0,8); + _unique_name = name; +} +//}}} // OutputBuffer//{{{ + +OutputBuffer::OutputBuffer(const std::string& basename) +:Buffer(basename) +{ +} void OutputBuffer::_send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) { IPAACA_IMPLEMENT_ME @@ -143,7 +154,7 @@ void OutputBuffer::add(IU::ref iu) { IPAACA_IMPLEMENT_ME // TODO place in iu store - iu->_set_buffer(this); //shared_from_this()); + iu->_associate_with_buffer(this); //shared_from_this()); // TODO } @@ -204,6 +215,12 @@ void OutputBuffer::add(IU::ref iu) */ //}}} +// InputBuffer//{{{ +InputBuffer::InputBuffer(const std::string& basename) +:Buffer(basename) +{ +} +//}}} @@ -226,6 +243,7 @@ void IUInterface::_set_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buff throw IUAlreadyInABufferError(); } _buffer = buffer; + } void IUInterface::_set_owner_name(const std::string& owner_name) { @@ -235,6 +253,13 @@ void IUInterface::_set_owner_name(const std::string& owner_name) { _owner_name = owner_name; } +/// set the buffer pointer and the owner names of IU and Payload +void IUInterface::_associate_with_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buffer) { + _set_buffer(buffer); // will throw if already set + _set_owner_name(buffer->unique_name()); + payload()._set_owner_name(buffer->unique_name()); +} + void IUInterface::add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name) { LinkMap none; @@ -271,7 +296,7 @@ void IUInterface::set_links(const LinkMap& links, const std::string& writer_name // IU//{{{ IU::ref IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type) { - IU::ref iu = IU::ref(new IU(/* params */)); + IU::ref iu = IU::ref(new IU(category, access_mode, read_only, payload_type)); /* params */ //)); iu->_payload.initialize(iu); return iu; } @@ -337,6 +362,17 @@ void IU::_internal_commit(const std::string& writer_name) //}}} // RemotePushIU//{{{ + +RemotePushIU::ref RemotePushIU::create() +{ + RemotePushIU::ref iu = RemotePushIU::ref(new RemotePushIU(/* params */)); + iu->_payload.initialize(iu); + return iu; +} +RemotePushIU::RemotePushIU() +{ + // nothing +} void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) { IPAACA_IMPLEMENT_ME @@ -414,7 +450,6 @@ inline std::string Payload::get(const std::string& k) { } //}}} -/* // IUConverter//{{{ IUConverter::IUConverter() @@ -430,17 +465,26 @@ std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire) boost::shared_ptr<const IU> obj = boost::static_pointer_cast<const IU> (data.second); boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU()); // transfer obj data to pbo - pbo->set_uid(obj->uid); - pbo->set_revision(obj->revision); - pbo->set_writer_name(obj->writer_name); - pbo->set_is_delta(obj->is_delta); - for (std::map<std::string, std::string>::const_iterator it=obj->new_items.begin(); it!=obj->new_items.end(); ++it) { - protobuf::PayloadItem* item = pbo->add_new_items(); + pbo->set_uid(obj->uid()); + pbo->set_revision(obj->revision()); + pbo->set_category(obj->category()); + pbo->set_payload_type(obj->payload_type()); + pbo->set_owner_name(obj->owner_name()); + pbo->set_committed(obj->committed()); + pbo->set_access_mode(ipaaca::protobuf::IU::PUSH); // TODO + pbo->set_read_only(obj->read_only()); + for (std::map<std::string, std::string>::const_iterator it=obj->_payload._store.begin(); it!=obj->_payload._store.end(); ++it) { + protobuf::PayloadItem* item = pbo->add_payload(); item->set_key(it->first); item->set_value(it->second); + item->set_type("str"); // FIXME other types than str (later) } - for (std::vector<std::string>::const_iterator it=obj->keys_to_remove.begin(); it!=obj->keys_to_remove.end(); ++it) { - pbo->add_keys_to_remove(*it); + for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) { + protobuf::LinkSet* links = pbo->add_links(); + links->set_type(it->first); + for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) { + links->add_targets(*it2); + } } pbo->SerializeToString(&wire); return getWireSchema(); @@ -448,27 +492,45 @@ std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire) } AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) { - assert(wireSchema == getWireSchema()); // "ipaaca-iu-payload-update" + assert(wireSchema == getWireSchema()); // "ipaaca-iu" boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU()); pbo->ParseFromString(wire); - boost::shared_ptr<IU> obj(new IU()); - // transfer pbo data to obj - obj->uid = pbo->uid(); - obj->revision = pbo->revision(); - obj->writer_name = pbo->writer_name(); - obj->is_delta = pbo->is_delta(); - for (int i=0; i<pbo->new_items_size(); i++) { - const protobuf::PayloadItem& it = pbo->new_items(i); - obj->new_items[it.key()] = it.value(); - } - for (int i=0; i<pbo->keys_to_remove_size(); i++) { - obj->keys_to_remove.push_back(pbo->keys_to_remove(i)); + IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode()); + switch(mode) { + case IU_ACCESS_PUSH: + { + // Create a "remote push IU" + boost::shared_ptr<RemotePushIU> obj(new RemotePushIU()); + // transfer pbo data to obj + obj->_uid = pbo->uid(); + obj->_revision = pbo->revision(); + obj->_category = pbo->category(); + obj->_payload_type = pbo->payload_type(); + obj->_owner_name = pbo->owner_name(); + obj->_committed = pbo->committed(); + obj->_read_only = pbo->read_only(); + obj->_access_mode = IU_ACCESS_PUSH; + for (int i=0; i<pbo->payload_size(); i++) { + const protobuf::PayloadItem& it = pbo->payload(i); + obj->_payload._store[it.key()] = it.value(); + } + for (int i=0; i<pbo->links_size(); i++) { + const protobuf::LinkSet& pls = pbo->links(i); + LinkSet& ls = obj->_links._links[pls.type()]; + for (int j=0; j<pls.targets_size(); j++) { + ls.insert(pls.targets(j)); + } + } + return std::make_pair(getDataType(), obj); + break; + } + default: + // other cases not handled yet! ( TODO ) + throw NotImplementedError(); } - return std::make_pair(getDataType(), obj); } //}}} -*/ // IUPayloadUpdateConverter//{{{ diff --git a/cpp/src/ipaaca.h b/cpp/src/ipaaca.h index ec55337f51c8c96d57c2b3f3577025717ee7563e..71ae107f42ec18ee44f7de39370158b342744bda 100644 --- a/cpp/src/ipaaca.h +++ b/cpp/src/ipaaca.h @@ -45,18 +45,18 @@ namespace ipaaca { typedef uint32_t revision_t; enum IUEventType { - ADDED, - COMMITTED, - DELETED, - RETRACTED, - UPDATED, - LINKSUPDATED + IU_ADDED, + IU_COMMITTED, + IU_DELETED, + IU_RETRACTED, + IU_UPDATED, + IU_LINKSUPDATED }; enum IUAccessMode { - PUSH, - REMOTE, - MESSAGE + IU_ACCESS_PUSH, + IU_ACCESS_REMOTE, + IU_ACCESS_MESSAGE }; //class { @@ -109,11 +109,13 @@ class Lock typedef std::set<std::string> LinkSet; typedef std::map<std::string, LinkSet> LinkMap; class SmartLinkMap { + friend class IUInterface; + friend class IU; + friend class IUConverter; public: const LinkSet& get_links(const std::string& key); const LinkMap& get_all_links(); - friend class IUInterface; protected: LinkMap _links; void _add_and_remove_links(const LinkMap& add, const LinkMap& remove); @@ -126,14 +128,37 @@ const LinkSet EMPTY_LINK_SET; class Buffer { //: public boost::enable_shared_from_this<Buffer> { friend class IU; friend class RemotePushIU; + protected: + std::string _unique_name; protected: _IPAACA_ABSTRACT_ virtual void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") = 0; _IPAACA_ABSTRACT_ virtual void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef") = 0; _IPAACA_ABSTRACT_ virtual void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") = 0; + void _allocate_unique_name(const std::string& basename); + inline Buffer(const std::string& basename) { + _allocate_unique_name(basename); + } public: + virtual inline ~Buffer() { } + inline const std::string& unique_name() { return _unique_name; } _IPAACA_ABSTRACT_ virtual void add(boost::shared_ptr<IU> iu) = 0; }; +class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<OutputBuffer> { + friend class IU; + friend class RemotePushIU; + protected: + void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef"); + void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef"); + void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name); + public: + OutputBuffer(const std::string& basename); + ~OutputBuffer() { + IPAACA_IMPLEMENT_ME + } + void add(boost::shared_ptr<IU> iu); +}; + class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<InputBuffer> { friend class IU; friend class RemotePushIU; @@ -151,23 +176,16 @@ class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<Inp IPAACA_INFO("(ERROR) InputBuffer::_send_iu_commission() should never be invoked") } public: + InputBuffer(const std::string& basename); + ~InputBuffer() { + IPAACA_IMPLEMENT_ME + } inline void add(boost::shared_ptr<IU> iu) { IPAACA_IMPLEMENT_ME } }; -class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<OutputBuffer> { - friend class IU; - friend class RemotePushIU; - protected: - void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef"); - void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef"); - void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name); - public: - void add(boost::shared_ptr<IU> iu); -}; - /* class IUEventFunctionHandler: public rsb::EventFunctionHandler { protected: @@ -178,6 +196,13 @@ class IUEventFunctionHandler: public rsb::EventFunctionHandler { }; */ +class IUConverter: public rsb::converter::Converter<std::string> {//{{{ + public: + IUConverter(); + std::string serialize(const rsb::AnnotatedData& data, std::string& wire); + rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire); +};//}}} + class IUPayloadUpdate {//{{{ public: std::string uid; @@ -232,14 +257,20 @@ class PayloadEntryProxy//{{{ class Payload//{{{ { + friend class IUInterface; + friend class IU; + friend class RemotePushIU; + friend class IUConverter; protected: + std::string _owner_name; std::map<std::string, std::string> _store; boost::shared_ptr<IUInterface> _iu; protected: - friend class IU; - friend class RemotePushIU; void initialize(boost::shared_ptr<IUInterface> iu); + inline void _set_owner_name(const std::string& name) { _owner_name = name; } public: + inline const std::string& owner_name() { return _owner_name; } + // access PayloadEntryProxy operator[](const std::string& key); void set(const std::string& k, const std::string& v); void remove(const std::string& k); @@ -248,6 +279,7 @@ class Payload//{{{ };//}}} class IUInterface {//{{{ + friend class IUConverter; protected: IUInterface(); public: @@ -271,6 +303,7 @@ class IUInterface {//{{{ _IPAACA_ABSTRACT_ virtual void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) = 0; _IPAACA_ABSTRACT_ virtual void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) = 0; //void _set_buffer(boost::shared_ptr<Buffer> buffer); + void _associate_with_buffer(Buffer* buffer); void _set_buffer(Buffer* buffer); void _set_uid(const std::string& uid); void _set_owner_name(const std::string& owner_name); @@ -280,16 +313,16 @@ class IUInterface {//{{{ inline void _replace_links(const LinkMap& links) { _links._replace_links(links); } public: inline bool is_published() { return (_buffer != 0); } - inline const std::string& uid() { return _uid; } - inline revision_t revision() { return _revision; } - inline const std::string& category() { return _category; } - inline const std::string& payload_type() { return _payload_type; } - inline const std::string& owner_name() { return _owner_name; } - inline bool committed() { return _committed; } - inline IUAccessMode access_mode() { return _access_mode; } - inline bool read_only() { return _read_only; } + inline const std::string& uid() const { return _uid; } + inline revision_t revision() const { return _revision; } + inline const std::string& category() const { return _category; } + inline const std::string& payload_type() const { return _payload_type; } + inline const std::string& owner_name() const { return _owner_name; } + inline bool committed() const { return _committed; } + inline IUAccessMode access_mode() const { return _access_mode; } + inline bool read_only() const { return _read_only; } //inline boost::shared_ptr<Buffer> buffer() { return _buffer; } - inline Buffer* buffer() { return _buffer; } + inline Buffer* buffer() const { return _buffer; } inline const LinkSet& get_links(std::string type) { return _links.get_links(type); } inline const LinkMap& get_all_links() { return _links.get_all_links(); } // Payload @@ -316,12 +349,12 @@ class IU: public IUInterface {//{{{ Lock _revision_lock; protected: inline void _increase_revision_number() { _revision++; } - IU(const std::string& category="undef", IUAccessMode access_mode=PUSH, bool read_only=false, const std::string& payload_type="MAP" ); + IU(const std::string& category="undef", IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" ); public: inline ~IU() { IPAACA_IMPLEMENT_ME } - static boost::shared_ptr<IU> create(const std::string& category="undef", IUAccessMode access_mode=PUSH, bool read_only=false, const std::string& payload_type="MAP" ); + static boost::shared_ptr<IU> create(const std::string& category="undef", IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" ); inline Payload& payload() { return _payload; } void commit(); protected: @@ -337,16 +370,22 @@ class RemotePushIU: public IUInterface {//{{{ friend class Buffer; friend class InputBuffer; friend class OutputBuffer; + friend class IUConverter; + public: + Payload _payload; protected: - //RemotePushIU(); + RemotePushIU(); + static boost::shared_ptr<RemotePushIU> create(); public: inline ~RemotePushIU() { IPAACA_IMPLEMENT_ME } + inline Payload& payload() { return _payload; } void commit(); protected: void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = ""); void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = ""); + typedef boost::shared_ptr<RemotePushIU> ref; };//}}} class Exception: public std::exception//{{{ @@ -400,6 +439,14 @@ class IUAlreadyHasAnOwnerNameError: public Exception//{{{ _description = "IUAlreadyHasAnOwnerNameError"; } };//}}} +class NotImplementedError: public Exception//{{{ +{ + public: + inline ~NotImplementedError() throw() { } + inline NotImplementedError() { //boost::shared_ptr<IU> iu) { + _description = "NotImplementedError"; + } +};//}}} } // of namespace ipaaca