Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
#include <ipaaca/ipaaca.h>
namespace ipaaca {
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
// IU//{{{
// Overload of the IU factory that additionally takes an access mode.
// The access_mode argument is not used here: creation is delegated to the
// (category, payload_type, read_only) overload.
IPAACA_EXPORT IU::ptr IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
	IU::ptr created = IU::create(category, payload_type, read_only);
	return created;
}
// Factory for IUs.
//
// Constructs the IU with access mode IU_ACCESS_PUSH and afterwards
// initializes its payload. The payload is initialized here rather than in
// the constructor because initialize() is handed the smart pointer to the
// new IU, which only exists once construction has finished.
//
// category:     category string stored in the new IU
// payload_type: payload type; an empty string selects
//               __ipaaca_static_option_default_payload_type
// read_only:    stored as the IU's read-only flag
// Returns the smart pointer to the newly created IU.
IPAACA_EXPORT IU::ptr IU::create(const std::string& category, const std::string& payload_type, bool read_only)
{
	IU::ptr iu = IU::ptr(new IU(category, IU_ACCESS_PUSH, read_only, (payload_type=="")?__ipaaca_static_option_default_payload_type:payload_type)); /* params */ //));
	iu->_payload.initialize(iu);
	return iu;
}
// Constructor: fills in the basic IU state.
//
// The revision starts at 1, a fresh UID is obtained from
// ipaaca::generate_uuid_string(), and the IU starts out uncommitted.
// An empty payload_type selects __ipaaca_static_option_default_payload_type.
// The payload itself is NOT initialized here; see the comment in the body.
IPAACA_EXPORT IU::IU(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
	_revision = 1;
	_uid = ipaaca::generate_uuid_string();
	_category = category;
	_payload_type = (payload_type=="")?__ipaaca_static_option_default_payload_type:payload_type;
	// payload initialization deferred to IU::create(), above
	_read_only = read_only;
	_access_mode = access_mode;
	_committed = false;
}
// Registers a link modification on this IU.
//
// While holding _revision_lock, the revision number is increased and, if
// is_published() reports true, the change is passed on to the buffer via
// _send_iu_link_update().
//
// Throws IUCommittedError if the IU has already been committed.
// The lock is released on every exit path, including when the buffer call
// throws; otherwise a failed send would leave _revision_lock held.
IPAACA_EXPORT void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
	_revision_lock.lock();
	try {
		if (_committed) {
			throw IUCommittedError();
		}
		_increase_revision_number();
		if (is_published()) {
			_buffer->_send_iu_link_update(this, is_delta, _revision, new_links, links_to_remove, writer_name);
		}
	} catch (...) {
		// Do not leak the lock when anything above fails.
		_revision_lock.unlock();
		throw;
	}
	_revision_lock.unlock();
}
/*
* IPAACA_EXPORT void IU::_publish_resend(IU::ptr iu, const std::string& hidden_scope_name)
{
//_revision_lock.lock();
//if (_committed) {
// _revision_lock.unlock();
// throw IUCommittedError();
//}
//_increase_revision_number();
//if (is_published()) {
//IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name
_buffer->_publish_iu_resend(iu, hidden_scope_name);
//}
//_revision_lock.unlock();
}
*/
// Registers a payload modification on this IU.
//
// While holding _revision_lock, the revision number is increased and, if
// is_published() reports true, the new entries and removed keys are logged
// at debug level and passed on to the buffer via _send_iu_payload_update().
//
// Throws IUCommittedError if the IU has already been committed.
// The lock is released on every exit path, including when the buffer call
// throws; otherwise a failed send would leave _revision_lock held.
IPAACA_EXPORT void IU::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
	IPAACA_INFO("")
	_revision_lock.lock();
	try {
		if (_committed) {
			throw IUCommittedError();
		}
		_increase_revision_number();
		if (is_published()) {
			IPAACA_DEBUG("Sending a payload update, new entries:")
			for (auto& kv: new_items) {
				IPAACA_DEBUG(" " << kv.first << " -> " << kv.second)
			}
			IPAACA_DEBUG("and with removed keys:")
			for (auto& k: keys_to_remove) {
				IPAACA_DEBUG(" " << k)
			}
			_buffer->_send_iu_payload_update(this, is_delta, _revision, new_items, keys_to_remove, writer_name);
		}
	} catch (...) {
		// Do not leak the lock when anything above fails.
		_revision_lock.unlock();
		throw;
	}
	_revision_lock.unlock();
}
// Public commit entry point. Delegates to _internal_commit() without
// passing a writer name, so the default argument from that function's
// declaration (not visible in this file) is used.
IPAACA_EXPORT void IU::commit()
{
_internal_commit();
}
// Commits this IU.
//
// While holding _revision_lock, the revision number is increased, the IU is
// flagged as committed and, if is_published() reports true, the commission
// is passed on to the buffer via _send_iu_commission().
//
// Throws IUCommittedError if the IU has already been committed.
// The lock is released on every exit path, including when the buffer call
// throws; otherwise a failed send would leave _revision_lock held.
// As before, _committed is set before the send and is not rolled back if
// the send fails.
IPAACA_EXPORT void IU::_internal_commit(const std::string& writer_name)
{
	_revision_lock.lock();
	try {
		if (_committed) {
			throw IUCommittedError();
		}
		_increase_revision_number();
		_committed = true;
		if (is_published()) {
			_buffer->_send_iu_commission(this, _revision, writer_name);
		}
	} catch (...) {
		// Do not leak the lock when anything above fails.
		_revision_lock.unlock();
		throw;
	}
	_revision_lock.unlock();
}
//}}}
// Message//{{{
// Overload of the Message factory that additionally takes an access mode
// and a read-only flag. Neither access_mode nor read_only is used here:
// creation is delegated to the (category, payload_type) overload, with an
// empty payload_type replaced by __ipaaca_static_option_default_payload_type.
Message::ptr Message::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
	return Message::create(category, (payload_type=="")?__ipaaca_static_option_default_payload_type:payload_type);
}
// Factory for Messages.
//
// Constructs the Message with access mode IU_ACCESS_MESSAGE and the
// read-only flag set to true, then initializes its payload. The payload is
// initialized here because initialize() is handed the smart pointer to the
// new object, which only exists once construction has finished.
//
// category:     category string stored in the new Message
// payload_type: payload type; an empty string selects
//               __ipaaca_static_option_default_payload_type
// Returns the smart pointer to the newly created Message.
Message::ptr Message::create(const std::string& category, const std::string& payload_type)
{
	Message::ptr iu = Message::ptr(new Message(category, IU_ACCESS_MESSAGE, true, (payload_type=="")?__ipaaca_static_option_default_payload_type:payload_type)); /* params */ //));
	iu->_payload.initialize(iu);
	return iu;
}
// Constructor: forwards all arguments unchanged to the IU base-class
// constructor and does nothing else.
Message::Message(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
: IU(category, access_mode, read_only, payload_type)
{
}
// Link-modification hook for Messages. Nothing is sent and no revision is
// changed here; if is_published() reports true, an informational note is
// logged. All parameters are unused.
void Message::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
	if (!is_published()) {
		return;
	}
	IPAACA_INFO("Info: modifying a Message after sending has no global effects")
}
// Payload-modification hook for Messages. Nothing is sent and no revision
// is changed here; if is_published() reports true, an informational note is
// logged. All parameters are unused.
void Message::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
	if (is_published()) {
		IPAACA_INFO("Info: modifying a Message after sending has no global effects")
	}
}
// Commit hook for Messages. No commit state is changed and nothing is sent;
// if is_published() reports true, an informational note is logged.
// writer_name is unused.
void Message::_internal_commit(const std::string& writer_name)
{
	if (!is_published()) {
		return;
	}
	IPAACA_INFO("Info: committing to a Message after sending has no global effects")
}
//}}}
// RemotePushIU//{{{
// Factory for RemotePushIU objects: default-constructs the object, then
// initializes its payload with the smart pointer to the new object.
IPAACA_EXPORT RemotePushIU::ptr RemotePushIU::create()
{
	RemotePushIU::ptr remote_iu(new RemotePushIU(/* params */));
	remote_iu->_payload.initialize(remote_iu);
	return remote_iu;
}
// Default constructor; performs no work of its own. The payload is
// initialized separately in RemotePushIU::create().
IPAACA_EXPORT RemotePushIU::RemotePushIU()
{
// nothing
}
// Requests a link modification for this remote IU.
//
// Builds an IULinkUpdate carrying the current revision and the requested
// changes, and submits it through the "updateLinks" call on the remote
// server obtained for _owner_name. A reply of 0 is treated as failure; any
// other reply is adopted as the new local revision.
//
// The writer_name parameter is not used: the buffer's unique_name() is sent
// as the writer instead.
//
// Throws IUCommittedError if the IU is committed, IUReadOnlyError if it is
// read-only, and IUUpdateFailedError if the reply is 0.
IPAACA_EXPORT void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
	if (_committed) throw IUCommittedError();
	if (_read_only) throw IUReadOnlyError();

	RemoteServerPtr owner = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);

	IULinkUpdate::ptr request(new IULinkUpdate());
	request->uid = _uid;
	request->revision = _revision;
	request->is_delta = is_delta;
	request->writer_name = _buffer->unique_name();
	request->new_links = new_links;
	request->links_to_remove = links_to_remove;

	boost::shared_ptr<int> reply = owner->call<int>("updateLinks", request, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO
	const int reply_revision = *reply;
	if (reply_revision == 0) throw IUUpdateFailedError();
	_revision = reply_revision;
}
// Requests a payload modification for this remote IU.
//
// Builds an IUPayloadUpdate carrying the current revision, the payload type
// and the requested changes, and submits it through the "updatePayload"
// call on the remote server obtained for _owner_name. A reply of 0 is
// treated as failure; any other reply is adopted as the new local revision.
//
// The writer_name parameter is not used: the buffer's unique_name() is sent
// as the writer instead.
//
// Throws IUCommittedError if the IU is committed, IUReadOnlyError if it is
// read-only, and IUUpdateFailedError if the reply is 0.
IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
	//std::cout << "-- Sending a modify_payload with " << new_items.size() << " keys to merge." << std::endl;
	if (_committed) {
		throw IUCommittedError();
	}
	if (_read_only) {
		throw IUReadOnlyError();
	}
	RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
	IUPayloadUpdate::ptr update = IUPayloadUpdate::ptr(new IUPayloadUpdate());
	update->uid = _uid;
	update->revision = _revision;
	update->is_delta = is_delta;
	update->writer_name = _buffer->unique_name();
	update->new_items = new_items;
	update->keys_to_remove = keys_to_remove;
	update->payload_type = _payload_type;
	boost::shared_ptr<int> result = server->call<int>("updatePayload", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO
	if (*result == 0) {
		throw IUUpdateFailedError();
	} else {
		_revision = *result;
	}
}
// Requests a commit of this remote IU.
//
// A read-only IU is rejected with IUReadOnlyError. If the IU is already
// committed the call returns silently. Otherwise a protobuf::IUCommission
// with uid, current revision and the buffer's unique_name() is submitted
// through the "commit" call on the remote server obtained for _owner_name.
// A reply of 0 raises IUUpdateFailedError; any other reply is adopted as
// the new local revision.
IPAACA_EXPORT void RemotePushIU::commit()
{
	if (_read_only) throw IUReadOnlyError();
	// Following python version: ignoring multiple commit
	if (_committed) return;

	RemoteServerPtr owner = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);

	boost::shared_ptr<protobuf::IUCommission> request(new protobuf::IUCommission());
	request->set_uid(_uid);
	request->set_revision(_revision);
	request->set_writer_name(_buffer->unique_name());

	boost::shared_ptr<int> reply = owner->call<int>("commit", request, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO
	const int reply_revision = *reply;
	if (reply_revision == 0) throw IUUpdateFailedError();
	_revision = reply_revision;
}
// Applies a received link update to the local state: adopts the update's
// revision, then either replaces all links (non-delta update) or adds and
// removes the listed links (delta update).
IPAACA_EXPORT void RemotePushIU::_apply_link_update(IULinkUpdate::ptr update)
{
	_revision = update->revision;
	if (!update->is_delta) {
		_replace_links(update->new_links);
		return;
	}
	_add_and_remove_links(update->new_links, update->links_to_remove);
}
// Applies a received payload update to the local state.
//
// Adopts the update's revision first. For a delta update the keys listed in
// keys_to_remove are deleted; for a non-delta update the whole payload is
// wiped. In both cases every entry of new_items is then stored, so removal
// always happens before insertion.
IPAACA_EXPORT void RemotePushIU::_apply_update(IUPayloadUpdate::ptr update)
{
	_revision = update->revision;
	if (update->is_delta) {
		for (const auto& key : update->keys_to_remove) {
			_payload._remotely_enforced_delitem(key);
		}
	} else {
		_payload._remotely_enforced_wipe();
	}
	for (const auto& item : update->new_items) {
		_payload._remotely_enforced_setitem(item.first, item.second);
	}
}
// Sets the local committed flag. Nothing is sent and the revision is left
// untouched. Presumably invoked when a commission is received from the
// owner; confirm against the caller.
IPAACA_EXPORT void RemotePushIU::_apply_commission()
{
_committed = true;
}
// Sets the local retracted flag. Nothing is sent and the revision is left
// untouched. Presumably invoked when a retraction is received from the
// owner; confirm against the caller.
IPAACA_EXPORT void RemotePushIU::_apply_retraction()
{
_retracted = true;
}
//}}}
// RemoteMessage//{{{
// Factory for RemoteMessage objects: default-constructs the object, then
// initializes its payload with the smart pointer to the new object.
IPAACA_EXPORT RemoteMessage::ptr RemoteMessage::create()
{
	RemoteMessage::ptr remote_msg(new RemoteMessage(/* params */));
	remote_msg->_payload.initialize(remote_msg);
	return remote_msg;
}
// Default constructor; performs no work of its own. The payload is
// initialized separately in RemoteMessage::create().
IPAACA_EXPORT RemoteMessage::RemoteMessage()
{
// nothing
}
// Link-modification hook for RemoteMessages. Only logs an informational
// note; nothing is sent and no member is changed here. All parameters are
// unused.
IPAACA_EXPORT void RemoteMessage::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
IPAACA_INFO("Info: modifying a RemoteMessage only has local effects")
}
// Payload-modification hook for RemoteMessages. Only logs an informational
// note; nothing is sent and no member is changed here. All parameters are
// unused.
IPAACA_EXPORT void RemoteMessage::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
	IPAACA_INFO("Info: modifying a RemoteMessage only has local effects")
}
// Commit hook for RemoteMessages. Only logs an informational note; no
// commit request is sent and the committed flag is not changed here.
IPAACA_EXPORT void RemoteMessage::commit()
{
IPAACA_INFO("Info: committing to a RemoteMessage only has local effects")
}
// Applies a link update to the local state. A warning is logged first
// because this function is not expected to be called for RemoteMessages.
// The update is applied regardless: its revision is adopted, then the links
// are either replaced (non-delta) or added and removed (delta).
IPAACA_EXPORT void RemoteMessage::_apply_link_update(IULinkUpdate::ptr update)
{
	IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_link_update")
	_revision = update->revision;
	if (!update->is_delta) {
		_replace_links(update->new_links);
		return;
	}
	_add_and_remove_links(update->new_links, update->links_to_remove);
}
// Applies a payload update to the local state. A warning is logged first
// because this function is not expected to be called for RemoteMessages.
// The update is applied regardless: its revision is adopted; a delta update
// deletes the keys in keys_to_remove, a non-delta update wipes the payload;
// afterwards every entry of new_items is stored.
IPAACA_EXPORT void RemoteMessage::_apply_update(IUPayloadUpdate::ptr update)
{
	IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_update")
	_revision = update->revision;
	if (update->is_delta) {
		for (const auto& key : update->keys_to_remove) {
			_payload._remotely_enforced_delitem(key);
		}
	} else {
		_payload._remotely_enforced_wipe();
	}
	for (const auto& item : update->new_items) {
		_payload._remotely_enforced_setitem(item.first, item.second);
	}
}
// Logs a warning (this function is not expected to be called for
// RemoteMessages) and then sets the local committed flag anyway.
IPAACA_EXPORT void RemoteMessage::_apply_commission()
{
IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_commission")
_committed = true;
}
// Logs a warning (this function is not expected to be called for
// RemoteMessages) and then sets the local retracted flag anyway.
IPAACA_EXPORT void RemoteMessage::_apply_retraction()
{
IPAACA_WARNING("Warning: should never be called: RemoteMessage::_apply_retraction")
_retracted = true;
}
//}}}
} // of namespace ipaaca