Newer
Older
#include <ipaaca/ipaaca.h>
#include <cstdlib>
namespace ipaaca {
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
#define VERBOSE_HANDLERS 0
Lock& logger_lock() {
static Lock lock;
return lock;
}
bool Initializer::_initialized = false;
//const LinkSet EMPTY_LINK_SET = LinkSet();
//const std::set<std::string> EMPTY_LINK_SET();
bool Initializer::initialized() { return _initialized; }
void Initializer::initialize_ipaaca_rsb_if_needed()
if (_initialized) return;
// RYT FIXME This configuration stuff has been simply removed in rsb!
//ParticipantConfig config = ParticipantConfig::fromConfiguration();
//getFactory().setDefaultParticipantConfig(config);
boost::shared_ptr<IUConverter> iu_converter(new IUConverter());
converterRepository<std::string>()->registerConverter(iu_converter);

Ramin Yaghoubzadeh
committed
boost::shared_ptr<MessageConverter> message_converter(new MessageConverter());
converterRepository<std::string>()->registerConverter(message_converter);
boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter());
converterRepository<std::string>()->registerConverter(payload_update_converter);
boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter());
converterRepository<std::string>()->registerConverter(link_update_converter);
boost::shared_ptr<ProtocolBufferConverter<protobuf::IUCommission> > iu_commission_converter(new ProtocolBufferConverter<protobuf::IUCommission> ());
converterRepository<std::string>()->registerConverter(iu_commission_converter);
boost::shared_ptr<ProtocolBufferConverter<protobuf::IURetraction> > iu_retraction_converter(new ProtocolBufferConverter<protobuf::IURetraction> ());
converterRepository<std::string>()->registerConverter(iu_retraction_converter);
boost::shared_ptr<IntConverter> int_converter(new IntConverter());
converterRepository<std::string>()->registerConverter(int_converter);
_initialized = true;
//IPAACA_TODO("initialize all converters")
}
std::string generate_uuid_string()
{
uuid_t uuidt;
uuid_generate(uuidt);
#ifdef __MACOSX__
uuid_string_t uuidstr;
uuid_unparse_lower(uuidt, uuidstr);
return uuidstr;
#else
char result_c[37];
uuid_unparse_lower(uuidt, result_c);
return result_c;
#endif
//ParticipantConfig config = getFactory().getDefaultParticipantConfig();
ParticipantConfig config = ParticipantConfig::fromFile("rsb.cfg");
//ParticipantConfig::Transport inprocess = config.getTransport("inprocess");
//inprocess.setEnabled(true);
//config.addTransport(inprocess);
getFactory().setDefaultParticipantConfig(config);
std::ostream& operator<<(std::ostream& os, const SmartLinkMap& obj)//{{{
{
os << "{";
bool first = true;
for (LinkMap::const_iterator it=obj._links.begin(); it!=obj._links.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << it->first << "': [";
bool firstinner = true;
for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
if (firstinner) { firstinner=false; } else { os << ", "; }
os << "'" << *it2 << "'";
}
os << "]";
}
os << "}";
return os;
}
//}}}
std::ostream& operator<<(std::ostream& os, const Payload& obj)//{{{
{
os << "{";
bool first = true;
for (std::map<std::string, std::string>::const_iterator it=obj._store.begin(); it!=obj._store.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << it->first << "':'" << it->second << "'";
}
os << "}";
return os;
}
//}}}
std::ostream& operator<<(std::ostream& os, const IUInterface& obj)//{{{
{
os << "IUInterface(uid='" << obj.uid() << "'";
os << ", category='" << obj.category() << "'";
os << ", revision=" << obj.revision();
os << ", committed=" << (obj.committed()?"True":"False");
os << ", owner_name='" << obj.owner_name() << "'";
os << ", payload=";
os << obj.const_payload();
os << ", links=";
os << obj._links;
os << ")";
return os;
}
//}}}
std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)//{{{
{
os << "PayloadUpdate(uid=" << obj.uid << ", revision=" << obj.revision;
os << ", writer_name=" << obj.writer_name << ", is_delta=" << (obj.is_delta?"True":"False");
os << ", new_items = {";
bool first = true;
for (std::map<std::string, std::string>::const_iterator it=obj.new_items.begin(); it!=obj.new_items.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << it->first << "':'" << it->second << "'";
}
os << "}, keys_to_remove = [";
first = true;
for (std::vector<std::string>::const_iterator it=obj.keys_to_remove.begin(); it!=obj.keys_to_remove.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << *it << "'";
}
os << "])";
return os;
}
//}}}
std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj)//{{{
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
{
os << "LinkUpdate(uid=" << obj.uid << ", revision=" << obj.revision;
os << ", writer_name=" << obj.writer_name << ", is_delta=" << (obj.is_delta?"True":"False");
os << ", new_links = {";
bool first = true;
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj.new_links.begin(); it!=obj.new_links.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << it->first << "': [";
bool ffirst = true;
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
if (ffirst) { ffirst=false; } else { os << ", "; }
os << "'" << *it2 << "'";
}
os << "]";
}
os << "}, links_to_remove = {";
first = true;
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj.links_to_remove.begin(); it!=obj.links_to_remove.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << it->first << "': [";
bool ffirst = true;
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
if (ffirst) { ffirst=false; } else { os << ", "; }
os << "'" << *it2 << "'";
}
os << "]";
}
os << "})";
return os;
}
LinkSet SmartLinkMap::empty_link_set;
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
void SmartLinkMap::_add_and_remove_links(const LinkMap& add, const LinkMap& remove)
{
// remove specified links
for (LinkMap::const_iterator it = remove.begin(); it != remove.end(); ++it ) {
// if link type exists
if (_links.count(it->first) > 0) {
// remove one by one
for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
_links[it->first].erase(*it2);
}
// wipe the type key if no more links are left
if (_links[it->first].size() == 0) {
_links.erase(it->first);
}
}
}
// add specified links
for (LinkMap::const_iterator it = add.begin(); it != add.end(); ++it ) {
for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
_links[it->first].insert(*it2);
}
}
}
void SmartLinkMap::_replace_links(const LinkMap& links)
{
//_links.clear();
_links=links;
}
const LinkSet& SmartLinkMap::get_links(const std::string& key)
{
LinkMap::const_iterator it = _links.find(key);
if (it==_links.end()) return empty_link_set;
return it->second;
}
const LinkMap& SmartLinkMap::get_all_links()
{
return _links;
}
// IUEventHandler//{{{
IUEventHandler::IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category)
: _function(function), _event_mask(event_mask), _for_all_categories(false)
{
if (category=="") {
_for_all_categories = true;
} else {
_categories.insert(category);
}
}
IUEventHandler::IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories)
: _function(function), _event_mask(event_mask), _for_all_categories(false)
{
if (categories.size()==0) {
_for_all_categories = true;
} else {
_categories = categories;
}
}
void IUEventHandler::call(Buffer* buffer, boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category)
{
if (_condition_met(event_type, category)) {
//IUInterface::ptr iu = buffer->get(uid);
//if (iu) {
#if VERBOSE_HANDLERS == 1
std::cout << "[" << pthread_self() << " handler ENTER]" << std::endl;
#endif
_function(iu, event_type, local);
#if VERBOSE_HANDLERS == 1
std::cout << "[" << pthread_self() << " handler EXIT]" << std::endl;
#endif
void Buffer::_allocate_unique_name(const std::string& basename, const std::string& function) {
std::string uuid = ipaaca::generate_uuid_string();
_unique_name = "/ipaaca/component/" + _basename + "ID" + _uuid + "/" + function;
void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories)
{
IUEventHandler::ptr handler = IUEventHandler::ptr(new IUEventHandler(function, event_mask, categories));
_event_handlers.push_back(handler);
}
void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category)
{
IUEventHandler::ptr handler = IUEventHandler::ptr(new IUEventHandler(function, event_mask, category));
_event_handlers.push_back(handler);
}
void Buffer::call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category)
{

Ramin Yaghoubzadeh
committed
//IPAACA_INFO("handling an event " << ipaaca::iu_event_type_to_str(event_type) << " for IU " << iu->uid())
for (std::vector<IUEventHandler::ptr>::iterator it = _event_handlers.begin(); it != _event_handlers.end(); ++it) {
(*it)->call(this, iu, local, event_type, category);
}
}
//}}}
// Callbacks for OutputBuffer//{{{
CallbackIUPayloadUpdate::CallbackIUPayloadUpdate(Buffer* buffer): _buffer(buffer) { }
CallbackIULinkUpdate::CallbackIULinkUpdate(Buffer* buffer): _buffer(buffer) { }
CallbackIUCommission::CallbackIUCommission(Buffer* buffer): _buffer(buffer) { }
boost::shared_ptr<int> CallbackIUPayloadUpdate::call(const std::string& methodName, boost::shared_ptr<IUPayloadUpdate> update)
{
//std::cout << "-- Received a modify_payload with " << update->new_items.size() << " keys to merge." << std::endl;
IUInterface::ptr iui = _buffer->get(update->uid);
if (! iui) {
IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid)
return boost::shared_ptr<int>(new int(0));
}
IU::ptr iu = boost::static_pointer_cast<IU>(iui);
iu->_revision_lock.lock();
if ((update->revision != 0) && (update->revision != iu->_revision)) {
IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid)
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0));
}
if (update->is_delta) {
// FIXME FIXME this is an unsolved problem atm: deletes in a delta update are
// sent individually. We should have something like _internal_merge_and_remove
for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) {
iu->payload()._internal_remove(*it, update->writer_name); //_buffer->unique_name());
// but it is solved for pure merges:
iu->payload()._internal_merge(update->new_items, update->writer_name);
iu->payload()._internal_replace_all(update->new_items, update->writer_name); //_buffer->unique_name());
//std::cout << "-- Calling update handler due to remote write." << std::endl;
_buffer->call_iu_event_handlers(iu, true, IU_UPDATED, iu->category());
revision_t revision = iu->revision();
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(revision));
}
boost::shared_ptr<int> CallbackIULinkUpdate::call(const std::string& methodName, boost::shared_ptr<IULinkUpdate> update)
{
IUInterface::ptr iui = _buffer->get(update->uid);
if (! iui) {
IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid)
return boost::shared_ptr<int>(new int(0));
}
IU::ptr iu = boost::static_pointer_cast<IU>(iui);
iu->_revision_lock.lock();
if ((update->revision != 0) && (update->revision != iu->_revision)) {
IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid)
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0));
}
if (update->is_delta) {
iu->modify_links(update->new_links, update->links_to_remove, update->writer_name);
} else {
iu->set_links(update->new_links, update->writer_name);
}
_buffer->call_iu_event_handlers(iu, true, IU_LINKSUPDATED, iu->category());
revision_t revision = iu->revision();
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(revision));
}
boost::shared_ptr<int> CallbackIUCommission::call(const std::string& methodName, boost::shared_ptr<protobuf::IUCommission> update)
{
IUInterface::ptr iui = _buffer->get(update->uid());
if (! iui) {
IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid())
return boost::shared_ptr<int>(new int(0));
}
IU::ptr iu = boost::static_pointer_cast<IU>(iui);
iu->_revision_lock.lock();
if ((update->revision() != 0) && (update->revision() != iu->_revision)) {
IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid())
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0));
}
if (iu->committed()) {
return boost::shared_ptr<int>(new int(0));
} else {
}
iu->_internal_commit(update->writer_name());
_buffer->call_iu_event_handlers(iu, true, IU_LINKSUPDATED, iu->category());
revision_t revision = iu->revision();
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(revision));
OutputBuffer::OutputBuffer(const std::string& basename)
void OutputBuffer::_initialize_server()
{
_server = getFactory().createServer( Scope( _unique_name ) );
_server->registerMethod("updatePayload", Server::CallbackPtr(new CallbackIUPayloadUpdate(this)));
_server->registerMethod("updateLinks", Server::CallbackPtr(new CallbackIULinkUpdate(this)));
_server->registerMethod("commit", Server::CallbackPtr(new CallbackIUCommission(this)));
}
OutputBuffer::ptr OutputBuffer::create(const std::string& basename)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return OutputBuffer::ptr(new OutputBuffer(basename));
}
IUInterface::ptr OutputBuffer::get(const std::string& iu_uid)
{
IUStore::iterator it = _iu_store.find(iu_uid);
if (it==_iu_store.end()) return IUInterface::ptr();
return it->second;
}
std::set<IUInterface::ptr> OutputBuffer::get_ius()
{
std::set<IUInterface::ptr> set;
for (IUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) set.insert(it->second);
return set;
}
void OutputBuffer::_send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
IULinkUpdate* lup = new ipaaca::IULinkUpdate();
Informer<ipaaca::IULinkUpdate>::DataPtr ldata(lup);
lup->uid = iu->uid();
lup->is_delta = is_delta;
lup->revision = revision;
lup->is_delta = true;
lup->new_links = new_links;
if (is_delta) lup->links_to_remove = links_to_remove;
if (writer_name=="") lup->writer_name = _unique_name;
else lup->writer_name = writer_name;
Informer<AnyType>::Ptr informer = _get_informer(iu->category());
informer->publish(ldata);
void OutputBuffer::_send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
IUPayloadUpdate* pup = new ipaaca::IUPayloadUpdate();
Informer<ipaaca::IUPayloadUpdate>::DataPtr pdata(pup);
pup->uid = iu->uid();
pup->is_delta = is_delta;
pup->revision = revision;
pup->new_items = new_items;
if (is_delta) pup->keys_to_remove = keys_to_remove;
if (writer_name=="") pup->writer_name = _unique_name;
else pup->writer_name = writer_name;
Informer<AnyType>::Ptr informer = _get_informer(iu->category());
informer->publish(pdata);
void OutputBuffer::_send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name)
{
Informer<protobuf::IUCommission>::DataPtr data(new protobuf::IUCommission());
data->set_uid(iu->uid());
data->set_revision(revision);
if (writer_name=="") data->set_writer_name(_unique_name);
else data->set_writer_name(writer_name);
Informer<AnyType>::Ptr informer = _get_informer(iu->category());
informer->publish(data);
void OutputBuffer::add(IU::ptr iu)
if (_iu_store.count(iu->uid()) > 0) {
throw IUPublishedError();
}
if (iu->is_published()) {
throw IUPublishedError();
}
if (iu->access_mode() != IU_ACCESS_MESSAGE) {
// (for Message-type IUs: do not actually store them)
_iu_store[iu->uid()] = iu;
}
iu->_associate_with_buffer(this); //shared_from_this());
void OutputBuffer::_publish_iu(IU::ptr iu)
{
Informer<AnyType>::Ptr informer = _get_informer(iu->_category);
Informer<ipaaca::IU>::DataPtr iu_data(iu);
informer->publish(iu_data);
}
Informer<AnyType>::Ptr OutputBuffer::_get_informer(const std::string& category)
{
if (_informer_store.count(category) > 0) {
return _informer_store[category];
} else {

Ramin Yaghoubzadeh
committed
//IPAACA_INFO("Making new informer for category " << category)
std::string scope_string = "/ipaaca/category/" + category;
Informer<AnyType>::Ptr informer = getFactory().createInformer<AnyType> ( Scope(scope_string));
_informer_store[category] = informer;
return informer;
}
}
boost::shared_ptr<IU> OutputBuffer::remove(const std::string& iu_uid)
{

Ramin Yaghoubzadeh
committed
IUStore::iterator it = _iu_store.find(iu_uid);

Ramin Yaghoubzadeh
committed
if (it == _iu_store.end()) {
IPAACA_WARNING("Removal of IU " << iu_uid << " requested, but not present in our OutputBuffer")
//throw IUNotFoundError();
}

Ramin Yaghoubzadeh
committed
IU::ptr iu = it->second;
_retract_iu(iu);
_iu_store.erase(iu_uid);
return iu;
boost::shared_ptr<IU> OutputBuffer::remove(IU::ptr iu)

Ramin Yaghoubzadeh
committed
return remove(iu->uid()); // to make sure it is in the store

Ramin Yaghoubzadeh
committed
void OutputBuffer::_retract_iu(IU::ptr iu)
{
Informer<protobuf::IURetraction>::DataPtr data(new protobuf::IURetraction());
data->set_uid(iu->uid());
data->set_revision(iu->revision());
Informer<AnyType>::Ptr informer = _get_informer(iu->category());
informer->publish(data);
}
InputBuffer::InputBuffer(const std::string& basename, const std::set<std::string>& category_interests)
:Buffer(basename, "IB")
{
for (std::set<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) {
_create_category_listener_if_needed(*it);
}
}
InputBuffer::InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests)
:Buffer(basename, "IB")
{
for (std::vector<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) {
_create_category_listener_if_needed(*it);
}
}
InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1)
:Buffer(basename, "IB")
_create_category_listener_if_needed(category_interest1);
InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2)
:Buffer(basename, "IB")
{
_create_category_listener_if_needed(category_interest1);
_create_category_listener_if_needed(category_interest2);
}
InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3)
:Buffer(basename, "IB")
{
_create_category_listener_if_needed(category_interest1);
_create_category_listener_if_needed(category_interest2);
_create_category_listener_if_needed(category_interest3);
}
InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4)
:Buffer(basename, "IB")
{
_create_category_listener_if_needed(category_interest1);
_create_category_listener_if_needed(category_interest2);
_create_category_listener_if_needed(category_interest3);
_create_category_listener_if_needed(category_interest4);
}
InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::set<std::string>& category_interests)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return InputBuffer::ptr(new InputBuffer(basename, category_interests));
}
InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::vector<std::string>& category_interests)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return InputBuffer::ptr(new InputBuffer(basename, category_interests));
}
InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return InputBuffer::ptr(new InputBuffer(basename, category_interest1));
}
InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2));
}
InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3));
}
InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3, category_interest4));
}
IUInterface::ptr InputBuffer::get(const std::string& iu_uid)
{
RemotePushIUStore::iterator it = _iu_store.find(iu_uid); // TODO genericize
if (it==_iu_store.end()) return IUInterface::ptr();
return it->second;
}
std::set<IUInterface::ptr> InputBuffer::get_ius()
{
std::set<IUInterface::ptr> set;
for (RemotePushIUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) set.insert(it->second); // TODO genericize
return set;
}
RemoteServerPtr InputBuffer::_get_remote_server(const std::string& unique_server_name)
std::map<std::string, RemoteServerPtr>::iterator it = _remote_server_store.find(unique_server_name);
if (it!=_remote_server_store.end()) return it->second;
RemoteServerPtr remote_server = getFactory().createRemoteServer(Scope(unique_server_name));
_remote_server_store[unique_server_name] = remote_server;
return remote_server;
}
ListenerPtr InputBuffer::_create_category_listener_if_needed(const std::string& category)
{
std::map<std::string, ListenerPtr>::iterator it = _listener_store.find(category);
if (it!=_listener_store.end()) return it->second;

Ramin Yaghoubzadeh
committed
//IPAACA_INFO("Creating a new listener for category " << category)
std::string scope_string = "/ipaaca/category/" + category;
ListenerPtr listener = getFactory().createListener( Scope(scope_string) );
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
HandlerPtr event_handler = HandlerPtr(
new EventFunctionHandler(
boost::bind(&InputBuffer::_handle_iu_events, this, _1)
)
);
listener->addHandler(event_handler);
_listener_store[category] = listener;
return listener;
/*
'''Return (or create, store and return) a category listener.'''
if iu_category in self._listener_store: return self._informer_store[iu_category]
cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category)
logger.info("Added listener in scope "+"/ipaaca/category/"+iu_category)
return cat_listener
*/
}
void InputBuffer::_handle_iu_events(EventPtr event)
{
std::string type = event->getType();
if (type == "ipaaca::RemotePushIU") {
boost::shared_ptr<RemotePushIU> iu = boost::static_pointer_cast<RemotePushIU>(event->getData());
if (_iu_store.count(iu->category()) > 0) {
// already got the IU... ignore
} else {
_iu_store[iu->uid()] = iu;
iu->_set_buffer(this);
call_iu_event_handlers(iu, false, IU_ADDED, iu->category() );

Ramin Yaghoubzadeh
committed
//IPAACA_INFO( "New RemotePushIU state: " << (*iu) )
} else if (type == "ipaaca::RemoteMessage") {
boost::shared_ptr<RemoteMessage> iu = boost::static_pointer_cast<RemoteMessage>(event->getData());
//_iu_store[iu->uid()] = iu;
//iu->_set_buffer(this);
//std::cout << "REFCNT after cast, before calling handlers: " << iu.use_count() << std::endl;
call_iu_event_handlers(iu, false, IU_MESSAGE, iu->category() );
//_iu_store.erase(iu->uid());
} else {
RemotePushIUStore::iterator it;
if (type == "ipaaca::IUPayloadUpdate") {
boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData());

Ramin Yaghoubzadeh
committed
//IPAACA_INFO("** writer name: " << update->writer_name)
if (update->writer_name == _unique_name) {
return;
}
it = _iu_store.find(update->uid);
if (it == _iu_store.end()) {
IPAACA_INFO("Ignoring UPDATED message for an IU that we did not fully receive before")
return;
}
//
it->second->_apply_update(update);
call_iu_event_handlers(it->second, false, IU_UPDATED, it->second->category() );
//
//
} else if (type == "ipaaca::IULinkUpdate") {
boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData());
if (update->writer_name == _unique_name) {
return;
}
it = _iu_store.find(update->uid);
if (it == _iu_store.end()) {
IPAACA_INFO("Ignoring LINKSUPDATED message for an IU that we did not fully receive before")
return;
}
//
it->second->_apply_link_update(update);
call_iu_event_handlers(it->second, false, IU_LINKSUPDATED, it->second->category() );
//
//
} else if (type == "ipaaca::protobuf::IUCommission") {
boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData());
if (update->writer_name() == _unique_name) {
return;
}
it = _iu_store.find(update->uid());
if (it == _iu_store.end()) {
IPAACA_INFO("Ignoring COMMITTED message for an IU that we did not fully receive before")
return;
}
//
it->second->_apply_commission();
it->second->_revision = update->revision();
call_iu_event_handlers(it->second, false, IU_COMMITTED, it->second->category() );
//
//
} else if (type == "ipaaca::protobuf::IURetraction") {
boost::shared_ptr<protobuf::IURetraction> update = boost::static_pointer_cast<protobuf::IURetraction>(event->getData());
it = _iu_store.find(update->uid());
if (it == _iu_store.end()) {
IPAACA_INFO("Ignoring RETRACTED message for an IU that we did not fully receive before")
return;
}
//
it->second->_revision = update->revision();
it->second->_apply_retraction();
// remove from InputBuffer FIXME: this is a crossover between retracted and deleted behavior
_iu_store.erase(it->first);
// and call the handler. IU reference is still valid for this call, although removed from buffer.
call_iu_event_handlers(it->second, false, IU_COMMITTED, it->second->category() );
//
} else {
std::cout << "(Unhandled Event type " << type << " !)" << std::endl;
return;
}

Ramin Yaghoubzadeh
committed
//IPAACA_INFO( "New RemotePushIU state: " << *(it->second) )
}
}
// IUInterface//{{{
IUInterface::IUInterface()
: _buffer(NULL), _committed(false), _retracted(false)
{
}
void IUInterface::_set_uid(const std::string& uid) {
if (_uid != "") {
throw IUAlreadyHasAnUIDError();
}
_uid = uid;
}
void IUInterface::_set_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buffer) {
if (_buffer) {
throw IUAlreadyInABufferError();
}
_buffer = buffer;
}
void IUInterface::_set_owner_name(const std::string& owner_name) {
if (_owner_name != "") {
throw IUAlreadyHasAnOwnerNameError();
}
_owner_name = owner_name;
}
/// set the buffer pointer and the owner names of IU and Payload
void IUInterface::_associate_with_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buffer) {
_set_buffer(buffer); // will throw if already set
_set_owner_name(buffer->unique_name());
payload()._set_owner_name(buffer->unique_name());
}
/// C++-specific convenience function to add one single link
void IUInterface::add_link(const std::string& type, const std::string& target, const std::string& writer_name)
{
LinkMap none;
LinkMap add;
add[type].insert(target);
_modify_links(true, add, none, writer_name);
_add_and_remove_links(add, none);
}
/// C++-specific convenience function to remove one single link
void IUInterface::remove_link(const std::string& type, const std::string& target, const std::string& writer_name)
{
LinkMap none;
LinkMap remove;
remove[type].insert(target);
_modify_links(true, none, remove, writer_name);
_add_and_remove_links(none, remove);
}
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
void IUInterface::add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
{
LinkMap none;
LinkMap add;
add[type] = targets;
_modify_links(true, add, none, writer_name);
_add_and_remove_links(add, none);
}
void IUInterface::remove_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
{
LinkMap none;
LinkMap remove;
remove[type] = targets;
_modify_links(true, none, remove, writer_name);
_add_and_remove_links(none, remove);
}
void IUInterface::modify_links(const LinkMap& add, const LinkMap& remove, const std::string& writer_name)
{
_modify_links(true, add, remove, writer_name);
_add_and_remove_links(add, remove);
}
void IUInterface::set_links(const LinkMap& links, const std::string& writer_name)
{
LinkMap none;
_modify_links(false, links, none, writer_name);
_replace_links(links);
}
//}}}
// IU//{{{
IU::ptr IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
IU::ptr iu = IU::ptr(new IU(category, access_mode, read_only, payload_type)); /* params */ //));
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
IU::IU(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
_revision = 1;
_uid = ipaaca::generate_uuid_string();
_category = category;
_payload_type = payload_type;
// payload initialization deferred to IU::create(), above
_read_only = read_only;
_access_mode = access_mode;
_committed = false;
}
void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
_revision_lock.lock();
if (_committed) {
_revision_lock.unlock();
throw IUCommittedError();
}
_increase_revision_number();
if (is_published()) {
_buffer->_send_iu_link_update(this, is_delta, _revision, new_links, links_to_remove, writer_name);
}
_revision_lock.unlock();
}
void IU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
_revision_lock.lock();
if (_committed) {
_revision_lock.unlock();
throw IUCommittedError();
}
_increase_revision_number();
if (is_published()) {
//std::cout << "Sending a payload update with " << new_items.size() << " entries to merge." << std::endl;
_buffer->_send_iu_payload_update(this, is_delta, _revision, new_items, keys_to_remove, writer_name);
}
_revision_lock.unlock();
}
void IU::commit()
{
_internal_commit();
}
void IU::_internal_commit(const std::string& writer_name)
{
_revision_lock.lock();
if (_committed) {
_revision_lock.unlock();
throw IUCommittedError();
}
_increase_revision_number();
_committed = true;
if (is_published()) {
_buffer->_send_iu_commission(this, _revision, writer_name);
}
_revision_lock.unlock();
}
//}}}
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
// Message//{{{
Message::ptr Message::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
Message::ptr iu = Message::ptr(new Message(category, access_mode, read_only, payload_type)); /* params */ //));
iu->_payload.initialize(iu);
return iu;
}
Message::Message(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
: IU(category, access_mode, read_only, payload_type)
{
}
void Message::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
if (is_published()) {
IPAACA_INFO("Info: modifying a Message after sending has no global effects")
}
}
void Message::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
if (is_published()) {
IPAACA_INFO("Info: modifying a Message after sending has no global effects")
}
}
void Message::_internal_commit(const std::string& writer_name)
{
if (is_published()) {
IPAACA_INFO("Info: committing to a Message after sending has no global effects")
}
}
//}}}
RemotePushIU::ptr RemotePushIU::create()
RemotePushIU::ptr iu = RemotePushIU::ptr(new RemotePushIU(/* params */));
iu->_payload.initialize(iu);
return iu;
}
RemotePushIU::RemotePushIU()
{
// nothing
}
void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
if (_committed) {
throw IUCommittedError();
}
if (_read_only) {
throw IUReadOnlyError();
}
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
IULinkUpdate::ptr update = IULinkUpdate::ptr(new IULinkUpdate());
update->uid = _uid;
update->revision = _revision;
update->is_delta = is_delta;
update->writer_name = _buffer->unique_name();
update->new_links = new_links;
update->links_to_remove = links_to_remove;
boost::shared_ptr<int> result = server->call<int>("updateLinks", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO
if (*result == 0) {
throw IUUpdateFailedError();
} else {
_revision = *result;
}
}
void RemotePushIU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
//std::cout << "-- Sending a modify_payload with " << new_items.size() << " keys to merge." << std::endl;
if (_committed) {
throw IUCommittedError();
}
if (_read_only) {
throw IUReadOnlyError();
}
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
IUPayloadUpdate::ptr update = IUPayloadUpdate::ptr(new IUPayloadUpdate());
update->uid = _uid;
update->revision = _revision;
update->is_delta = is_delta;
update->writer_name = _buffer->unique_name();
update->new_items = new_items;
update->keys_to_remove = keys_to_remove;
boost::shared_ptr<int> result = server->call<int>("updatePayload", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO
if (*result == 0) {
throw IUUpdateFailedError();
} else {
_revision = *result;
}
}
void RemotePushIU::commit()
{
if (_read_only) {
throw IUReadOnlyError();
}
if (_committed) {
// Following python version: ignoring multiple commit
return;
}
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
boost::shared_ptr<protobuf::IUCommission> update = boost::shared_ptr<protobuf::IUCommission>(new protobuf::IUCommission());
update->set_uid(_uid);
update->set_revision(_revision);
update->set_writer_name(_buffer->unique_name());
boost::shared_ptr<int> result = server->call<int>("commit", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO