Skip to content
Snippets Groups Projects
Commit 20fe2f8a authored by Ramin Yaghoubzadeh's avatar Ramin Yaghoubzadeh
Browse files

More work on the c++ version

parent c22247bd
No related branches found
No related tags found
No related merge requests found
......@@ -8,7 +8,7 @@ LIBS = ${BOOSTLIBS} ${PROTOLIBS} -L/usr/local/lib -lrsc -lrsbcore
COMPILER = gfilt
all: main
true
receiver:
${COMPILER} ${CCFLAGS} -DMAKE_RECEIVER -o ipaaca-receiver ${SOURCES} ${LIBS}
......
......@@ -91,7 +91,8 @@ int main() {
initialize_ipaaca_rsb();
OutputBuffer ob;
OutputBuffer ob("TestOB");
std::cout << "Buffer: " << ob.unique_name() << std::endl;
IU::ref iu = IU::create();
ob.add(iu);
......
......@@ -124,9 +124,20 @@ void SmartLinkMap::_replace_links(const LinkMap& links)
}
//}}}
// Buffer//{{{
void Buffer::_allocate_unique_name(const std::string& basename) {
std::string uuid = ipaaca::generate_uuid_string();
std::string name = basename + "-" + uuid.substr(0,8);
_unique_name = name;
}
//}}}
// OutputBuffer//{{{
OutputBuffer::OutputBuffer(const std::string& basename)
:Buffer(basename)
{
}
void OutputBuffer::_send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
IPAACA_IMPLEMENT_ME
......@@ -143,7 +154,7 @@ void OutputBuffer::add(IU::ref iu)
{
IPAACA_IMPLEMENT_ME
// TODO place in iu store
iu->_set_buffer(this); //shared_from_this());
iu->_associate_with_buffer(this); //shared_from_this());
// TODO
}
......@@ -204,6 +215,12 @@ void OutputBuffer::add(IU::ref iu)
*/
//}}}
// InputBuffer//{{{
InputBuffer::InputBuffer(const std::string& basename)
:Buffer(basename)
{
}
//}}}
......@@ -226,6 +243,7 @@ void IUInterface::_set_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buff
throw IUAlreadyInABufferError();
}
_buffer = buffer;
}
void IUInterface::_set_owner_name(const std::string& owner_name) {
......@@ -235,6 +253,13 @@ void IUInterface::_set_owner_name(const std::string& owner_name) {
_owner_name = owner_name;
}
/// set the buffer pointer and the owner names of IU and Payload
void IUInterface::_associate_with_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buffer) {
_set_buffer(buffer); // will throw if already set
_set_owner_name(buffer->unique_name());
payload()._set_owner_name(buffer->unique_name());
}
void IUInterface::add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
{
LinkMap none;
......@@ -271,7 +296,7 @@ void IUInterface::set_links(const LinkMap& links, const std::string& writer_name
// IU//{{{
IU::ref IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
IU::ref iu = IU::ref(new IU(/* params */));
IU::ref iu = IU::ref(new IU(category, access_mode, read_only, payload_type)); /* params */ //));
iu->_payload.initialize(iu);
return iu;
}
......@@ -337,6 +362,17 @@ void IU::_internal_commit(const std::string& writer_name)
//}}}
// RemotePushIU//{{{
RemotePushIU::ref RemotePushIU::create()
{
RemotePushIU::ref iu = RemotePushIU::ref(new RemotePushIU(/* params */));
iu->_payload.initialize(iu);
return iu;
}
RemotePushIU::RemotePushIU()
{
// nothing
}
void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
IPAACA_IMPLEMENT_ME
......@@ -414,7 +450,6 @@ inline std::string Payload::get(const std::string& k) {
}
//}}}
/*
// IUConverter//{{{
IUConverter::IUConverter()
......@@ -430,17 +465,26 @@ std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire)
boost::shared_ptr<const IU> obj = boost::static_pointer_cast<const IU> (data.second);
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
// transfer obj data to pbo
pbo->set_uid(obj->uid);
pbo->set_revision(obj->revision);
pbo->set_writer_name(obj->writer_name);
pbo->set_is_delta(obj->is_delta);
for (std::map<std::string, std::string>::const_iterator it=obj->new_items.begin(); it!=obj->new_items.end(); ++it) {
protobuf::PayloadItem* item = pbo->add_new_items();
pbo->set_uid(obj->uid());
pbo->set_revision(obj->revision());
pbo->set_category(obj->category());
pbo->set_payload_type(obj->payload_type());
pbo->set_owner_name(obj->owner_name());
pbo->set_committed(obj->committed());
pbo->set_access_mode(ipaaca::protobuf::IU::PUSH); // TODO
pbo->set_read_only(obj->read_only());
for (std::map<std::string, std::string>::const_iterator it=obj->_payload._store.begin(); it!=obj->_payload._store.end(); ++it) {
protobuf::PayloadItem* item = pbo->add_payload();
item->set_key(it->first);
item->set_value(it->second);
item->set_type("str"); // FIXME other types than str (later)
}
for (std::vector<std::string>::const_iterator it=obj->keys_to_remove.begin(); it!=obj->keys_to_remove.end(); ++it) {
pbo->add_keys_to_remove(*it);
for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
return getWireSchema();
......@@ -448,27 +492,45 @@ std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire)
}
AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "ipaaca-iu-payload-update"
assert(wireSchema == getWireSchema()); // "ipaaca-iu"
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
pbo->ParseFromString(wire);
boost::shared_ptr<IU> obj(new IU());
// transfer pbo data to obj
obj->uid = pbo->uid();
obj->revision = pbo->revision();
obj->writer_name = pbo->writer_name();
obj->is_delta = pbo->is_delta();
for (int i=0; i<pbo->new_items_size(); i++) {
const protobuf::PayloadItem& it = pbo->new_items(i);
obj->new_items[it.key()] = it.value();
}
for (int i=0; i<pbo->keys_to_remove_size(); i++) {
obj->keys_to_remove.push_back(pbo->keys_to_remove(i));
IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode());
switch(mode) {
case IU_ACCESS_PUSH:
{
// Create a "remote push IU"
boost::shared_ptr<RemotePushIU> obj(new RemotePushIU());
// transfer pbo data to obj
obj->_uid = pbo->uid();
obj->_revision = pbo->revision();
obj->_category = pbo->category();
obj->_payload_type = pbo->payload_type();
obj->_owner_name = pbo->owner_name();
obj->_committed = pbo->committed();
obj->_read_only = pbo->read_only();
obj->_access_mode = IU_ACCESS_PUSH;
for (int i=0; i<pbo->payload_size(); i++) {
const protobuf::PayloadItem& it = pbo->payload(i);
obj->_payload._store[it.key()] = it.value();
}
for (int i=0; i<pbo->links_size(); i++) {
const protobuf::LinkSet& pls = pbo->links(i);
LinkSet& ls = obj->_links._links[pls.type()];
for (int j=0; j<pls.targets_size(); j++) {
ls.insert(pls.targets(j));
}
}
return std::make_pair(getDataType(), obj);
break;
}
default:
// other cases not handled yet! ( TODO )
throw NotImplementedError();
}
return std::make_pair(getDataType(), obj);
}
//}}}
*/
// IUPayloadUpdateConverter//{{{
......
......@@ -45,18 +45,18 @@ namespace ipaaca {
typedef uint32_t revision_t;
enum IUEventType {
ADDED,
COMMITTED,
DELETED,
RETRACTED,
UPDATED,
LINKSUPDATED
IU_ADDED,
IU_COMMITTED,
IU_DELETED,
IU_RETRACTED,
IU_UPDATED,
IU_LINKSUPDATED
};
enum IUAccessMode {
PUSH,
REMOTE,
MESSAGE
IU_ACCESS_PUSH,
IU_ACCESS_REMOTE,
IU_ACCESS_MESSAGE
};
//class {
......@@ -109,11 +109,13 @@ class Lock
typedef std::set<std::string> LinkSet;
typedef std::map<std::string, LinkSet> LinkMap;
class SmartLinkMap {
friend class IUInterface;
friend class IU;
friend class IUConverter;
public:
const LinkSet& get_links(const std::string& key);
const LinkMap& get_all_links();
friend class IUInterface;
protected:
LinkMap _links;
void _add_and_remove_links(const LinkMap& add, const LinkMap& remove);
......@@ -126,14 +128,37 @@ const LinkSet EMPTY_LINK_SET;
class Buffer { //: public boost::enable_shared_from_this<Buffer> {
friend class IU;
friend class RemotePushIU;
protected:
std::string _unique_name;
protected:
_IPAACA_ABSTRACT_ virtual void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") = 0;
_IPAACA_ABSTRACT_ virtual void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef") = 0;
_IPAACA_ABSTRACT_ virtual void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") = 0;
void _allocate_unique_name(const std::string& basename);
inline Buffer(const std::string& basename) {
_allocate_unique_name(basename);
}
public:
virtual inline ~Buffer() { }
inline const std::string& unique_name() { return _unique_name; }
_IPAACA_ABSTRACT_ virtual void add(boost::shared_ptr<IU> iu) = 0;
};
class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<OutputBuffer> {
friend class IU;
friend class RemotePushIU;
protected:
void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef");
void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef");
void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name);
public:
OutputBuffer(const std::string& basename);
~OutputBuffer() {
IPAACA_IMPLEMENT_ME
}
void add(boost::shared_ptr<IU> iu);
};
class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<InputBuffer> {
friend class IU;
friend class RemotePushIU;
......@@ -151,23 +176,16 @@ class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<Inp
IPAACA_INFO("(ERROR) InputBuffer::_send_iu_commission() should never be invoked")
}
public:
InputBuffer(const std::string& basename);
~InputBuffer() {
IPAACA_IMPLEMENT_ME
}
inline void add(boost::shared_ptr<IU> iu)
{
IPAACA_IMPLEMENT_ME
}
};
class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<OutputBuffer> {
friend class IU;
friend class RemotePushIU;
protected:
void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef");
void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef");
void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name);
public:
void add(boost::shared_ptr<IU> iu);
};
/*
class IUEventFunctionHandler: public rsb::EventFunctionHandler {
protected:
......@@ -178,6 +196,13 @@ class IUEventFunctionHandler: public rsb::EventFunctionHandler {
};
*/
class IUConverter: public rsb::converter::Converter<std::string> {//{{{
public:
IUConverter();
std::string serialize(const rsb::AnnotatedData& data, std::string& wire);
rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire);
};//}}}
class IUPayloadUpdate {//{{{
public:
std::string uid;
......@@ -232,14 +257,20 @@ class PayloadEntryProxy//{{{
class Payload//{{{
{
friend class IUInterface;
friend class IU;
friend class RemotePushIU;
friend class IUConverter;
protected:
std::string _owner_name;
std::map<std::string, std::string> _store;
boost::shared_ptr<IUInterface> _iu;
protected:
friend class IU;
friend class RemotePushIU;
void initialize(boost::shared_ptr<IUInterface> iu);
inline void _set_owner_name(const std::string& name) { _owner_name = name; }
public:
inline const std::string& owner_name() { return _owner_name; }
// access
PayloadEntryProxy operator[](const std::string& key);
void set(const std::string& k, const std::string& v);
void remove(const std::string& k);
......@@ -248,6 +279,7 @@ class Payload//{{{
};//}}}
class IUInterface {//{{{
friend class IUConverter;
protected:
IUInterface();
public:
......@@ -271,6 +303,7 @@ class IUInterface {//{{{
_IPAACA_ABSTRACT_ virtual void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) = 0;
_IPAACA_ABSTRACT_ virtual void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) = 0;
//void _set_buffer(boost::shared_ptr<Buffer> buffer);
void _associate_with_buffer(Buffer* buffer);
void _set_buffer(Buffer* buffer);
void _set_uid(const std::string& uid);
void _set_owner_name(const std::string& owner_name);
......@@ -280,16 +313,16 @@ class IUInterface {//{{{
inline void _replace_links(const LinkMap& links) { _links._replace_links(links); }
public:
inline bool is_published() { return (_buffer != 0); }
inline const std::string& uid() { return _uid; }
inline revision_t revision() { return _revision; }
inline const std::string& category() { return _category; }
inline const std::string& payload_type() { return _payload_type; }
inline const std::string& owner_name() { return _owner_name; }
inline bool committed() { return _committed; }
inline IUAccessMode access_mode() { return _access_mode; }
inline bool read_only() { return _read_only; }
inline const std::string& uid() const { return _uid; }
inline revision_t revision() const { return _revision; }
inline const std::string& category() const { return _category; }
inline const std::string& payload_type() const { return _payload_type; }
inline const std::string& owner_name() const { return _owner_name; }
inline bool committed() const { return _committed; }
inline IUAccessMode access_mode() const { return _access_mode; }
inline bool read_only() const { return _read_only; }
//inline boost::shared_ptr<Buffer> buffer() { return _buffer; }
inline Buffer* buffer() { return _buffer; }
inline Buffer* buffer() const { return _buffer; }
inline const LinkSet& get_links(std::string type) { return _links.get_links(type); }
inline const LinkMap& get_all_links() { return _links.get_all_links(); }
// Payload
......@@ -316,12 +349,12 @@ class IU: public IUInterface {//{{{
Lock _revision_lock;
protected:
inline void _increase_revision_number() { _revision++; }
IU(const std::string& category="undef", IUAccessMode access_mode=PUSH, bool read_only=false, const std::string& payload_type="MAP" );
IU(const std::string& category="undef", IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" );
public:
inline ~IU() {
IPAACA_IMPLEMENT_ME
}
static boost::shared_ptr<IU> create(const std::string& category="undef", IUAccessMode access_mode=PUSH, bool read_only=false, const std::string& payload_type="MAP" );
static boost::shared_ptr<IU> create(const std::string& category="undef", IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" );
inline Payload& payload() { return _payload; }
void commit();
protected:
......@@ -337,16 +370,22 @@ class RemotePushIU: public IUInterface {//{{{
friend class Buffer;
friend class InputBuffer;
friend class OutputBuffer;
friend class IUConverter;
public:
Payload _payload;
protected:
//RemotePushIU();
RemotePushIU();
static boost::shared_ptr<RemotePushIU> create();
public:
inline ~RemotePushIU() {
IPAACA_IMPLEMENT_ME
}
inline Payload& payload() { return _payload; }
void commit();
protected:
void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = "");
void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = "");
typedef boost::shared_ptr<RemotePushIU> ref;
};//}}}
class Exception: public std::exception//{{{
......@@ -400,6 +439,14 @@ class IUAlreadyHasAnOwnerNameError: public Exception//{{{
_description = "IUAlreadyHasAnOwnerNameError";
}
};//}}}
class NotImplementedError: public Exception//{{{
{
public:
inline ~NotImplementedError() throw() { }
inline NotImplementedError() { //boost::shared_ptr<IU> iu) {
_description = "NotImplementedError";
}
};//}}}
} // of namespace ipaaca
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment