diff --git a/cpp/src/ipaaca-test-main.cc b/cpp/src/ipaaca-test-main.cc index 5664e5efbb1567a56343ecb6c1c8b377f75ad418..c10927926f1828b409cad7b072ba828946edd6b5 100644 --- a/cpp/src/ipaaca-test-main.cc +++ b/cpp/src/ipaaca-test-main.cc @@ -94,7 +94,7 @@ int main() { OutputBuffer ob("TestOB"); std::cout << "Buffer: " << ob.unique_name() << std::endl; - IU::ref iu = IU::create(); + IU::ref iu = IU::create("testcategory"); ob.add(iu); std::cout << "_payload.get(\"TEST\") = \"" << iu->_payload.get("TEST") << "\"" << std::endl; diff --git a/cpp/src/ipaaca.cc b/cpp/src/ipaaca.cc index 9bb0cc6be7bc1d424bb64d4814dd6c2935c06407..b22a46944721a5f4ace57836f30971f9b7dc5aef 100644 --- a/cpp/src/ipaaca.cc +++ b/cpp/src/ipaaca.cc @@ -21,8 +21,10 @@ void initialize_ipaaca_rsb() ParticipantConfig config = ParticipantConfig::fromConfiguration(); Factory::getInstance().setDefaultParticipantConfig(config); + boost::shared_ptr<IUConverter> iu_converter(new IUConverter()); boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter()); boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter()); + stringConverterRepository()->registerConverter(iu_converter); stringConverterRepository()->registerConverter(payload_update_converter); stringConverterRepository()->registerConverter(link_update_converter); @@ -152,10 +154,41 @@ void OutputBuffer::_send_iu_commission(IUInterface* iu, revision_t revision, con } void OutputBuffer::add(IU::ref iu) { - IPAACA_IMPLEMENT_ME - // TODO place in iu store + //IPAACA_IMPLEMENT_ME + if (_iu_store.count(iu->uid()) > 0) { + throw IUPublishedError(); + } + _iu_store[iu->uid()] = iu; iu->_associate_with_buffer(this); //shared_from_this()); - // TODO + _publish_iu(iu); +} + +void OutputBuffer::_publish_iu(IU::ref iu) +{ + Informer<AnyType>::Ptr informer = _get_informer(iu->_category); + Informer<ipaaca::IU>::DataPtr iu_data(iu); + informer->publish(iu_data); +} + +Informer<AnyType>::Ptr OutputBuffer::_get_informer(const std::string& category) +{ + if (_informer_store.count(category) > 0) { + return _informer_store[category]; + } else { + IPAACA_INFO("making new informer for category " << category) + std::string scope_string = "/ipaaca/category/" + category; + Informer<AnyType>::Ptr informer = Factory::getInstance().createInformer<AnyType> ( Scope(scope_string)); + _informer_store[category] = informer; + return informer; + } +} +boost::shared_ptr<IU> OutputBuffer::remove(const std::string& iu_uid) +{ + IPAACA_IMPLEMENT_ME +} +boost::shared_ptr<IU> OutputBuffer::remove(IU::ref iu) +{ + IPAACA_IMPLEMENT_ME } /* @@ -459,6 +492,7 @@ IUConverter::IUConverter() std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire) { + IPAACA_INFO("entering") // Ensure that DATA actually holds a datum of the data-type we expect. assert(data.first == getDataType()); // "ipaaca::IU" // NOTE: a dynamic_pointer_cast cannot be used from void* @@ -487,6 +521,7 @@ std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire) } } pbo->SerializeToString(&wire); + IPAACA_INFO("leaving") return getWireSchema(); } diff --git a/cpp/src/ipaaca.h b/cpp/src/ipaaca.h index 71ae107f42ec18ee44f7de39370158b342744bda..d8c1ff77df09fc15a26f9c3e0eb884188a9ccded 100644 --- a/cpp/src/ipaaca.h +++ b/cpp/src/ipaaca.h @@ -83,6 +83,14 @@ class OutputBuffer; std::string generate_uuid_string(); +class IUStore: public std::map<std::string, boost::shared_ptr<IU> > +{ +}; +class RemotePushIUStore: public std::map<std::string, boost::shared_ptr<RemotePushIU> > // TODO genericize to all remote IU types +{ +}; + + class Lock { protected: @@ -141,22 +149,36 @@ class Buffer { //: public boost::enable_shared_from_this<Buffer> { public: virtual inline ~Buffer() { } inline const std::string& unique_name() { return _unique_name; } - _IPAACA_ABSTRACT_ virtual void add(boost::shared_ptr<IU> iu) = 0; + //_IPAACA_ABSTRACT_ virtual void add(boost::shared_ptr<IUInterface> iu) = 0; }; class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<OutputBuffer> { friend class IU; friend class RemotePushIU; protected: + std::map<std::string, Informer<AnyType>::Ptr> _informer_store; + IUStore _iu_store; + protected: + // informing functions void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef"); void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef"); void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name); + // remote access functions + // _remote_update_links(IULinkUpdate) + // _remote_update_payload(IUPayloadUpdate) + // _remote_commit(protobuf::IUCommission) + protected: + void _publish_iu(boost::shared_ptr<IU> iu); + void _retract_iu(boost::shared_ptr<IU> iu); + Informer<AnyType>::Ptr _get_informer(const std::string& category); public: OutputBuffer(const std::string& basename); ~OutputBuffer() { IPAACA_IMPLEMENT_ME } void add(boost::shared_ptr<IU> iu); + boost::shared_ptr<IU> remove(const std::string& iu_uid); + boost::shared_ptr<IU> remove(boost::shared_ptr<IU> iu); }; class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<InputBuffer> { @@ -349,12 +371,12 @@ class IU: public IUInterface {//{{{ Lock _revision_lock; protected: inline void _increase_revision_number() { _revision++; } - IU(const std::string& category="undef", IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" ); + IU(const std::string& category, IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" ); public: inline ~IU() { IPAACA_IMPLEMENT_ME } - static boost::shared_ptr<IU> create(const std::string& category="undef", IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" ); + static boost::shared_ptr<IU> create(const std::string& category, IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" ); inline Payload& payload() { return _payload; } void commit(); protected: