Skip to content
Snippets Groups Projects
Commit 8dc0b0e4 authored by Ramin Yaghoubzadeh's avatar Ramin Yaghoubzadeh
Browse files

More work on c++ version

parent 20fe2f8a
No related branches found
No related tags found
No related merge requests found
......@@ -94,7 +94,7 @@ int main() {
OutputBuffer ob("TestOB");
std::cout << "Buffer: " << ob.unique_name() << std::endl;
IU::ref iu = IU::create();
IU::ref iu = IU::create("testcategory");
ob.add(iu);
std::cout << "_payload.get(\"TEST\") = \"" << iu->_payload.get("TEST") << "\"" << std::endl;
......
......@@ -21,8 +21,10 @@ void initialize_ipaaca_rsb()
ParticipantConfig config = ParticipantConfig::fromConfiguration();
Factory::getInstance().setDefaultParticipantConfig(config);
boost::shared_ptr<IUConverter> iu_converter(new IUConverter());
boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter());
boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter());
stringConverterRepository()->registerConverter(iu_converter);
stringConverterRepository()->registerConverter(payload_update_converter);
stringConverterRepository()->registerConverter(link_update_converter);
......@@ -152,10 +154,41 @@ void OutputBuffer::_send_iu_commission(IUInterface* iu, revision_t revision, con
}
void OutputBuffer::add(IU::ref iu)
{
IPAACA_IMPLEMENT_ME
// TODO place in iu store
//IPAACA_IMPLEMENT_ME
if (_iu_store.count(iu->uid()) > 0) {
throw IUPublishedError();
}
_iu_store[iu->uid()] = iu;
iu->_associate_with_buffer(this); //shared_from_this());
// TODO
_publish_iu(iu);
}
void OutputBuffer::_publish_iu(IU::ref iu)
{
Informer<AnyType>::Ptr informer = _get_informer(iu->_category);
Informer<ipaaca::IU>::DataPtr iu_data(iu);
informer->publish(iu_data);
}
Informer<AnyType>::Ptr OutputBuffer::_get_informer(const std::string& category)
{
if (_informer_store.count(category) > 0) {
return _informer_store[category];
} else {
IPAACA_INFO("making new informer for category " << category)
std::string scope_string = "/ipaaca/category/" + category;
Informer<AnyType>::Ptr informer = Factory::getInstance().createInformer<AnyType> ( Scope(scope_string));
_informer_store[category] = informer;
return informer;
}
}
boost::shared_ptr<IU> OutputBuffer::remove(const std::string& iu_uid)
{
IPAACA_IMPLEMENT_ME
}
boost::shared_ptr<IU> OutputBuffer::remove(IU::ref iu)
{
IPAACA_IMPLEMENT_ME
}
/*
......@@ -459,6 +492,7 @@ IUConverter::IUConverter()
std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire)
{
IPAACA_INFO("entering")
// Ensure that DATA actually holds a datum of the data-type we expect.
assert(data.first == getDataType()); // "ipaaca::IU"
// NOTE: a dynamic_pointer_cast cannot be used from void*
......@@ -487,6 +521,7 @@ std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire)
}
}
pbo->SerializeToString(&wire);
IPAACA_INFO("leaving")
return getWireSchema();
}
......
......@@ -83,6 +83,14 @@ class OutputBuffer;
std::string generate_uuid_string();
class IUStore: public std::map<std::string, boost::shared_ptr<IU> >
{
};
class RemotePushIUStore: public std::map<std::string, boost::shared_ptr<RemotePushIU> > // TODO genericize to all remote IU types
{
};
class Lock
{
protected:
......@@ -141,22 +149,36 @@ class Buffer { //: public boost::enable_shared_from_this<Buffer> {
public:
virtual inline ~Buffer() { }
inline const std::string& unique_name() { return _unique_name; }
_IPAACA_ABSTRACT_ virtual void add(boost::shared_ptr<IU> iu) = 0;
//_IPAACA_ABSTRACT_ virtual void add(boost::shared_ptr<IUInterface> iu) = 0;
};
class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<OutputBuffer> {
friend class IU;
friend class RemotePushIU;
protected:
std::map<std::string, Informer<AnyType>::Ptr> _informer_store;
IUStore _iu_store;
protected:
// informing functions
void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef");
void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef");
void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name);
// remote access functions
// _remote_update_links(IULinkUpdate)
// _remote_update_payload(IUPayloadUpdate)
// _remote_commit(protobuf::IUCommission)
protected:
void _publish_iu(boost::shared_ptr<IU> iu);
void _retract_iu(boost::shared_ptr<IU> iu);
Informer<AnyType>::Ptr _get_informer(const std::string& category);
public:
OutputBuffer(const std::string& basename);
~OutputBuffer() {
IPAACA_IMPLEMENT_ME
}
void add(boost::shared_ptr<IU> iu);
boost::shared_ptr<IU> remove(const std::string& iu_uid);
boost::shared_ptr<IU> remove(boost::shared_ptr<IU> iu);
};
class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<InputBuffer> {
......@@ -349,12 +371,12 @@ class IU: public IUInterface {//{{{
Lock _revision_lock;
protected:
inline void _increase_revision_number() { _revision++; }
IU(const std::string& category="undef", IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" );
IU(const std::string& category, IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" );
public:
inline ~IU() {
IPAACA_IMPLEMENT_ME
}
static boost::shared_ptr<IU> create(const std::string& category="undef", IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" );
static boost::shared_ptr<IU> create(const std::string& category, IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" );
inline Payload& payload() { return _payload; }
void commit();
protected:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment