Newer
Older
#include <ipaaca.h>
#include <cstdlib>
namespace ipaaca {
// util and init//{{{
std::string generate_uuid_string()
{
uuid_t uuidt;
uuid_string_t uuidstr;
uuid_generate(uuidt);
uuid_unparse_lower(uuidt, uuidstr);
return uuidstr;
}
//const LinkSet EMPTY_LINK_SET = LinkSet();
//const std::set<std::string> EMPTY_LINK_SET();
void initialize_ipaaca_rsb()
{
ParticipantConfig config = ParticipantConfig::fromConfiguration();
Factory::getInstance().setDefaultParticipantConfig(config);
boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter());
boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter());
stringConverterRepository()->registerConverter(payload_update_converter);
stringConverterRepository()->registerConverter(link_update_converter);
//IPAACA_TODO("initialize all converters")
}
/*
void init_inprocess_too() {
//ParticipantConfig config = Factory::getInstance().getDefaultParticipantConfig();
ParticipantConfig config = ParticipantConfig::fromFile("rsb.cfg");
//ParticipantConfig::Transport inprocess = config.getTransport("inprocess");
//inprocess.setEnabled(true);
//config.addTransport(inprocess);
Factory::getInstance().setDefaultParticipantConfig(config);
}
*/
//}}}
/// Streams a textual dump of a payload update:
/// PayloadUpdate(uid=..., revision=..., writer_name=..., is_delta=True|False,
/// new_items = {'k':'v', ...}, keys_to_remove = ['k', ...])
std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)//{{{
{
	typedef std::map<std::string, std::string>::const_iterator ItemIter;
	typedef std::vector<std::string>::const_iterator KeyIter;

	os << "PayloadUpdate(uid=" << obj.uid
	   << ", revision=" << obj.revision
	   << ", writer_name=" << obj.writer_name
	   << ", is_delta=" << (obj.is_delta ? "True" : "False")
	   << ", new_items = {";

	// The separator is empty before the first element and ", " afterwards.
	const char* separator = "";
	for (ItemIter item = obj.new_items.begin(); item != obj.new_items.end(); ++item) {
		os << separator << "'" << item->first << "':'" << item->second << "'";
		separator = ", ";
	}

	os << "}, keys_to_remove = [";
	separator = "";
	for (KeyIter key = obj.keys_to_remove.begin(); key != obj.keys_to_remove.end(); ++key) {
		os << separator << "'" << *key << "'";
		separator = ", ";
	}
	os << "])";
	return os;
}
//}}}
/// Writes the entries of a link map as `'type': ['target', ...]`, separated
/// by ", ". The enclosing braces are left to the caller.
static void write_link_map_entries(std::ostream& os, const std::map<std::string, std::set<std::string> >& links)
{
	typedef std::map<std::string, std::set<std::string> >::const_iterator TypeIter;
	typedef std::set<std::string>::const_iterator TargetIter;

	const char* type_separator = "";
	for (TypeIter entry = links.begin(); entry != links.end(); ++entry) {
		os << type_separator << "'" << entry->first << "': [";
		type_separator = ", ";

		const char* target_separator = "";
		for (TargetIter target = entry->second.begin(); target != entry->second.end(); ++target) {
			os << target_separator << "'" << *target << "'";
			target_separator = ", ";
		}
		os << "]";
	}
}

/// Streams a textual dump of a link update:
/// LinkUpdate(uid=..., revision=..., writer_name=..., is_delta=True|False,
/// new_links = {...}, links_to_remove = {...})
std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj)//{{{
{
	os << "LinkUpdate(uid=" << obj.uid
	   << ", revision=" << obj.revision
	   << ", writer_name=" << obj.writer_name
	   << ", is_delta=" << (obj.is_delta ? "True" : "False")
	   << ", new_links = {";
	write_link_map_entries(os, obj.new_links);
	os << "}, links_to_remove = {";
	write_link_map_entries(os, obj.links_to_remove);
	os << "})";
	return os;
}
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// SmartLinkMap//{{{
void SmartLinkMap::_add_and_remove_links(const LinkMap& add, const LinkMap& remove)
{
// remove specified links
for (LinkMap::const_iterator it = remove.begin(); it != remove.end(); ++it ) {
// if link type exists
if (_links.count(it->first) > 0) {
// remove one by one
for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
_links[it->first].erase(*it2);
}
// wipe the type key if no more links are left
if (_links[it->first].size() == 0) {
_links.erase(it->first);
}
}
}
// add specified links
for (LinkMap::const_iterator it = add.begin(); it != add.end(); ++it ) {
for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
_links[it->first].insert(*it2);
}
}
}
/// Replaces the stored links wholesale with a copy of `links`.
void SmartLinkMap::_replace_links(const LinkMap& links)
{
// The assignment overwrites the previous contents, so no explicit clear() is done first.
_links=links;
}
//}}}
// Buffer//{{{
/// Sets the buffer's unique name to "<basename>-<suffix>", where the suffix
/// is the first 8 characters of a freshly generated UUID string.
void Buffer::_allocate_unique_name(const std::string& basename) {
	const std::string suffix = ipaaca::generate_uuid_string().substr(0, 8);
	_unique_name = basename + "-" + suffix;
}
//}}}
/// Constructs an output buffer; `basename` is handed to the Buffer base constructor.
OutputBuffer::OutputBuffer(const std::string& basename)
:Buffer(basename)
{
}
/// Sends an IU link update. Not implemented yet: the body consists only of
/// the IPAACA_IMPLEMENT_ME placeholder macro (defined outside this file).
/// A Python reference implementation is kept in a comment further down.
void OutputBuffer::_send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
IPAACA_IMPLEMENT_ME
}
/// Sends an IU payload update. Not implemented yet: the body consists only
/// of the IPAACA_IMPLEMENT_ME placeholder macro (defined outside this file).
/// A Python reference implementation is kept in a comment further down.
///
/// @param iu              the IU being updated
/// @param is_delta        whether this is an incremental update or a replacement
/// @param revision        the new revision number
/// @param new_items       new payload items
/// @param keys_to_remove  keys that shall be removed from the payload
/// @param writer_name     name of the buffer that initiated the update
void OutputBuffer::_send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
	IPAACA_IMPLEMENT_ME
}
/// Sends the commit notification for an IU. Not implemented yet: the body
/// consists only of the IPAACA_IMPLEMENT_ME placeholder macro (defined
/// outside this file).
void OutputBuffer::_send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name)
{
IPAACA_IMPLEMENT_ME
}
/// Adds an IU to this output buffer.
///
/// Only partially implemented: after the IPAACA_IMPLEMENT_ME placeholder the
/// IU is associated with this buffer; storing and publishing are still TODO.
///
/// @param iu  the IU to add; _associate_with_buffer() throws if the IU
///            already belongs to a buffer
void OutputBuffer::add(IU::ref iu)
{
	IPAACA_IMPLEMENT_ME
	// TODO place in iu store
	iu->_associate_with_buffer(this); //shared_from_this());
	// TODO
}
/*
def _send_iu_link_update(self, iu, is_delta, revision, new_links=None, links_to_remove=None, writer_name="undef"):
'''Send an IU link update.
Keyword arguments:
iu -- the IU being updated
is_delta -- whether this is an incremental update or a replacement of
the whole link dictionary
revision -- the new revision number
new_links -- a dictionary of new link sets
links_to_remove -- a dict of the link sets that shall be removed
writer_name -- name of the Buffer that initiated this update, necessary
to enable remote components to filter out updates that originated
from their own operations
'''
if new_links is None:
new_links = {}
if links_to_remove is None:
links_to_remove = {}
link_update = IULinkUpdate(iu._uid, is_delta=is_delta, revision=revision)
link_update.new_links = new_links
if is_delta:
link_update.links_to_remove = links_to_remove
link_update.writer_name = writer_name
informer = self._get_informer(iu._category)
informer.publishData(link_update)
# FIXME send the notification to the target, if the target is not the writer_name
*/
/*
def _send_iu_payload_update(self, iu, is_delta, revision, new_items=None, keys_to_remove=None, writer_name="undef"):
'''Send an IU payload update.
Keyword arguments:
iu -- the IU being updated
is_delta -- whether this is an incremental update or a replacement
revision -- the new revision number
new_items -- a dictionary of new payload items
keys_to_remove -- a list of the keys that shall be removed from the
payload
writer_name -- name of the Buffer that initiated this update, necessary
to enable remote components to filter out updates that originated
from their own operations
'''
if new_items is None:
new_items = {}
if keys_to_remove is None:
keys_to_remove = []
payload_update = IUPayloadUpdate(iu._uid, is_delta=is_delta, revision=revision)
payload_update.new_items = new_items
if is_delta:
payload_update.keys_to_remove = keys_to_remove
payload_update.writer_name = writer_name
informer = self._get_informer(iu._category)
informer.publishData(payload_update)
*/
// InputBuffer//{{{
/// Constructs an input buffer; `basename` is handed to the Buffer base constructor.
InputBuffer::InputBuffer(const std::string& basename)
:Buffer(basename)
{
}
//}}}
// IUInterface//{{{
/// Creates an IU interface that is attached to no buffer (_buffer is NULL)
/// and is not committed.
IUInterface::IUInterface()
: _buffer(NULL), _committed(false)
{
}
/// Assigns the UID once.
/// @throws IUAlreadyHasAnUIDError if a non-empty UID is already set
void IUInterface::_set_uid(const std::string& uid) {
	const bool already_assigned = !_uid.empty();
	if (already_assigned) {
		throw IUAlreadyHasAnUIDError();
	}
	_uid = uid;
}
/// Stores the owning buffer pointer once.
/// @throws IUAlreadyInABufferError if a buffer pointer is already set
void IUInterface::_set_buffer(Buffer* buffer) {
	if (_buffer != NULL) {
		throw IUAlreadyInABufferError();
	}
	_buffer = buffer;
}
/// Assigns the owner name once.
/// @throws IUAlreadyHasAnOwnerNameError if a non-empty owner name is already set
void IUInterface::_set_owner_name(const std::string& owner_name) {
	const bool already_owned = !_owner_name.empty();
	if (already_owned) {
		throw IUAlreadyHasAnOwnerNameError();
	}
	_owner_name = owner_name;
}
/// Binds this IU to `buffer`: stores the buffer pointer and sets the owner
/// name of both the IU and its payload to the buffer's unique name.
/// The setters throw if the respective value has already been assigned.
void IUInterface::_associate_with_buffer(Buffer* buffer) {
	// Must come first: throws if this IU already sits in a buffer.
	_set_buffer(buffer);

	const std::string& owner = buffer->unique_name();
	_set_owner_name(owner);
	payload()._set_owner_name(owner);
}
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
void IUInterface::add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
{
LinkMap none;
LinkMap add;
add[type] = targets;
_modify_links(true, add, none, writer_name);
_add_and_remove_links(add, none);
}
void IUInterface::remove_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
{
LinkMap none;
LinkMap remove;
remove[type] = targets;
_modify_links(true, none, remove, writer_name);
_add_and_remove_links(none, remove);
}
/// Adds and removes links in one step.
/// The change is first passed to _modify_links() as a delta update
/// (is_delta = true), then applied to the local link map.
void IUInterface::modify_links(const LinkMap& add, const LinkMap& remove, const std::string& writer_name)
{
_modify_links(true, add, remove, writer_name);
_add_and_remove_links(add, remove);
}
void IUInterface::set_links(const LinkMap& links, const std::string& writer_name)
{
LinkMap none;
_modify_links(false, links, none, writer_name);
_replace_links(links);
}
//}}}
// IU//{{{
/// Factory for IUs: constructs the IU and then initializes its payload with
/// the owning smart pointer. The constructor cannot do this itself because
/// the payload needs the IU::ref, which only exists after construction.
///
/// NOTE(review): the function body was damaged in this copy of the file; it
/// has been restored to mirror RemotePushIU::create() -- confirm against the
/// upstream source.
///
/// @param category      IU category
/// @param access_mode   access mode stored in the IU
/// @param read_only     read-only flag stored in the IU
/// @param payload_type  payload type name stored in the IU
/// @return the newly created IU
IU::ref IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
	IU::ref iu = IU::ref(new IU(category, access_mode, read_only, payload_type));
	iu->_payload.initialize(iu);
	return iu;
}
/// Initializes a local IU: revision 1, a freshly generated UID, not
/// committed, plus the caller-supplied category, payload type, access mode
/// and read-only flag.
IU::IU(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
	// State owned by the IU itself
	_uid = ipaaca::generate_uuid_string();
	_revision = 1;
	_committed = false;

	// Caller-supplied attributes
	_category = category;
	_payload_type = payload_type;
	_access_mode = access_mode;
	_read_only = read_only;

	// payload initialization deferred to IU::create()
}
/// Registers a link modification: bumps the revision number and, if the IU
/// is published, forwards the update to the owning buffer.
///
/// The revision lock is held for the whole operation and is released on
/// every exit path, including when the buffer's send call throws.
///
/// @param is_delta         true for an incremental update, false for a replacement
/// @param new_links        links to add (or the complete new link map)
/// @param links_to_remove  links to remove
/// @param writer_name      name passed through to the buffer
/// @throws IUCommittedError if the IU has already been committed
void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
	_revision_lock.lock();
	try {
		if (_committed) {
			throw IUCommittedError();
		}
		_increase_revision_number();
		if (is_published()) {
			_buffer->_send_iu_link_update(this, is_delta, _revision, new_links, links_to_remove, writer_name);
		}
	} catch (...) {
		// Never leave the lock held when an exception escapes.
		_revision_lock.unlock();
		throw;
	}
	_revision_lock.unlock();
}
/// Registers a payload modification: bumps the revision number and, if the
/// IU is published, forwards the update to the owning buffer.
///
/// The revision lock is held for the whole operation and is released on
/// every exit path, including when the buffer's send call throws.
///
/// @param is_delta        true for an incremental update, false for a replacement
/// @param new_items       payload items to set
/// @param keys_to_remove  payload keys to remove
/// @param writer_name     name passed through to the buffer
/// @throws IUCommittedError if the IU has already been committed
void IU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
	_revision_lock.lock();
	try {
		if (_committed) {
			throw IUCommittedError();
		}
		_increase_revision_number();
		if (is_published()) {
			_buffer->_send_iu_payload_update(this, is_delta, _revision, new_items, keys_to_remove, writer_name);
		}
	} catch (...) {
		// Never leave the lock held when an exception escapes.
		_revision_lock.unlock();
		throw;
	}
	_revision_lock.unlock();
}
/// Commits this IU by delegating to _internal_commit() without an explicit
/// writer name (the default argument, declared outside this file, applies).
void IU::commit()
{
_internal_commit();
}
/// Marks the IU as committed: bumps the revision number, sets the committed
/// flag and, if the IU is published, notifies the owning buffer.
///
/// The revision lock is held for the whole operation and is released on
/// every exit path, including when the buffer's send call throws. In that
/// case the committed flag stays set, as it did before this fix.
///
/// @param writer_name  name passed through to the buffer
/// @throws IUCommittedError if the IU has already been committed
void IU::_internal_commit(const std::string& writer_name)
{
	_revision_lock.lock();
	try {
		if (_committed) {
			throw IUCommittedError();
		}
		_increase_revision_number();
		_committed = true;
		if (is_published()) {
			_buffer->_send_iu_commission(this, _revision, writer_name);
		}
	} catch (...) {
		// Never leave the lock held when an exception escapes.
		_revision_lock.unlock();
		throw;
	}
	_revision_lock.unlock();
}
//}}}
// RemotePushIU//{{{
/// Factory for remote push IUs: constructs the object, then initializes its
/// payload with the owning smart pointer.
RemotePushIU::ref RemotePushIU::create()
{
	RemotePushIU::ref remote_iu(new RemotePushIU());
	remote_iu->_payload.initialize(remote_iu);
	return remote_iu;
}
/// Default constructor; performs no initialization of its own.
RemotePushIU::RemotePushIU()
{
// nothing
}
/// Link modification for a remote push IU. Not implemented yet: the body
/// consists only of the IPAACA_IMPLEMENT_ME placeholder macro.
void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
IPAACA_IMPLEMENT_ME
}
/// Payload modification for a remote push IU. Not implemented yet: the body
/// consists only of the IPAACA_IMPLEMENT_ME placeholder macro.
void RemotePushIU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
IPAACA_IMPLEMENT_ME
}
/// Commit for a remote push IU. Not implemented yet: the body consists only
/// of the IPAACA_IMPLEMENT_ME placeholder macro.
void RemotePushIU::commit()
{
IPAACA_IMPLEMENT_ME
}
//}}}
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
// PayloadEntryProxy//{{{
/// Creates a proxy for the entry `key` of `payload`.
/// Only the raw payload pointer is stored; no ownership is taken here.
PayloadEntryProxy::PayloadEntryProxy(Payload* payload, const std::string& key)
: _payload(payload), _key(key)
{
}
/// Writes `value` to the proxied payload entry via Payload::set().
/// @return this proxy
PayloadEntryProxy& PayloadEntryProxy::operator=(const std::string& value)
{
_payload->set(_key, value);
return *this;
}
/// Reads the proxied payload entry via Payload::get() and returns it as a string.
PayloadEntryProxy::operator std::string()
{
return _payload->get(_key);
}
/// Reads the proxied payload entry as a string and converts it with atol().
PayloadEntryProxy::operator long()
{
	const std::string text = operator std::string();
	return atol(text.c_str());
}
/// Reads the proxied payload entry as a string and converts it with atof().
PayloadEntryProxy::operator double()
{
	const std::string text = operator std::string();
	return atof(text.c_str());
}
//}}}
// Payload//{{{
/// Stores the IU this payload belongs to; set() and remove() report their
/// changes to it through _modify_payload().
void Payload::initialize(boost::shared_ptr<IUInterface> iu)
{
_iu = iu;
}
/// Returns, by value, a proxy bound to this payload and `key`; reads and
/// writes through the proxy go to Payload::get() / Payload::set().
PayloadEntryProxy Payload::operator[](const std::string& key)
{
//boost::shared_ptr<PayloadEntryProxy> p(new PayloadEntryProxy(this, key));
return PayloadEntryProxy(this, key);
}
/// Sets payload entry `k` to `v`.
///
/// The change is first reported to the owning IU as a delta update (with an
/// empty writer name); only if that does not throw is the local store
/// updated.
///
/// Deliberately not `inline`: the definition lives in this source file, so
/// an inline definition might not be emitted for callers in other
/// translation units.
void Payload::set(const std::string& k, const std::string& v) {
	std::map<std::string, std::string> _new;
	std::vector<std::string> _remove;
	_new[k]=v;
	_iu->_modify_payload(true, _new, _remove, "" );
	_store[k] = v;
}
/// Removes payload entry `k`.
///
/// The removal is first reported to the owning IU as a delta update (with an
/// empty writer name); only if that does not throw is the key erased from
/// the local store.
///
/// Deliberately not `inline`: the definition lives in this source file, so
/// an inline definition might not be emitted for callers in other
/// translation units.
void Payload::remove(const std::string& k) {
	std::map<std::string, std::string> _new;
	std::vector<std::string> _remove;
	_remove.push_back(k);
	_iu->_modify_payload(true, _new, _remove, "" );
	_store.erase(k);
}
/// Returns the value stored under `k`, or IPAACA_PAYLOAD_DEFAULT_STRING_VALUE
/// when the key is absent. The store is not modified.
///
/// Deliberately not `inline`: the definition lives in this source file, so
/// an inline definition might not be emitted for callers in other
/// translation units.
std::string Payload::get(const std::string& k) {
	// Single lookup instead of count() followed by operator[].
	std::map<std::string, std::string>::const_iterator hit = _store.find(k);
	if (hit != _store.end()) return hit->second;
	return IPAACA_PAYLOAD_DEFAULT_STRING_VALUE;
}
//}}}
// IUConverter//{{{
/// Registers this converter with data type "ipaaca::IU" and wire schema
/// "ipaaca-iu" (see the asserts in serialize()/deserialize()).
IUConverter::IUConverter()
: Converter<std::string> ("ipaaca::IU", "ipaaca-iu", true)
{
}
std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire)
{
// Ensure that DATA actually holds a datum of the data-type we expect.
assert(data.first == getDataType()); // "ipaaca::IU"
// NOTE: a dynamic_pointer_cast cannot be used from void*
boost::shared_ptr<const IU> obj = boost::static_pointer_cast<const IU> (data.second);
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
// transfer obj data to pbo
pbo->set_uid(obj->uid());
pbo->set_revision(obj->revision());
pbo->set_category(obj->category());
pbo->set_payload_type(obj->payload_type());
pbo->set_owner_name(obj->owner_name());
pbo->set_committed(obj->committed());
pbo->set_access_mode(ipaaca::protobuf::IU::PUSH); // TODO
pbo->set_read_only(obj->read_only());
for (std::map<std::string, std::string>::const_iterator it=obj->_payload._store.begin(); it!=obj->_payload._store.end(); ++it) {
protobuf::PayloadItem* item = pbo->add_payload();
item->set_key(it->first);
item->set_value(it->second);
item->set_type("str"); // FIXME other types than str (later)
for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
return getWireSchema();
}
/// Deserializes a protobuf::IU wire message into a remote IU object.
///
/// Only the push access mode is handled: a RemotePushIU is created and
/// filled with the attributes, payload items and links from the message.
///
/// @param wireSchema  must equal getWireSchema()
/// @param wire        serialized protobuf::IU message
/// @return pair of this converter's data type and the new RemotePushIU
/// @throws NotImplementedError for any access mode other than IU_ACCESS_PUSH
AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
	assert(wireSchema == getWireSchema()); // "ipaaca-iu"
	boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
	// NOTE: the result of ParseFromString() is not checked.
	pbo->ParseFromString(wire);
	IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode());
	switch(mode) {
		case IU_ACCESS_PUSH:
			{
			// Create a "remote push IU"
			boost::shared_ptr<RemotePushIU> obj(new RemotePushIU());
			// transfer pbo data to obj
			obj->_uid = pbo->uid();
			obj->_revision = pbo->revision();
			obj->_category = pbo->category();
			obj->_payload_type = pbo->payload_type();
			obj->_owner_name = pbo->owner_name();
			obj->_committed = pbo->committed();
			obj->_read_only = pbo->read_only();
			obj->_access_mode = IU_ACCESS_PUSH;
			for (int i=0; i<pbo->payload_size(); i++) {
				const protobuf::PayloadItem& it = pbo->payload(i);
				obj->_payload._store[it.key()] = it.value();
			}
			for (int i=0; i<pbo->links_size(); i++) {
				const protobuf::LinkSet& pls = pbo->links(i);
				// A link type with no targets still gets an (empty) entry.
				LinkSet& ls = obj->_links._links[pls.type()];
				for (int j=0; j<pls.targets_size(); j++) {
					ls.insert(pls.targets(j));
				}
			}
			return std::make_pair(getDataType(), obj);
			}
		default:
			// other cases not handled yet! ( TODO )
			throw NotImplementedError();
	}
}
//}}}
// IUPayloadUpdateConverter//{{{
/// Registers this converter with data type "ipaaca::IUPayloadUpdate" and
/// wire schema "ipaaca-iu-payload-update".
IUPayloadUpdateConverter::IUPayloadUpdateConverter()
: Converter<std::string> ("ipaaca::IUPayloadUpdate", "ipaaca-iu-payload-update", true)
{
}
std::string IUPayloadUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType()); // "ipaaca::IUPayloadUpdate"
boost::shared_ptr<const IUPayloadUpdate> obj = boost::static_pointer_cast<const IUPayloadUpdate> (data.second);
boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
// transfer obj data to pbo
pbo->set_uid(obj->uid);
pbo->set_revision(obj->revision);
pbo->set_writer_name(obj->writer_name);
pbo->set_is_delta(obj->is_delta);
for (std::map<std::string, std::string>::const_iterator it=obj->new_items.begin(); it!=obj->new_items.end(); ++it) {
protobuf::PayloadItem* item = pbo->add_new_items();
item->set_key(it->first);
item->set_value(it->second);
item->set_type("str"); // FIXME other types than str (later)
}
for (std::vector<std::string>::const_iterator it=obj->keys_to_remove.begin(); it!=obj->keys_to_remove.end(); ++it) {
pbo->add_keys_to_remove(*it);
}
pbo->SerializeToString(&wire);
return getWireSchema();
}
/// Parses a protobuf::IUPayloadUpdate message into a new IUPayloadUpdate.
///
/// @param wireSchema  must equal getWireSchema()
/// @param wire        serialized message
/// @return pair of this converter's data type and the new update object
AnnotatedData IUPayloadUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
	assert(wireSchema == getWireSchema()); // "ipaaca-iu-payload-update"

	protobuf::IUPayloadUpdate message;
	message.ParseFromString(wire);

	boost::shared_ptr<IUPayloadUpdate> update(new IUPayloadUpdate());
	update->uid = message.uid();
	update->revision = message.revision();
	update->writer_name = message.writer_name();
	update->is_delta = message.is_delta();

	const int item_count = message.new_items_size();
	for (int idx = 0; idx < item_count; ++idx) {
		const protobuf::PayloadItem& pb_item = message.new_items(idx);
		update->new_items[pb_item.key()] = pb_item.value();
	}
	const int key_count = message.keys_to_remove_size();
	for (int idx = 0; idx < key_count; ++idx) {
		update->keys_to_remove.push_back(message.keys_to_remove(idx));
	}
	return std::make_pair(getDataType(), update);
}
//}}}
// IULinkUpdateConverter//{{{
/// Registers this converter with data type "ipaaca::IULinkUpdate" and wire
/// schema "ipaaca-iu-link-update".
IULinkUpdateConverter::IULinkUpdateConverter()
: Converter<std::string> ("ipaaca::IULinkUpdate", "ipaaca-iu-link-update", true)
{
}
std::string IULinkUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType()); // "ipaaca::IULinkUpdate"
boost::shared_ptr<const IULinkUpdate> obj = boost::static_pointer_cast<const IULinkUpdate> (data.second);
boost::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate());
// transfer obj data to pbo
pbo->set_uid(obj->uid);
pbo->set_revision(obj->revision);
pbo->set_writer_name(obj->writer_name);
pbo->set_is_delta(obj->is_delta);
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->new_links.begin(); it!=obj->new_links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_new_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->links_to_remove.begin(); it!=obj->links_to_remove.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links_to_remove();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
return getWireSchema();
}
/// Parses a protobuf::IULinkUpdate message into a new IULinkUpdate.
///
/// A link set without targets produces no entry in the resulting maps.
///
/// @param wireSchema  must equal getWireSchema()
/// @param wire        serialized message
/// @return pair of this converter's data type and the new update object
AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
	assert(wireSchema == getWireSchema()); // "ipaaca-iu-link-update"

	protobuf::IULinkUpdate message;
	message.ParseFromString(wire);

	boost::shared_ptr<IULinkUpdate> update(new IULinkUpdate());
	update->uid = message.uid();
	update->revision = message.revision();
	update->writer_name = message.writer_name();
	update->is_delta = message.is_delta();

	for (int i = 0; i < message.new_links_size(); ++i) {
		const protobuf::LinkSet& group = message.new_links(i);
		if (group.targets_size() == 0) {
			continue; // keep the map free of empty entries
		}
		std::set<std::string>& bucket = update->new_links[group.type()];
		for (int j = 0; j < group.targets_size(); ++j) {
			bucket.insert(group.targets(j));
		}
	}
	for (int i = 0; i < message.links_to_remove_size(); ++i) {
		const protobuf::LinkSet& group = message.links_to_remove(i);
		if (group.targets_size() == 0) {
			continue; // keep the map free of empty entries
		}
		std::set<std::string>& bucket = update->links_to_remove[group.type()];
		for (int j = 0; j < group.targets_size(); ++j) {
			bucket.insert(group.targets(j));
		}
	}
	return std::make_pair(getDataType(), update);
}
//}}}
} // of namespace ipaaca