Skip to content
Snippets Groups Projects
Commit a1578510 authored by Ramin Yaghoubzadeh's avatar Ramin Yaghoubzadeh
Browse files

Much more work on c++ version, some things actually work now :)

parent 6a0076cd
No related branches found
No related tags found
No related merge requests found
......@@ -7,7 +7,7 @@ LIBS = ${BOOSTLIBS} ${PROTOLIBS} -L/usr/local/lib -lrsc -lrsbcore
COMPILER = gfilt
all: main
all: receiver sender
receiver:
......@@ -23,6 +23,6 @@ protoc:
protoc --proto_path=../../proto ../../proto/ipaaca.proto --cpp_out=.
clean:
rm -f ipaaca-test ipaaca-sender ipaaca-receiver ipaaca.pb.h ipaaca.pb.cc
rm -f ipaaca-main ipaaca-sender ipaaca-receiver ipaaca.pb.h ipaaca.pb.cc
......@@ -5,7 +5,7 @@
//#include <rsc/logging/LoggerFactory.h>
// //rsc::logging::LoggerFactory::getInstance().reconfigure(rsc::logging::Logger::LEVEL_ALL);
#ifdef MAKE_RECEIVER
#if 0
//boost::mutex mtx;
using namespace ipaaca;
......@@ -42,8 +42,11 @@ int main() {
}
return EXIT_SUCCESS;
}
#else
#ifdef MAKE_SENDER
//
//
//
//
using namespace ipaaca;
int main() {
initialize_ipaaca_rsb();
......@@ -78,7 +81,7 @@ int main() {
std::cout << "Done." << std::endl;
return EXIT_SUCCESS;
}
#else
#endif
//
// TESTS
......@@ -86,6 +89,24 @@ int main() {
using namespace ipaaca;
#ifdef MAKE_RECEIVER
int main() {
try{
initialize_ipaaca_rsb();
InputBuffer ib("TestIB", "testcategory");
while (true) {
sleep(1);
}
} catch (ipaaca::Exception& e) {
std::cout << "== IPAACA EXCEPTION == " << e.what() << std::endl;
}
}
#else
#ifdef MAKE_SENDER
int main() {
try{
initialize_ipaaca_rsb();
......
......@@ -47,6 +47,29 @@ void init_inprocess_too() {
*/
//}}}
std::ostream& operator<<(std::ostream& os, const Payload& obj)//{{{
{
os << "{";
bool first = true;
for (std::map<std::string, std::string>::const_iterator it=obj._store.begin(); it!=obj._store.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << it->first << "':'" << it->second << "'";
}
os << "}";
return os;
}
//}}}
std::ostream& operator<<(std::ostream& os, const IUInterface& obj)//{{{
{
os << "IUInterface(uid=" << obj.uid() << ", revision=" << obj.revision();
os << ", owner_name=" << obj.owner_name();
os << ", payload = ";
bool first = true;
os << obj.const_payload();
os << ")";
return os;
}
//}}}
std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)//{{{
{
os << "PayloadUpdate(uid=" << obj.uid << ", revision=" << obj.revision;
......@@ -284,10 +307,179 @@ boost::shared_ptr<IU> OutputBuffer::remove(IU::ref iu)
//}}}
// InputBuffer//{{{
InputBuffer::InputBuffer(const std::string& basename)
:Buffer(basename)
InputBuffer::InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests)
:Buffer(basename, "IB")
{
for (std::vector<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) {
_create_category_listener_if_needed(*it);
}
}
InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1)
:Buffer(basename, "IB")
{
_create_category_listener_if_needed(category_interest1);
}
InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2)
:Buffer(basename, "IB")
{
_create_category_listener_if_needed(category_interest1);
_create_category_listener_if_needed(category_interest2);
}
InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3)
:Buffer(basename, "IB")
{
_create_category_listener_if_needed(category_interest1);
_create_category_listener_if_needed(category_interest2);
_create_category_listener_if_needed(category_interest3);
}
InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4)
:Buffer(basename, "IB")
{
_create_category_listener_if_needed(category_interest1);
_create_category_listener_if_needed(category_interest2);
_create_category_listener_if_needed(category_interest3);
_create_category_listener_if_needed(category_interest4);
}
RemoteServerPtr InputBuffer::_get_remote_server(boost::shared_ptr<IU> iu)
{
IPAACA_IMPLEMENT_ME
return RemoteServerPtr();
}
ListenerPtr InputBuffer::_create_category_listener_if_needed(const std::string& category)
{
IPAACA_INFO("entering")
std::map<std::string, ListenerPtr>::iterator it = _listener_store.find(category);
if (it!=_listener_store.end()) return it->second;
IPAACA_INFO("creating a new listener")
std::string scope_string = "/ipaaca/category/" + category;
ListenerPtr listener = Factory::getInstance().createListener( Scope(scope_string) );
HandlerPtr event_handler = HandlerPtr(
new EventFunctionHandler(
boost::bind(&InputBuffer::_handle_iu_events, this, _1)
)
);
listener->addHandler(event_handler);
_listener_store[category] = listener;
IPAACA_INFO("done")
return listener;
/*
'''Return (or create, store and return) a category listener.'''
if iu_category in self._listener_store: return self._informer_store[iu_category]
cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category)
logger.info("Added listener in scope "+"/ipaaca/category/"+iu_category)
return cat_listener
*/
}
void InputBuffer::call_iu_event_handlers(const std::string& uid, bool local, IUEventType event_type, const std::string& category)
{
IPAACA_INFO("handling an event " << ipaaca::iu_event_type_to_str(event_type) << " for IU " << uid)
}
void InputBuffer::_handle_iu_events(EventPtr event)
{
std::string type = event->getType();
if (type == "ipaaca::RemotePushIU") {
boost::shared_ptr<RemotePushIU> iu = boost::static_pointer_cast<RemotePushIU>(event->getData());
if (_iu_store.count(iu->category()) > 0) {
// already got the IU... ignore
} else {
_iu_store[iu->uid()] = iu;
iu->_set_buffer(this);
call_iu_event_handlers(iu->uid(), false, IU_ADDED, iu->category() );
}
IPAACA_INFO( "New RemotePushIU state: " << (*iu) )
} else {
RemotePushIUStore::iterator it;
if (type == "ipaaca::IUPayloadUpdate") {
boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData());
if (update->writer_name == _unique_name) {
//IPAACA_INFO("Ignoring locally-written IU update")
return;
}
it = _iu_store.find(update->uid);
if (it == _iu_store.end()) {
IPAACA_INFO("Ignoring UPDATED message for an IU that we did not fully receive before")
return;
}
//
it->second->_apply_update(update);
call_iu_event_handlers(it->second->uid(), false, IU_UPDATED, it->second->category() );
//
//
} else if (type == "ipaaca::IULinkUpdate") {
boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData());
if (update->writer_name == _unique_name) {
//IPAACA_INFO("Ignoring locally-written IU update")
return;
}
it = _iu_store.find(update->uid);
if (it == _iu_store.end()) {
IPAACA_INFO("Ignoring LINKSUPDATED message for an IU that we did not fully receive before")
return;
}
//
it->second->_apply_link_update(update);
call_iu_event_handlers(it->second->uid(), false, IU_LINKSUPDATED, it->second->category() );
//
//
} else if (type == "ipaaca::protobuf::IUCommission") {
boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData());
if (update->writer_name() == _unique_name) {
//IPAACA_INFO("Ignoring locally-written IU commit")
return;
}
it = _iu_store.find(update->uid());
if (it == _iu_store.end()) {
IPAACA_INFO("Ignoring COMMITTED message for an IU that we did not fully receive before")
return;
}
//
it->second->_apply_commission();
it->second->_revision = update->revision();
call_iu_event_handlers(it->second->uid(), false, IU_COMMITTED, it->second->category() );
//
//
} else {
std::cout << "(Unhandled Event type " << type << " !)" << std::endl;
return;
}
IPAACA_INFO( "New RemotePushIU state: " << *(it->second) )
}
/*
else:
# an update to an existing IU
if event.data.writer_name == self.unique_name:
# Discard updates that originate from this buffer
return
if event.data.uid not in self._iu_store:
# TODO: we should request the IU's owner to send us the IU
logger.warning("Update message for IU which we did not fully receive before.")
return
if type_ is ipaaca_pb2.IUCommission:
# IU commit
iu = self._iu_store[event.data.uid]
iu._apply_commission()
iu._revision = event.data.revision
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
elif type_ is IUPayloadUpdate:
# IU payload update
iu = self._iu_store[event.data.uid]
iu._apply_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
elif type_ is IULinkUpdate:
# IU link update
iu = self._iu_store[event.data.uid]
iu._apply_link_update(event.data)
self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.LINKSUPDATED, category=iu.category)
else:
logger.warning('Warning: _handle_iu_events failed to handle an object of type '+str(type_))
*/
}
//}}}
......@@ -454,6 +646,48 @@ void RemotePushIU::commit()
{
IPAACA_IMPLEMENT_ME
}
void RemotePushIU::_apply_link_update(IULinkUpdate::ptr update)
{
_revision = update->revision;
if (update->is_delta) {
_add_and_remove_links(update->new_links, update->links_to_remove);
} else {
_replace_links(update->new_links);
}
}
void RemotePushIU::_apply_update(IUPayloadUpdate::ptr update)
{
_revision = update->revision;
if (update->is_delta) {
for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) {
_payload._remotely_enforced_delitem(*it);
}
for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) {
_payload._remotely_enforced_setitem(it->first, it->second);
}
} else {
_payload._remotely_enforced_wipe();
for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) {
_payload._remotely_enforced_setitem(it->first, it->second);
}
}
}
void RemotePushIU::_apply_commission()
{
IPAACA_IMPLEMENT_ME
}
void Payload::_remotely_enforced_wipe()
{
_store.clear();
}
void Payload::_remotely_enforced_delitem(const std::string& k)
{
_store.erase(k);
}
void Payload::_remotely_enforced_setitem(const std::string& k, const std::string& v)
{
_store[k] = v;
}
//}}}
......@@ -591,7 +825,8 @@ AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std:
ls.insert(pls.targets(j));
}
}
return std::make_pair(getDataType(), obj);
//return std::make_pair(getDataType(), obj);
return std::make_pair("ipaaca::RemotePushIU", obj);
break;
}
default:
......
......@@ -38,7 +38,9 @@
//using namespace boost;
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
namespace ipaaca {
......@@ -52,6 +54,18 @@ enum IUEventType {
IU_UPDATED,
IU_LINKSUPDATED
};
inline std::string iu_event_type_to_str(IUEventType type)
{
switch(type) {
case IU_ADDED: return "ADDED";
case IU_COMMITTED: return "COMMITTED";
case IU_DELETED: return "DELETED";
case IU_RETRACTED: return "RETRACTED";
case IU_UPDATED: return "UPDATED";
case IU_LINKSUPDATED: return "LINKSUPDATED";
default: return "(IU_EVENT_TYPE_UNKNOWN)";
}
}
enum IUAccessMode {
IU_ACCESS_PUSH,
......@@ -140,11 +154,12 @@ class Buffer { //: public boost::enable_shared_from_this<Buffer> {
std::string _uuid;
std::string _basename;
std::string _unique_name;
std::string _id_prefix;
protected:
_IPAACA_ABSTRACT_ virtual void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") = 0;
_IPAACA_ABSTRACT_ virtual void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef") = 0;
_IPAACA_ABSTRACT_ virtual void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") = 0;
void _allocate_unique_name(const std::string& basename);
void _allocate_unique_name(const std::string& basename, const std::string& function);
inline Buffer(const std::string& basename, const std::string& function) {
_allocate_unique_name(basename, function);
}
......@@ -187,6 +202,9 @@ class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<Ou
class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<InputBuffer> {
friend class IU;
friend class RemotePushIU;
protected:
std::map<std::string, ListenerPtr> _listener_store;
RemotePushIUStore _iu_store; // TODO genericize
protected:
inline void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef")
{
......@@ -200,15 +218,24 @@ class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<Inp
{
IPAACA_INFO("(ERROR) InputBuffer::_send_iu_commission() should never be invoked")
}
protected:
RemoteServerPtr _get_remote_server(boost::shared_ptr<IU> iu);
ListenerPtr _create_category_listener_if_needed(const std::string& category);
void _handle_iu_events(EventPtr event);
void call_iu_event_handlers(const std::string& uid, bool local, IUEventType event_type, const std::string& category);
public:
InputBuffer(const std::string& basename);
InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests);
InputBuffer(const std::string& basename, const std::string& category_interest1);
InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2);
InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3);
InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4);
~InputBuffer() {
IPAACA_IMPLEMENT_ME
}
inline void add(boost::shared_ptr<IU> iu)
{
IPAACA_IMPLEMENT_ME
}
//inline void add(boost::shared_ptr<IU> iu)
//{
// IPAACA_IMPLEMENT_ME
//}
};
/*
......@@ -237,6 +264,7 @@ class IUPayloadUpdate {//{{{
std::map<std::string, std::string> new_items;
std::vector<std::string> keys_to_remove;
friend std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj);
typedef boost::shared_ptr<IUPayloadUpdate> ptr;
};//}}}
class IUPayloadUpdateConverter: public rsb::converter::Converter<std::string> {//{{{
public:
......@@ -254,6 +282,7 @@ class IULinkUpdate {//{{{
std::map<std::string, std::set<std::string> > new_links;
std::map<std::string, std::set<std::string> > links_to_remove;
friend std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj);
typedef boost::shared_ptr<IULinkUpdate> ptr;
};//}}}
class IULinkUpdateConverter: public rsb::converter::Converter<std::string> {//{{{
public:
......@@ -282,6 +311,7 @@ class PayloadEntryProxy//{{{
class Payload//{{{
{
friend std::ostream& operator<<(std::ostream& os, const Payload& obj);
friend class IUInterface;
friend class IU;
friend class RemotePushIU;
......@@ -293,6 +323,9 @@ class Payload//{{{
protected:
void initialize(boost::shared_ptr<IUInterface> iu);
inline void _set_owner_name(const std::string& name) { _owner_name = name; }
void _remotely_enforced_wipe();
void _remotely_enforced_delitem(const std::string& k);
void _remotely_enforced_setitem(const std::string& k, const std::string& v);
public:
inline const std::string& owner_name() { return _owner_name; }
// access
......@@ -305,6 +338,7 @@ class Payload//{{{
class IUInterface {//{{{
friend class IUConverter;
friend std::ostream& operator<<(std::ostream& os, const IUInterface& obj);
protected:
IUInterface();
public:
......@@ -352,6 +386,7 @@ class IUInterface {//{{{
inline const LinkMap& get_all_links() { return _links.get_all_links(); }
// Payload
_IPAACA_ABSTRACT_ virtual Payload& payload() = 0;
_IPAACA_ABSTRACT_ virtual const Payload& const_payload() const = 0;
// setters
_IPAACA_ABSTRACT_ virtual void commit() = 0;
// functions to modify and update links:
......@@ -381,6 +416,7 @@ class IU: public IUInterface {//{{{
}
static boost::shared_ptr<IU> create(const std::string& category, IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" );
inline Payload& payload() { return _payload; }
inline const Payload& const_payload() const { return _payload; }
void commit();
protected:
void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = "");
......@@ -406,10 +442,15 @@ class RemotePushIU: public IUInterface {//{{{
IPAACA_IMPLEMENT_ME
}
inline Payload& payload() { return _payload; }
inline const Payload& const_payload() const { return _payload; }
void commit();
protected:
void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = "");
void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = "");
protected:
void _apply_update(IUPayloadUpdate::ptr update);
void _apply_link_update(IULinkUpdate::ptr update);
void _apply_commission();
typedef boost::shared_ptr<RemotePushIU> ref;
};//}}}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment