Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Show changes
Showing
with 810 additions and 675 deletions
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
#include <ipaaca/ipaaca.h>
namespace ipaaca {
namespace converters {
// Wrap a serialized inner object and a wire type in a protobuf::TransportLevelWrapper
std::string cooked_message(const std::string& raw_message, ipaaca::protobuf::TransportMessageType msg_type)
{
std::string cooked_msg;
std::shared_ptr<protobuf::TransportLevelWrapper> pbo(new protobuf::TransportLevelWrapper());
pbo->set_raw_message(raw_message);
pbo->set_transport_message_type(msg_type);
pbo->SerializeToString(&cooked_msg);
return cooked_msg;
};
// protobuf serialization for all supported types (replaces converter repository)
std::string internal_serialize(ipaaca::IU::ptr iu) {
std::string raw_message = IUConverter::serialize(iu);
return cooked_message(raw_message,
(iu->access_mode()==IU_ACCESS_MESSAGE) ? protobuf::TransportMessageType::WireTypeMessageIU
: protobuf::TransportMessageType::WireTypeIU);
};
/*
std::string internal_serialize(ipaaca::Message::ptr msg) {
std::string raw_message = MessageConverter::serialize(msg);
return cooked_message(raw_message, protobuf::TransportMessageType::WireTypeMessageIU);
};
*/
std::string internal_serialize(ipaaca::IUPayloadUpdate::ptr pup) {
std::string raw_message = IUPayloadUpdateConverter::serialize(pup);
return cooked_message(raw_message, protobuf::TransportMessageType::WireTypeIUPayloadUpdate);
};
std::string internal_serialize(ipaaca::IULinkUpdate::ptr lup) {
std::string raw_message = IULinkUpdateConverter::serialize(lup);
return cooked_message(raw_message, protobuf::TransportMessageType::WireTypeIULinkUpdate);
};
std::string internal_serialize(std::shared_ptr<protobuf::RemoteRequestResult> pb) {
std::string raw_message;
pb->SerializeToString(&raw_message);
return cooked_message(raw_message, protobuf::TransportMessageType::WireTypeRemoteRequestResult);
};
std::string internal_serialize(std::shared_ptr<protobuf::IURetraction> pb) {
std::string raw_message;
pb->SerializeToString(&raw_message);
return cooked_message(raw_message, protobuf::TransportMessageType::WireTypeIURetraction);
};
std::string internal_serialize(std::shared_ptr<protobuf::IUCommission> pb) {
std::string raw_message;
pb->SerializeToString(&raw_message);
return cooked_message(raw_message, protobuf::TransportMessageType::WireTypeIUCommission);
};
std::string internal_serialize(std::shared_ptr<protobuf::IUResendRequest> pb) {
std::string raw_message;
pb->SerializeToString(&raw_message);
return cooked_message(raw_message, protobuf::TransportMessageType::WireTypeIUResendRequest);
};
std::string internal_serialize(std::shared_ptr<protobuf::IUPayloadUpdateRequest> pb) {
std::string raw_message;
pb->SerializeToString(&raw_message);
return cooked_message(raw_message, protobuf::TransportMessageType::WireTypeIUPayloadUpdateRequest);
};
std::string internal_serialize(std::shared_ptr<protobuf::IULinkUpdateRequest> pb) {
std::string raw_message;
pb->SerializeToString(&raw_message);
return cooked_message(raw_message, protobuf::TransportMessageType::WireTypeIULinkUpdateRequest);
};
std::string internal_serialize(std::shared_ptr<protobuf::IUCommissionRequest> pb) {
std::string raw_message;
pb->SerializeToString(&raw_message);
return cooked_message(raw_message, protobuf::TransportMessageType::WireTypeIUCommissionRequest);
};
// deserialization (just switching here instead of the converter registry business)
ipaaca::backend::Event::ptr internal_deserialize(const std::string& wire)
{
std::shared_ptr<protobuf::TransportLevelWrapper> pbo(new protobuf::TransportLevelWrapper());
pbo->ParseFromString(wire);
std::shared_ptr<ipaaca::backend::Event> event;
//std::cout << "internal_deserialize of TransportMessageType " << pbo->transport_message_type() << std::endl;
switch (pbo->transport_message_type()) {
case protobuf::TransportMessageType::WireTypeIU:
{ event = std::make_shared<ipaaca::backend::Event>("ipaaca::RemotePushIU", std::static_pointer_cast<RemotePushIU>(IUConverter::deserialize(pbo->raw_message()))); }
break;
case protobuf::TransportMessageType::WireTypeMessageIU:
{ event = std::make_shared<ipaaca::backend::Event>("ipaaca::RemoteMessage", std::static_pointer_cast<RemoteMessage>(IUConverter::deserialize(pbo->raw_message()))); }
break;
case protobuf::TransportMessageType::WireTypeIUPayloadUpdate:
{ event = std::make_shared<ipaaca::backend::Event>("ipaaca::IUPayloadUpdate", IUPayloadUpdateConverter::deserialize(pbo->raw_message())); }
break;
case protobuf::TransportMessageType::WireTypeIULinkUpdate:
{ event = std::make_shared<ipaaca::backend::Event>("ipaaca::IULinkUpdate", IULinkUpdateConverter::deserialize(pbo->raw_message())); }
break;
case protobuf::TransportMessageType::WireTypeIURetraction:
{
std::shared_ptr<protobuf::IURetraction> inner(new protobuf::IURetraction());
inner->ParseFromString(pbo->raw_message());
event = std::make_shared<ipaaca::backend::Event>("ipaaca::protobuf::IURetraction", inner);
}
break;
case protobuf::TransportMessageType::WireTypeIUCommission:
{
std::shared_ptr<protobuf::IUCommission> inner(new protobuf::IUCommission());
inner->ParseFromString(pbo->raw_message());
event = std::make_shared<ipaaca::backend::Event>("ipaaca::protobuf::IUCommission", inner);
}
break;
case protobuf::TransportMessageType::WireTypeRemoteRequestResult:
{
std::shared_ptr<protobuf::RemoteRequestResult> inner(new protobuf::RemoteRequestResult());
inner->ParseFromString(pbo->raw_message());
event = std::make_shared<ipaaca::backend::Event>("ipaaca::protobuf::RemoteRequestResult", inner);
}
break;
case protobuf::TransportMessageType::WireTypeIUResendRequest:
{
std::shared_ptr<protobuf::IUResendRequest> inner(new protobuf::IUResendRequest());
inner->ParseFromString(pbo->raw_message());
event = std::make_shared<ipaaca::backend::Event>("ipaaca::protobuf::IUResendRequest", inner);
}
break;
case protobuf::TransportMessageType::WireTypeIUPayloadUpdateRequest:
{
std::shared_ptr<protobuf::IUPayloadUpdateRequest> inner(new protobuf::IUPayloadUpdateRequest());
inner->ParseFromString(pbo->raw_message());
event = std::make_shared<ipaaca::backend::Event>("ipaaca::protobuf::IUPayloadUpdateRequest", inner);
}
break;
case protobuf::TransportMessageType::WireTypeIULinkUpdateRequest:
{
std::shared_ptr<protobuf::IULinkUpdateRequest> inner(new protobuf::IULinkUpdateRequest());
inner->ParseFromString(pbo->raw_message());
event = std::make_shared<ipaaca::backend::Event>("ipaaca::protobuf::IULinkUpdateRequest", inner);
}
break;
case protobuf::TransportMessageType::WireTypeIUCommissionRequest:
{
std::shared_ptr<protobuf::IUCommissionRequest> inner(new protobuf::IUCommissionRequest());
inner->ParseFromString(pbo->raw_message());
event = std::make_shared<ipaaca::backend::Event>("ipaaca::protobuf::IUCommissionRequest", inner);
}
break;
default:
throw ipaaca::UnhandledWireTypeError(pbo->transport_message_type());
};
return event;
}
// RSB backend Converters
// IUConverter//{{{
IPAACA_EXPORT std::string IUConverter::serialize(ipaaca::IU::ptr obj)
{
std::string wire;
std::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
// transfer obj data to pbo
pbo->set_uid(obj->uid());
pbo->set_revision(obj->revision());
pbo->set_category(obj->category());
pbo->set_payload_type(obj->payload_type());
pbo->set_owner_name(obj->owner_name());
pbo->set_committed(obj->committed());
ipaaca::protobuf::IU_AccessMode a_m;
switch(obj->access_mode()) {
case IU_ACCESS_PUSH:
a_m = ipaaca::protobuf::IU_AccessMode_PUSH;
break;
case IU_ACCESS_REMOTE:
a_m = ipaaca::protobuf::IU_AccessMode_REMOTE;
break;
case IU_ACCESS_MESSAGE:
a_m = ipaaca::protobuf::IU_AccessMode_MESSAGE;
break;
}
pbo->set_access_mode(a_m);
pbo->set_read_only(obj->read_only());
for (auto& kv: obj->_payload._document_store) {
protobuf::PayloadItem* item = pbo->add_payload();
item->set_key(kv.first);
IPAACA_DEBUG("Payload type: " << obj->_payload_type)
if (obj->_payload_type=="JSON") {
item->set_value( kv.second->to_json_string_representation() );
item->set_type("JSON");
} else if ((obj->_payload_type=="MAP") || (obj->_payload_type=="STR")) {
// legacy mode
item->set_value( json_value_cast<std::string>(kv.second->document));
item->set_type("STR");
}
}
for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
return wire;
}
IPAACA_EXPORT ipaaca::IUInterface::ptr IUConverter::deserialize(const std::string& wire) {
//assert(wireSchema == getWireSchema()); // "ipaaca-iu"
std::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
pbo->ParseFromString(wire);
IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode());
ipaaca::IUInterface::ptr obj;
switch(mode) {
case IU_ACCESS_PUSH:
{
// Create a "remote push IU"
auto inst = RemotePushIU::create();
inst->_access_mode = IU_ACCESS_PUSH;
obj = inst;
for (int i=0; i<pbo->payload_size(); i++) {
const protobuf::PayloadItem& it = pbo->payload(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
inst->_payload._document_store[it.key()] = entry;
}
}
break;
case IU_ACCESS_MESSAGE:
{
auto inst = RemoteMessage::create();
inst->_access_mode = IU_ACCESS_MESSAGE;
obj = inst;
for (int i=0; i<pbo->payload_size(); i++) {
const protobuf::PayloadItem& it = pbo->payload(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
inst->_payload._document_store[it.key()] = entry;
}
}
break;
default:
throw NotImplementedError();
}
// transfer pbo data to obj
obj->_uid = pbo->uid();
obj->_revision = pbo->revision();
obj->_category = pbo->category();
obj->_payload_type = pbo->payload_type();
obj->_owner_name = pbo->owner_name();
obj->_committed = pbo->committed();
obj->_read_only = pbo->read_only();
for (int i=0; i<pbo->links_size(); i++) {
const protobuf::LinkSet& pls = pbo->links(i);
LinkSet& ls = obj->_links._links[pls.type()];
for (int j=0; j<pls.targets_size(); j++) {
ls.insert(pls.targets(j));
}
}
return obj;
}
//}}}
// IUPayloadUpdateConverter//{{{
IPAACA_EXPORT std::string IUPayloadUpdateConverter::serialize(ipaaca::IUPayloadUpdate::ptr obj)
{
std::string wire;
//assert(data.first == getDataType()); // "ipaaca::IUPayloadUpdate"
std::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
// transfer obj data to pbo
pbo->set_uid(obj->uid);
pbo->set_revision(obj->revision);
pbo->set_writer_name(obj->writer_name);
pbo->set_is_delta(obj->is_delta);
pbo->set_request_uid(obj->request_uid);
pbo->set_request_endpoint(obj->request_endpoint);
for (auto& kv: obj->new_items) {
protobuf::PayloadItem* item = pbo->add_new_items();
item->set_key(kv.first);
if (obj->payload_type=="JSON") {
item->set_value( kv.second->to_json_string_representation() );
item->set_type("JSON");
} else if ((obj->payload_type=="MAP") || (obj->payload_type=="STR")) {
// legacy mode
item->set_value( json_value_cast<std::string>(kv.second->document));
item->set_type("STR");
} else {
IPAACA_ERROR("Uninitialized payload update type!")
throw NotImplementedError();
}
IPAACA_DEBUG("Adding updated item (type " << item->type() << "): " << item->key() << " -> " << item->value() )
}
for (auto& key: obj->keys_to_remove) {
pbo->add_keys_to_remove(key);
IPAACA_DEBUG("Adding removed key: " << key)
}
pbo->SerializeToString(&wire);
return wire;
}
ipaaca::IUPayloadUpdate::ptr IUPayloadUpdateConverter::deserialize(const std::string& wire) {
//assert(wireSchema == getWireSchema()); // "ipaaca-iu-payload-update"
std::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
pbo->ParseFromString(wire);
std::shared_ptr<IUPayloadUpdate> obj(new IUPayloadUpdate());
// transfer pbo data to obj
obj->uid = pbo->uid();
obj->revision = pbo->revision();
obj->writer_name = pbo->writer_name();
obj->is_delta = pbo->is_delta();
obj->request_uid = pbo->request_uid();
obj->request_endpoint = pbo->request_endpoint();
for (int i=0; i<pbo->new_items_size(); i++) {
const protobuf::PayloadItem& it = pbo->new_items(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
IPAACA_DEBUG("New/updated payload entry: " << it.key() << " -> " << it.value() )
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
obj->new_items[it.key()] = entry;
}
for (int i=0; i<pbo->keys_to_remove_size(); i++) {
obj->keys_to_remove.push_back(pbo->keys_to_remove(i));
}
return obj;
}
//}}}
// IULinkUpdateConverter//{{{
IPAACA_EXPORT std::string IULinkUpdateConverter::serialize(ipaaca::IULinkUpdate::ptr obj)
{
std::string wire;
//assert(data.first == getDataType());
std::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate());
// transfer obj data to pbo
pbo->set_uid(obj->uid);
pbo->set_revision(obj->revision);
pbo->set_writer_name(obj->writer_name);
pbo->set_is_delta(obj->is_delta);
pbo->set_request_uid(obj->request_uid);
pbo->set_request_endpoint(obj->request_endpoint);
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->new_links.begin(); it!=obj->new_links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_new_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->links_to_remove.begin(); it!=obj->links_to_remove.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links_to_remove();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
return wire;
}
ipaaca::IULinkUpdate::ptr IULinkUpdateConverter::deserialize(const std::string& wire) {
//assert(wireSchema == getWireSchema()); // "ipaaca-iu-link-update"
std::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate());
pbo->ParseFromString(wire);
std::shared_ptr<IULinkUpdate> obj(new IULinkUpdate());
// transfer pbo data to obj
obj->uid = pbo->uid();
obj->revision = pbo->revision();
obj->writer_name = pbo->writer_name();
obj->is_delta = pbo->is_delta();
obj->request_uid = pbo->request_uid();
obj->request_endpoint = pbo->request_endpoint();
for (int i=0; i<pbo->new_links_size(); ++i) {
const protobuf::LinkSet& it = pbo->new_links(i);
for (int j=0; j<it.targets_size(); ++j) {
obj->new_links[it.type()].insert(it.targets(j)); // = vec;
}
}
for (int i=0; i<pbo->links_to_remove_size(); ++i) {
const protobuf::LinkSet& it = pbo->links_to_remove(i);
for (int j=0; j<it.targets_size(); ++j) {
obj->links_to_remove[it.type()].insert(it.targets(j));
}
}
return obj;
}
//}}}
} // namespace converters
} // namespace ipaaca
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2015 Social Cognitive Systems Group * Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group) * (formerly the Sociable Agents Group)
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
...@@ -39,10 +39,10 @@ namespace ipaaca { ...@@ -39,10 +39,10 @@ namespace ipaaca {
IPAACA_EXPORT inline FakeIU::FakeIU() { IPAACA_EXPORT inline FakeIU::FakeIU() {
IPAACA_INFO("") IPAACA_INFO("")
} }
IPAACA_EXPORT boost::shared_ptr<FakeIU> FakeIU::create() IPAACA_EXPORT std::shared_ptr<FakeIU> FakeIU::create()
{ {
IPAACA_INFO(""); IPAACA_INFO("");
auto iu = boost::shared_ptr<FakeIU>(new FakeIU()); auto iu = std::shared_ptr<FakeIU>(new FakeIU());
iu->_payload.initialize(iu); iu->_payload.initialize(iu);
return iu; return iu;
} }
......
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
#include <ipaaca/ipaaca.h>
namespace ipaaca {
// static library Initializer
IPAACA_EXPORT bool Initializer::_initialized = false;
IPAACA_EXPORT bool Initializer::initialized() { return _initialized; }
IPAACA_EXPORT void Initializer::initialize_ipaaca_rsb_if_needed()
{
initialize_backend();
}
IPAACA_EXPORT void Initializer::initialize_backend()//{{{
{
if (_initialized) return;
override_env_with_cmdline_set_variables();
_initialized = true;
}//}}}
IPAACA_EXPORT void Initializer::dump_current_default_config()//{{{
{
IPAACA_INFO("--- Dumping current global configuration ---")
auto cfg = ipaaca::get_global_config();
for (auto it = cfg->data_cbegin(); it != cfg->data_cend(); ++it) {
IPAACA_INFO("--- " << it->first << " = \"" << it->second << "\"")
}
IPAACA_INFO("-------------- End of config ---------------")
}//}}}
IPAACA_EXPORT void Initializer::override_env_with_cmdline_set_variables()//{{{
{
// set RSB host and port iff provided using cmdline arguments
if (__ipaaca_static_option_rsb_host!="") {
IPAACA_INFO("Overriding RSB host with " << __ipaaca_static_option_rsb_host)
IPAACA_SETENV("RSB_TRANSPORT_SPREAD_HOST", __ipaaca_static_option_rsb_host.c_str())
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_HOST", __ipaaca_static_option_rsb_host.c_str());
}
if (__ipaaca_static_option_rsb_port!="") {
IPAACA_INFO("Overriding RSB port with " << __ipaaca_static_option_rsb_port)
IPAACA_SETENV("RSB_TRANSPORT_SPREAD_PORT", __ipaaca_static_option_rsb_port.c_str());
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_PORT", __ipaaca_static_option_rsb_port.c_str());
}
if (__ipaaca_static_option_rsb_transport!="") {
if (__ipaaca_static_option_rsb_transport == "spread") {
IPAACA_INFO("Overriding RSB transport mode - using 'spread' ")
IPAACA_SETENV("RSB_TRANSPORT_SPREAD_ENABLED", "1");
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_ENABLED", "0");
} else if (__ipaaca_static_option_rsb_transport == "socket") {
IPAACA_INFO("Overriding RSB transport mode - using 'socket' ")
IPAACA_SETENV("RSB_TRANSPORT_SPREAD_ENABLED", "0");
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_ENABLED", "1");
if (__ipaaca_static_option_rsb_socketserver!="") {
const std::string& srv = __ipaaca_static_option_rsb_socketserver;
if ((srv=="1")||(srv=="0")||(srv=="auto")) {
IPAACA_INFO("Overriding RSB transport.socket.server with " << srv)
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_SERVER", srv.c_str());
} else {
IPAACA_INFO("Unknown RSB transport.socket.server mode " << srv << " - using config default ")
}
}
} else {
IPAACA_INFO("Unknown RSB transport mode " << __ipaaca_static_option_rsb_transport << " - using config default ")
}
}
}//}}}
} // of namespace ipaaca
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
#include <ipaaca/ipaaca.h>
namespace ipaaca {
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
// static library Initializer
IPAACA_EXPORT bool Initializer::_initialized = false;
IPAACA_EXPORT bool Initializer::initialized() { return _initialized; }
IPAACA_EXPORT void Initializer::initialize_ipaaca_rsb_if_needed()
{
initialize_backend();
}
IPAACA_EXPORT void Initializer::initialize_backend()//{{{
{
if (_initialized) return;
auto_configure_rsb();
IPAACA_DEBUG("Creating and registering Converters")
boost::shared_ptr<IUConverter> iu_converter(new IUConverter());
converterRepository<std::string>()->registerConverter(iu_converter);
boost::shared_ptr<MessageConverter> message_converter(new MessageConverter());
converterRepository<std::string>()->registerConverter(message_converter);
boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter());
converterRepository<std::string>()->registerConverter(payload_update_converter);
boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter());
converterRepository<std::string>()->registerConverter(link_update_converter);
boost::shared_ptr<ProtocolBufferConverter<protobuf::IUCommission> > iu_commission_converter(new ProtocolBufferConverter<protobuf::IUCommission> ());
converterRepository<std::string>()->registerConverter(iu_commission_converter);
// dlw
boost::shared_ptr<ProtocolBufferConverter<protobuf::IUResendRequest> > iu_resendrequest_converter(new ProtocolBufferConverter<protobuf::IUResendRequest> ());
converterRepository<std::string>()->registerConverter(iu_resendrequest_converter);
boost::shared_ptr<ProtocolBufferConverter<protobuf::IURetraction> > iu_retraction_converter(new ProtocolBufferConverter<protobuf::IURetraction> ());
converterRepository<std::string>()->registerConverter(iu_retraction_converter);
boost::shared_ptr<IntConverter> int_converter(new IntConverter());
converterRepository<std::string>()->registerConverter(int_converter);
IPAACA_DEBUG("Backend / converter initialization complete.")
_initialized = true;
}//}}}
IPAACA_EXPORT void Initializer::dump_current_default_config()//{{{
{
IPAACA_INFO("--- Dumping current default participant configuration ---")
rsb::ParticipantConfig config = getFactory().getDefaultParticipantConfig();
std::set<rsb::ParticipantConfig::Transport> transports = config.getTransports();
for (std::set<rsb::ParticipantConfig::Transport>::const_iterator it=transports.begin(); it!=transports.end(); ++it) {
IPAACA_INFO( "Active transport: " << it->getName() )
}
IPAACA_INFO("--- End of configuration dump ---")
//ParticipantConfig::Transport inprocess = config.getTransport("inprocess");
//inprocess.setEnabled(true);
//config.addTransport(inprocess);
}//}}}
IPAACA_EXPORT void Initializer::auto_configure_rsb()//{{{
{
const char* plugin_path = getenv("RSB_PLUGINS_CPP_PATH");
if (!plugin_path) {
#ifdef WIN32
IPAACA_WARN("WARNING: RSB_PLUGINS_CPP_PATH not set - in Windows it has to be specified.")
//throw NotImplementedError();
#else
IPAACA_INFO("RSB_PLUGINS_CPP_PATH not set; looking here and up to 7 dirs up.")
std::string pathstr = "./";
for (int i=0; i< 8 /* depth EIGHT (totally arbitrary..) */ ; i++) {
std::string where_str = pathstr+"deps/lib/rsb*/plugins";
const char* where = where_str.c_str();
glob_t g;
glob(where, 0, NULL, &g);
if (g.gl_pathc>0) {
const char* found_path = g.gl_pathv[0];
IPAACA_INFO("Found an RSB plugin dir which will be used automatically: " << found_path)
setenv("RSB_PLUGINS_CPP_PATH", found_path, 1);
break;
} // else keep going
globfree(&g);
pathstr += "../";
}
#endif
} else {
IPAACA_INFO("RSB_PLUGINS_CPP_PATH already defined: " << plugin_path)
}
}//}}}
// RSB backend Converters
// IUConverter//{{{
IPAACA_EXPORT IUConverter::IUConverter()
: Converter<std::string> (IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IU"), "ipaaca-iu", true)
{
}
IPAACA_EXPORT std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType()); // "ipaaca::IU"
boost::shared_ptr<const IU> obj = boost::static_pointer_cast<const IU> (data.second);
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
// transfer obj data to pbo
pbo->set_uid(obj->uid());
pbo->set_revision(obj->revision());
pbo->set_category(obj->category());
pbo->set_payload_type(obj->payload_type());
pbo->set_owner_name(obj->owner_name());
pbo->set_committed(obj->committed());
ipaaca::protobuf::IU_AccessMode a_m;
switch(obj->access_mode()) {
case IU_ACCESS_PUSH:
a_m = ipaaca::protobuf::IU_AccessMode_PUSH;
break;
case IU_ACCESS_REMOTE:
a_m = ipaaca::protobuf::IU_AccessMode_REMOTE;
break;
case IU_ACCESS_MESSAGE:
a_m = ipaaca::protobuf::IU_AccessMode_MESSAGE;
break;
}
pbo->set_access_mode(a_m);
pbo->set_read_only(obj->read_only());
for (auto& kv: obj->_payload._document_store) {
protobuf::PayloadItem* item = pbo->add_payload();
item->set_key(kv.first);
IPAACA_DEBUG("Payload type: " << obj->_payload_type)
if (obj->_payload_type=="JSON") {
item->set_value( kv.second->to_json_string_representation() );
item->set_type("JSON");
} else if ((obj->_payload_type=="MAP") || (obj->_payload_type=="STR")) {
// legacy mode
item->set_value( json_value_cast<std::string>(kv.second->document));
item->set_type("STR");
}
}
for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
switch(obj->access_mode()) {
case IU_ACCESS_PUSH:
return "ipaaca-iu";
case IU_ACCESS_MESSAGE:
return "ipaaca-messageiu";
default:
return getWireSchema();
}
}
IPAACA_EXPORT AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "ipaaca-iu"
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
pbo->ParseFromString(wire);
IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode());
switch(mode) {
case IU_ACCESS_PUSH:
{
// Create a "remote push IU"
boost::shared_ptr<RemotePushIU> obj = RemotePushIU::create();
// transfer pbo data to obj
obj->_uid = pbo->uid();
obj->_revision = pbo->revision();
obj->_category = pbo->category();
obj->_payload_type = pbo->payload_type();
obj->_owner_name = pbo->owner_name();
obj->_committed = pbo->committed();
obj->_read_only = pbo->read_only();
obj->_access_mode = IU_ACCESS_PUSH;
for (int i=0; i<pbo->payload_size(); i++) {
const protobuf::PayloadItem& it = pbo->payload(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
obj->_payload._document_store[it.key()] = entry;
}
for (int i=0; i<pbo->links_size(); i++) {
const protobuf::LinkSet& pls = pbo->links(i);
LinkSet& ls = obj->_links._links[pls.type()];
for (int j=0; j<pls.targets_size(); j++) {
ls.insert(pls.targets(j));
}
}
return std::make_pair("ipaaca::RemotePushIU", obj);
break;
}
case IU_ACCESS_MESSAGE:
{
// Create a "Message-type IU"
boost::shared_ptr<RemoteMessage> obj = RemoteMessage::create();
//std::cout << "REFCNT after create: " << obj.use_count() << std::endl;
// transfer pbo data to obj
obj->_uid = pbo->uid();
obj->_revision = pbo->revision();
obj->_category = pbo->category();
obj->_payload_type = pbo->payload_type();
obj->_owner_name = pbo->owner_name();
obj->_committed = pbo->committed();
obj->_read_only = pbo->read_only();
obj->_access_mode = IU_ACCESS_MESSAGE;
for (int i=0; i<pbo->payload_size(); i++) {
const protobuf::PayloadItem& it = pbo->payload(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
obj->_payload._document_store[it.key()] = entry;
}
for (int i=0; i<pbo->links_size(); i++) {
const protobuf::LinkSet& pls = pbo->links(i);
LinkSet& ls = obj->_links._links[pls.type()];
for (int j=0; j<pls.targets_size(); j++) {
ls.insert(pls.targets(j));
}
}
return std::make_pair("ipaaca::RemoteMessage", obj);
break;
}
default:
// no other cases (yet)
throw NotImplementedError();
}
}
//}}}
// MessageConverter//{{{
IPAACA_EXPORT MessageConverter::MessageConverter()
: Converter<std::string> (IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::Message"), "ipaaca-messageiu", true)
{
}
IPAACA_EXPORT std::string MessageConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType()); // "ipaaca::Message"
boost::shared_ptr<const Message> obj = boost::static_pointer_cast<const Message> (data.second);
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
// transfer obj data to pbo
pbo->set_uid(obj->uid());
pbo->set_revision(obj->revision());
pbo->set_category(obj->category());
pbo->set_payload_type(obj->payload_type());
pbo->set_owner_name(obj->owner_name());
pbo->set_committed(obj->committed());
ipaaca::protobuf::IU_AccessMode a_m;
switch(obj->access_mode()) {
case IU_ACCESS_PUSH:
a_m = ipaaca::protobuf::IU_AccessMode_PUSH;
break;
case IU_ACCESS_REMOTE:
a_m = ipaaca::protobuf::IU_AccessMode_REMOTE;
break;
case IU_ACCESS_MESSAGE:
a_m = ipaaca::protobuf::IU_AccessMode_MESSAGE;
break;
}
pbo->set_access_mode(a_m);
pbo->set_read_only(obj->read_only());
for (auto& kv: obj->_payload._document_store) {
protobuf::PayloadItem* item = pbo->add_payload();
item->set_key(kv.first);
if (obj->_payload_type=="JSON") {
item->set_value( kv.second->to_json_string_representation() );
item->set_type("JSON");
} else if ((obj->_payload_type=="MAP") || (obj->_payload_type=="STR")) {
// legacy mode
item->set_value( json_value_cast<std::string>(kv.second->document));
item->set_type("STR");
}
}
for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
switch(obj->access_mode()) {
case IU_ACCESS_PUSH:
return "ipaaca-iu";
case IU_ACCESS_MESSAGE:
return "ipaaca-messageiu";
default:
return getWireSchema();
}
}
IPAACA_EXPORT AnnotatedData MessageConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "ipaaca-iu"
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
pbo->ParseFromString(wire);
IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode());
switch(mode) {
case IU_ACCESS_PUSH:
{
// Create a "remote push IU"
boost::shared_ptr<RemotePushIU> obj = RemotePushIU::create();
// transfer pbo data to obj
obj->_uid = pbo->uid();
obj->_revision = pbo->revision();
obj->_category = pbo->category();
obj->_payload_type = pbo->payload_type();
obj->_owner_name = pbo->owner_name();
obj->_committed = pbo->committed();
obj->_read_only = pbo->read_only();
obj->_access_mode = IU_ACCESS_PUSH;
for (int i=0; i<pbo->payload_size(); i++) {
const protobuf::PayloadItem& it = pbo->payload(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
obj->_payload._document_store[it.key()] = entry;
}
for (int i=0; i<pbo->links_size(); i++) {
const protobuf::LinkSet& pls = pbo->links(i);
LinkSet& ls = obj->_links._links[pls.type()];
for (int j=0; j<pls.targets_size(); j++) {
ls.insert(pls.targets(j));
}
}
return std::make_pair("ipaaca::RemotePushIU", obj);
break;
}
case IU_ACCESS_MESSAGE:
{
// Create a "Message-type IU"
boost::shared_ptr<RemoteMessage> obj = RemoteMessage::create();
// transfer pbo data to obj
obj->_uid = pbo->uid();
obj->_revision = pbo->revision();
obj->_category = pbo->category();
obj->_payload_type = pbo->payload_type();
obj->_owner_name = pbo->owner_name();
obj->_committed = pbo->committed();
obj->_read_only = pbo->read_only();
obj->_access_mode = IU_ACCESS_MESSAGE;
for (int i=0; i<pbo->payload_size(); i++) {
const protobuf::PayloadItem& it = pbo->payload(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
obj->_payload._document_store[it.key()] = entry;
}
for (int i=0; i<pbo->links_size(); i++) {
const protobuf::LinkSet& pls = pbo->links(i);
LinkSet& ls = obj->_links._links[pls.type()];
for (int j=0; j<pls.targets_size(); j++) {
ls.insert(pls.targets(j));
}
}
return std::make_pair("ipaaca::RemoteMessage", obj);
break;
}
default:
// no other cases (yet)
throw NotImplementedError();
}
}
//}}}
// IUPayloadUpdateConverter//{{{
IPAACA_EXPORT IUPayloadUpdateConverter::IUPayloadUpdateConverter()
: Converter<std::string> (IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IUPayloadUpdate"), "ipaaca-iu-payload-update", true)
{
}
IPAACA_EXPORT std::string IUPayloadUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType()); // "ipaaca::IUPayloadUpdate"
boost::shared_ptr<const IUPayloadUpdate> obj = boost::static_pointer_cast<const IUPayloadUpdate> (data.second);
boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
// transfer obj data to pbo
pbo->set_uid(obj->uid);
pbo->set_revision(obj->revision);
pbo->set_writer_name(obj->writer_name);
pbo->set_is_delta(obj->is_delta);
for (auto& kv: obj->new_items) {
protobuf::PayloadItem* item = pbo->add_new_items();
item->set_key(kv.first);
if (obj->payload_type=="JSON") {
item->set_value( kv.second->to_json_string_representation() );
item->set_type("JSON");
} else if ((obj->payload_type=="MAP") || (obj->payload_type=="STR")) {
// legacy mode
item->set_value( json_value_cast<std::string>(kv.second->document));
item->set_type("STR");
} else {
IPAACA_ERROR("Uninitialized payload update type!")
throw NotImplementedError();
}
IPAACA_DEBUG("Adding updated item (type " << item->type() << "): " << item->key() << " -> " << item->value() )
}
for (auto& key: obj->keys_to_remove) {
pbo->add_keys_to_remove(key);
IPAACA_DEBUG("Adding removed key: " << key)
}
pbo->SerializeToString(&wire);
return getWireSchema();
}
AnnotatedData IUPayloadUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "ipaaca-iu-payload-update"
boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
pbo->ParseFromString(wire);
boost::shared_ptr<IUPayloadUpdate> obj(new IUPayloadUpdate());
// transfer pbo data to obj
obj->uid = pbo->uid();
obj->revision = pbo->revision();
obj->writer_name = pbo->writer_name();
obj->is_delta = pbo->is_delta();
for (int i=0; i<pbo->new_items_size(); i++) {
const protobuf::PayloadItem& it = pbo->new_items(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
IPAACA_INFO("New/updated payload entry: " << it.key() << " -> " << it.value() )
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
obj->new_items[it.key()] = entry;
}
for (int i=0; i<pbo->keys_to_remove_size(); i++) {
obj->keys_to_remove.push_back(pbo->keys_to_remove(i));
}
return std::make_pair(getDataType(), obj);
}
//}}}
// IULinkUpdateConverter//{{{
IPAACA_EXPORT IULinkUpdateConverter::IULinkUpdateConverter()
: Converter<std::string> (IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IULinkUpdate"), "ipaaca-iu-link-update", true)
{
}
IPAACA_EXPORT std::string IULinkUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType());
boost::shared_ptr<const IULinkUpdate> obj = boost::static_pointer_cast<const IULinkUpdate> (data.second);
boost::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate());
// transfer obj data to pbo
pbo->set_uid(obj->uid);
pbo->set_revision(obj->revision);
pbo->set_writer_name(obj->writer_name);
pbo->set_is_delta(obj->is_delta);
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->new_links.begin(); it!=obj->new_links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_new_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->links_to_remove.begin(); it!=obj->links_to_remove.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links_to_remove();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
return getWireSchema();
}
AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "ipaaca-iu-link-update"
boost::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate());
pbo->ParseFromString(wire);
boost::shared_ptr<IULinkUpdate> obj(new IULinkUpdate());
// transfer pbo data to obj
obj->uid = pbo->uid();
obj->revision = pbo->revision();
obj->writer_name = pbo->writer_name();
obj->is_delta = pbo->is_delta();
for (int i=0; i<pbo->new_links_size(); ++i) {
const protobuf::LinkSet& it = pbo->new_links(i);
for (int j=0; j<it.targets_size(); ++j) {
obj->new_links[it.type()].insert(it.targets(j)); // = vec;
}
}
for (int i=0; i<pbo->links_to_remove_size(); ++i) {
const protobuf::LinkSet& it = pbo->links_to_remove(i);
for (int j=0; j<it.targets_size(); ++j) {
obj->links_to_remove[it.type()].insert(it.targets(j));
}
}
return std::make_pair(getDataType(), obj);
}
//}}}
// IntConverter//{{{
IPAACA_EXPORT IntConverter::IntConverter()
: Converter<std::string> ("int", "int32", true)
{
}
IPAACA_EXPORT std::string IntConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType());
// NOTE: a dynamic_pointer_cast cannot be used from void*
boost::shared_ptr<const int> obj = boost::static_pointer_cast<const int> (data.second);
boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage());
// transfer obj data to pbo
pbo->set_value(*obj);
pbo->SerializeToString(&wire);
return getWireSchema();
}
IPAACA_EXPORT AnnotatedData IntConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "int"
boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage());
pbo->ParseFromString(wire);
boost::shared_ptr<int> obj = boost::shared_ptr<int>(new int(pbo->value()));
return std::make_pair("int", obj);
}
//}}}
} // of namespace ipaaca
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2015 Social Cognitive Systems Group * Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group) * (formerly the Sociable Agents Group)
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
...@@ -35,11 +35,6 @@ ...@@ -35,11 +35,6 @@
namespace ipaaca { namespace ipaaca {
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const IUInterface& obj)//{{{ IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const IUInterface& obj)//{{{
{ {
os << "IUInterface(uid='" << obj.uid() << "'"; os << "IUInterface(uid='" << obj.uid() << "'";
...@@ -142,7 +137,7 @@ IPAACA_EXPORT void IUInterface::set_links(const LinkMap& links, const std::strin ...@@ -142,7 +137,7 @@ IPAACA_EXPORT void IUInterface::set_links(const LinkMap& links, const std::strin
_replace_links(links); _replace_links(links);
} }
IPAACA_HEADER_EXPORT const std::string& IUInterface::channel() IPAACA_EXPORT const std::string& IUInterface::channel()
{ {
if (_buffer == NULL) if (_buffer == NULL)
throw IUUnpublishedError(); throw IUUnpublishedError();
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2015 Social Cognitive Systems Group * Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group) * (formerly the Sociable Agents Group)
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
...@@ -35,11 +35,6 @@ ...@@ -35,11 +35,6 @@
namespace ipaaca { namespace ipaaca {
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
// IU//{{{ // IU//{{{
IPAACA_EXPORT IU::ptr IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type) IPAACA_EXPORT IU::ptr IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{ {
...@@ -84,7 +79,7 @@ IPAACA_EXPORT void IU::_modify_links(bool is_delta, const LinkMap& new_links, co ...@@ -84,7 +79,7 @@ IPAACA_EXPORT void IU::_modify_links(bool is_delta, const LinkMap& new_links, co
IPAACA_EXPORT void IU::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) IPAACA_EXPORT void IU::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{ {
IPAACA_INFO("") //IPAACA_INFO("")
_revision_lock.lock(); _revision_lock.lock();
if (_committed) { if (_committed) {
_revision_lock.unlock(); _revision_lock.unlock();
...@@ -191,7 +186,7 @@ IPAACA_EXPORT void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new ...@@ -191,7 +186,7 @@ IPAACA_EXPORT void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new
} else if (_read_only) { } else if (_read_only) {
throw IUReadOnlyError(); throw IUReadOnlyError();
} }
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); auto server = ((InputBuffer*)_buffer)->_get_remote_server(_owner_name);
IULinkUpdate::ptr update = IULinkUpdate::ptr(new IULinkUpdate()); IULinkUpdate::ptr update = IULinkUpdate::ptr(new IULinkUpdate());
update->uid = _uid; update->uid = _uid;
update->revision = _revision; update->revision = _revision;
...@@ -199,11 +194,11 @@ IPAACA_EXPORT void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new ...@@ -199,11 +194,11 @@ IPAACA_EXPORT void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new
update->writer_name = _buffer->unique_name(); update->writer_name = _buffer->unique_name();
update->new_links = new_links; update->new_links = new_links;
update->links_to_remove = links_to_remove; update->links_to_remove = links_to_remove;
boost::shared_ptr<int> result = server->call<int>("updateLinks", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO int result = server->request_remote_link_update(update); // TODO
if (*result == 0) { if (result == 0) {
throw IUUpdateFailedError(); throw IUUpdateFailedError();
} else { } else {
_revision = *result; _revision = result;
} }
} }
IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
...@@ -216,7 +211,7 @@ IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<s ...@@ -216,7 +211,7 @@ IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<s
} else if (_read_only) { } else if (_read_only) {
throw IUReadOnlyError(); throw IUReadOnlyError();
} }
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); auto server = ((InputBuffer*)_buffer)->_get_remote_server(_owner_name);
IUPayloadUpdate::ptr update = IUPayloadUpdate::ptr(new IUPayloadUpdate()); IUPayloadUpdate::ptr update = IUPayloadUpdate::ptr(new IUPayloadUpdate());
update->uid = _uid; update->uid = _uid;
update->revision = _revision; update->revision = _revision;
...@@ -225,11 +220,11 @@ IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<s ...@@ -225,11 +220,11 @@ IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<s
update->new_items = new_items; update->new_items = new_items;
update->keys_to_remove = keys_to_remove; update->keys_to_remove = keys_to_remove;
update->payload_type = _payload_type; update->payload_type = _payload_type;
boost::shared_ptr<int> result = server->call<int>("updatePayload", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO int result = server->request_remote_payload_update(update);
if (*result == 0) { if (result == 0) {
throw IUUpdateFailedError(); throw IUUpdateFailedError();
} else { } else {
_revision = *result; _revision = result;
} }
} }
...@@ -244,16 +239,16 @@ IPAACA_EXPORT void RemotePushIU::commit() ...@@ -244,16 +239,16 @@ IPAACA_EXPORT void RemotePushIU::commit()
// Following python version: ignoring multiple commit // Following python version: ignoring multiple commit
return; return;
} }
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); auto server = ((InputBuffer*)_buffer)->_get_remote_server(_owner_name);
boost::shared_ptr<protobuf::IUCommission> update = boost::shared_ptr<protobuf::IUCommission>(new protobuf::IUCommission()); std::shared_ptr<protobuf::IUCommission> update = std::shared_ptr<protobuf::IUCommission>(new protobuf::IUCommission());
update->set_uid(_uid); update->set_uid(_uid);
update->set_revision(_revision); update->set_revision(_revision);
update->set_writer_name(_buffer->unique_name()); update->set_writer_name(_buffer->unique_name());
boost::shared_ptr<int> result = server->call<int>("commit", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO int result = server->request_remote_commission(update);
if (*result == 0) { if (result == 0) {
throw IUUpdateFailedError(); throw IUUpdateFailedError();
} else { } else {
_revision = *result; _revision = result;
} }
} }
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2015 Social Cognitive Systems Group * Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group) * (formerly the Sociable Agents Group)
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
...@@ -64,19 +64,6 @@ using namespace std; ...@@ -64,19 +64,6 @@ using namespace std;
int batch_update_main(int argc, char** argv)//{{{ int batch_update_main(int argc, char** argv)//{{{
{ {
std::string json_source("[\n\
\"old\",\n\
[\n\
\"str\",\n\
null\n\
],\n\
3,\n\
{\n\
\"key1\": \"value1\",\n\
\"key2\": \"value2\"\n\
}\n\
]");
ipaaca::OutputBuffer::ptr ob = ipaaca::OutputBuffer::create("myprog"); ipaaca::OutputBuffer::ptr ob = ipaaca::OutputBuffer::create("myprog");
std::cout << std::endl << "Setting up an IU with initial contents" << std::endl; std::cout << std::endl << "Setting up an IU with initial contents" << std::endl;
ipaaca::IU::ptr iu = ipaaca::IU::create("testcategory"); ipaaca::IU::ptr iu = ipaaca::IU::create("testcategory");
...@@ -90,6 +77,9 @@ int batch_update_main(int argc, char** argv)//{{{ ...@@ -90,6 +77,9 @@ int batch_update_main(int argc, char** argv)//{{{
std::cout << std::endl << "Publishing IU (sniffer should receive one ADDED)" << std::endl; std::cout << std::endl << "Publishing IU (sniffer should receive one ADDED)" << std::endl;
ob->add(iu); ob->add(iu);
std::cout << "Waiting 5 sec" << std::endl;
sleep(5);
std::cout << std::endl << "Batch-writing some stuff (sniffer should receive a single UPDATED)" << std::endl; std::cout << std::endl << "Batch-writing some stuff (sniffer should receive a single UPDATED)" << std::endl;
{ {
ipaaca::Locker locker(iu->payload()); ipaaca::Locker locker(iu->payload());
...@@ -108,11 +98,17 @@ int batch_update_main(int argc, char** argv)//{{{ ...@@ -108,11 +98,17 @@ int batch_update_main(int argc, char** argv)//{{{
iu->payload()["g"] = std::vector<std::string>{"g1", "g2"}; iu->payload()["g"] = std::vector<std::string>{"g1", "g2"};
iu->payload().remove("remove_me"); iu->payload().remove("remove_me");
iu->payload()["c"] = "abc123"; iu->payload()["c"] = "abc123";
iu->payload()["testlist"] = std::vector<long> {0, 1, 2, 3, 4, 5};
} }
std::cout << std::endl << "Adding another key 'XYZ' outside batch mode (sniffer -> UPDATED)" << std::endl; std::cout << std::endl << "Adding another key 'XYZ' and changing testlist to start with 2 1000s (sniffer -> 1x UPDATED)" << std::endl;
iu->payload()["XYZ"] = "blabla"; {
ipaaca::Locker locker(iu->payload());
iu->payload()["XYZ"] = "testlist should now start with two 1000s";
iu->payload()["testlist"][0] = 500;
iu->payload()["testlist"][0] = 1000;
iu->payload()["testlist"][1] = 1000;
}
std::cout << std::endl << "Final batch update, wiping most (sniffer should receive a third UPDATED, with 3 keys remaining in the payload)" << std::endl; std::cout << std::endl << "Final batch update, wiping most (sniffer should receive a third UPDATED, with 3 keys remaining in the payload)" << std::endl;
{ {
ipaaca::Locker locker(iu->payload()); ipaaca::Locker locker(iu->payload());
...@@ -126,6 +122,8 @@ int batch_update_main(int argc, char** argv)//{{{ ...@@ -126,6 +122,8 @@ int batch_update_main(int argc, char** argv)//{{{
std::cout << " " << it.first << " -> " << it.second << std::endl; std::cout << " " << it.first << " -> " << it.second << std::endl;
} }
std::cout << "Waiting 2 sec" << std::endl;
sleep(2);
return 0; return 0;
} }
//}}} //}}}
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2015 Social Cognitive Systems Group * Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group) * (formerly the Sociable Agents Group)
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
...@@ -35,11 +35,6 @@ ...@@ -35,11 +35,6 @@
namespace ipaaca { namespace ipaaca {
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const SmartLinkMap& obj)//{{{ IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const SmartLinkMap& obj)//{{{
{ {
os << "{"; os << "{";
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2015 Social Cognitive Systems Group * Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group) * (formerly the Sociable Agents Group)
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2015 Social Cognitive Systems Group * Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group) * (formerly the Sociable Agents Group)
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
...@@ -33,15 +33,12 @@ ...@@ -33,15 +33,12 @@
#include <ipaaca/ipaaca.h> #include <ipaaca/ipaaca.h>
#include <sstream>
namespace ipaaca { namespace ipaaca {
using namespace rapidjson; using namespace rapidjson;
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
// temporary helper to show rapidjson internal type // temporary helper to show rapidjson internal type
std::string value_diagnosis(rapidjson::Value* val) std::string value_diagnosis(rapidjson::Value* val)
{ {
...@@ -153,7 +150,7 @@ IPAACA_EXPORT template<> bool json_value_cast(const rapidjson::Value& v) ...@@ -153,7 +150,7 @@ IPAACA_EXPORT template<> bool json_value_cast(const rapidjson::Value& v)
{ {
if (v.IsString()) { if (v.IsString()) {
std::string s = v.GetString(); std::string s = v.GetString();
return !((s=="")||(s=="false")||(s=="False")||(s=="0")); return !(s==""); // NEW: only empty string maps to false
} }
if (v.IsBool()) return v.GetBool(); if (v.IsBool()) return v.GetBool();
if (v.IsNull()) return false; if (v.IsNull()) return false;
...@@ -162,8 +159,10 @@ IPAACA_EXPORT template<> bool json_value_cast(const rapidjson::Value& v) ...@@ -162,8 +159,10 @@ IPAACA_EXPORT template<> bool json_value_cast(const rapidjson::Value& v)
if (v.IsInt64()) return v.GetInt64() != 0; if (v.IsInt64()) return v.GetInt64() != 0;
if (v.IsUint64()) return v.GetUint64() != 0; if (v.IsUint64()) return v.GetUint64() != 0;
if (v.IsDouble()) return v.GetDouble() != 0.0; if (v.IsDouble()) return v.GetDouble() != 0.0;
// default: assume "pointer-like" semantics (i.e. objects are TRUE) // NEW: empty structures map to false ('Pythonesque' semantics!)
return true; if (v.IsArray()) return v.Size() > 0;
if (v.IsObject()) return v.MemberCount() > 0;
throw NotImplementedError(); // should never be reached anyway
} }
//}}} //}}}
...@@ -277,7 +276,7 @@ IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(Payload* payload, const std:: ...@@ -277,7 +276,7 @@ IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(Payload* payload, const std::
: _payload(payload), _key(key), parent(nullptr) : _payload(payload), _key(key), parent(nullptr)
{ {
document_entry = _payload->get_entry(key); document_entry = _payload->get_entry(key);
json_value = &(document_entry->document); set_json_value(&(document_entry->document), "construction");
} }
IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(PayloadEntryProxy* parent_, const std::string& addr_key_) IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(PayloadEntryProxy* parent_, const std::string& addr_key_)
: parent(parent_), addressed_key(addr_key_), addressed_as_array(false) : parent(parent_), addressed_key(addr_key_), addressed_as_array(false)
...@@ -287,10 +286,10 @@ IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(PayloadEntryProxy* parent_, c ...@@ -287,10 +286,10 @@ IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(PayloadEntryProxy* parent_, c
document_entry = parent->document_entry; document_entry = parent->document_entry;
auto it = parent->json_value->FindMember(addr_key_.c_str()); auto it = parent->json_value->FindMember(addr_key_.c_str());
if (it != parent->json_value->MemberEnd()) { if (it != parent->json_value->MemberEnd()) {
json_value = &(parent->json_value->operator[](addr_key_.c_str())); set_json_value(&(parent->json_value->operator[](addr_key_.c_str())), std::string("construction from str addressing with ")+addr_key_);
existent = true; existent = true;
} else { } else {
json_value = nullptr; // avoid heap construction here set_json_value(nullptr, std::string("null-construction from failed str addressing with ")+addr_key_); // avoid heap construction here
existent = false; existent = false;
} }
} }
...@@ -300,7 +299,7 @@ IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(PayloadEntryProxy* parent_, s ...@@ -300,7 +299,7 @@ IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(PayloadEntryProxy* parent_, s
_payload = parent->_payload; _payload = parent->_payload;
_key = parent->_key; _key = parent->_key;
document_entry = parent->document_entry; document_entry = parent->document_entry;
json_value = &(parent->json_value->operator[](addr_idx_)); set_json_value(&(parent->json_value->operator[](addr_idx_)), std::string("construction from int addressing with ")+std::to_string(addr_idx_));
existent = true; existent = true;
} }
...@@ -314,8 +313,10 @@ IPAACA_EXPORT PayloadEntryProxy PayloadEntryProxy::operator[](const std::string& ...@@ -314,8 +313,10 @@ IPAACA_EXPORT PayloadEntryProxy PayloadEntryProxy::operator[](const std::string&
IPAACA_INFO("Invalid json_value") IPAACA_INFO("Invalid json_value")
throw PayloadAddressingError(); throw PayloadAddressingError();
} }
//IPAACA_DEBUG("string addressing with '" << addr_key_ << "' of json object " << (off_t) json_value )
if (! json_value->IsObject()) { if (! json_value->IsObject()) {
IPAACA_INFO("Expected Object for operator[](string)") IPAACA_INFO("Expected Object for operator[](string)")
//IPAACA_DEBUG(" But type is: " << value_diagnosis(json_value) )
throw PayloadAddressingError(); throw PayloadAddressingError();
} }
return PayloadEntryProxy(this, addr_key_); return PayloadEntryProxy(this, addr_key_);
...@@ -362,6 +363,10 @@ IPAACA_EXPORT PayloadEntryProxy::operator std::string() ...@@ -362,6 +363,10 @@ IPAACA_EXPORT PayloadEntryProxy::operator std::string()
{ {
return json_value_cast<std::string>(json_value); return json_value_cast<std::string>(json_value);
} }
IPAACA_EXPORT PayloadEntryProxy::operator int()
{
return json_value_cast<int>(json_value);
}
IPAACA_EXPORT PayloadEntryProxy::operator long() IPAACA_EXPORT PayloadEntryProxy::operator long()
{ {
return json_value_cast<long>(json_value); return json_value_cast<long>(json_value);
...@@ -378,6 +383,10 @@ IPAACA_EXPORT std::string PayloadEntryProxy::to_str() ...@@ -378,6 +383,10 @@ IPAACA_EXPORT std::string PayloadEntryProxy::to_str()
{ {
return json_value_cast<std::string>(json_value); return json_value_cast<std::string>(json_value);
} }
IPAACA_EXPORT int PayloadEntryProxy::to_int()
{
return json_value_cast<int>(json_value);
}
IPAACA_EXPORT long PayloadEntryProxy::to_long() IPAACA_EXPORT long PayloadEntryProxy::to_long()
{ {
return json_value_cast<long>(json_value); return json_value_cast<long>(json_value);
...@@ -386,6 +395,10 @@ IPAACA_EXPORT double PayloadEntryProxy::to_float() ...@@ -386,6 +395,10 @@ IPAACA_EXPORT double PayloadEntryProxy::to_float()
{ {
return json_value_cast<double>(json_value); return json_value_cast<double>(json_value);
} }
IPAACA_EXPORT double PayloadEntryProxy::to_double()
{
return json_value_cast<double>(json_value);
}
IPAACA_EXPORT bool PayloadEntryProxy::to_bool() IPAACA_EXPORT bool PayloadEntryProxy::to_bool()
{ {
return json_value_cast<bool>(json_value); return json_value_cast<bool>(json_value);
...@@ -448,6 +461,9 @@ IPAACA_EXPORT void Payload::on_lock() ...@@ -448,6 +461,9 @@ IPAACA_EXPORT void Payload::on_lock()
Locker locker(_payload_operation_mode_lock); Locker locker(_payload_operation_mode_lock);
IPAACA_DEBUG("Starting payload batch update mode ...") IPAACA_DEBUG("Starting payload batch update mode ...")
_update_on_every_change = false; _update_on_every_change = false;
std::stringstream ss;
ss << std::this_thread::get_id();
_writing_thread_id = ss.str();
} }
IPAACA_EXPORT void Payload::on_unlock() IPAACA_EXPORT void Payload::on_unlock()
{ {
...@@ -459,11 +475,12 @@ IPAACA_EXPORT void Payload::on_unlock() ...@@ -459,11 +475,12 @@ IPAACA_EXPORT void Payload::on_unlock()
_collected_modifications.clear(); _collected_modifications.clear();
_collected_removals.clear(); _collected_removals.clear();
IPAACA_DEBUG("... exiting payload batch update mode.") IPAACA_DEBUG("... exiting payload batch update mode.")
_writing_thread_id = "";
} }
IPAACA_EXPORT void Payload::initialize(boost::shared_ptr<IUInterface> iu) IPAACA_EXPORT void Payload::initialize(std::shared_ptr<IUInterface> iu)
{ {
_iu = boost::weak_ptr<IUInterface>(iu); _iu = std::weak_ptr<IUInterface>(iu);
} }
IPAACA_EXPORT PayloadEntryProxy Payload::operator[](const std::string& key) IPAACA_EXPORT PayloadEntryProxy Payload::operator[](const std::string& key)
...@@ -495,6 +512,7 @@ IPAACA_EXPORT void Payload::_internal_set(const std::string& k, PayloadDocumentE ...@@ -495,6 +512,7 @@ IPAACA_EXPORT void Payload::_internal_set(const std::string& k, PayloadDocumentE
_batch_update_writer_name = writer_name; _batch_update_writer_name = writer_name;
_collected_modifications[k] = v; _collected_modifications[k] = v;
// revoke deletions of this updated key // revoke deletions of this updated key
//_collected_removals.erase(k);
std::vector<std::string> new_removals; std::vector<std::string> new_removals;
for (auto& rk: _collected_removals) { for (auto& rk: _collected_removals) {
if (rk!=k) new_removals.push_back(rk); if (rk!=k) new_removals.push_back(rk);
...@@ -515,6 +533,7 @@ IPAACA_EXPORT void Payload::_internal_remove(const std::string& k, const std::st ...@@ -515,6 +533,7 @@ IPAACA_EXPORT void Payload::_internal_remove(const std::string& k, const std::st
IPAACA_DEBUG("queueing a payload remove operation") IPAACA_DEBUG("queueing a payload remove operation")
_batch_update_writer_name = writer_name; _batch_update_writer_name = writer_name;
_collected_removals.push_back(k); _collected_removals.push_back(k);
//_collected_removals.insert(k);
// revoke updates of this deleted key // revoke updates of this deleted key
_collected_modifications.erase(k); _collected_modifications.erase(k);
} }
...@@ -538,6 +557,7 @@ IPAACA_EXPORT void Payload::_internal_replace_all(const std::map<std::string, Pa ...@@ -538,6 +557,7 @@ IPAACA_EXPORT void Payload::_internal_replace_all(const std::map<std::string, Pa
for (auto& kv: _document_store) { for (auto& kv: _document_store) {
if (! new_contents.count(kv.first)) { if (! new_contents.count(kv.first)) {
_collected_removals.push_back(kv.first); _collected_removals.push_back(kv.first);
//_collected_removals.insert(kv.first);
_collected_modifications.erase(kv.first); _collected_modifications.erase(kv.first);
} }
} }
...@@ -559,6 +579,7 @@ IPAACA_EXPORT void Payload::_internal_merge(const std::map<std::string, PayloadD ...@@ -559,6 +579,7 @@ IPAACA_EXPORT void Payload::_internal_merge(const std::map<std::string, PayloadD
_batch_update_writer_name = writer_name; _batch_update_writer_name = writer_name;
for (auto& kv: contents_to_merge) { for (auto& kv: contents_to_merge) {
_collected_modifications[kv.first] = kv.second; _collected_modifications[kv.first] = kv.second;
//_collected_removals.erase(kv.first); // moved here
updated_keys.insert(kv.first); updated_keys.insert(kv.first);
} }
// revoke deletions of updated keys // revoke deletions of updated keys
...@@ -582,6 +603,29 @@ IPAACA_EXPORT void Payload::_internal_merge_and_remove(const std::map<std::strin ...@@ -582,6 +603,29 @@ IPAACA_EXPORT void Payload::_internal_merge_and_remove(const std::map<std::strin
mark_revision_change(); mark_revision_change();
} }
IPAACA_EXPORT PayloadDocumentEntry::ptr Payload::get_entry(const std::string& k) { IPAACA_EXPORT PayloadDocumentEntry::ptr Payload::get_entry(const std::string& k) {
if (! _update_on_every_change) {
std::stringstream ss;
ss << std::this_thread::get_id();
if (_writing_thread_id == ss.str()) {
IPAACA_DEBUG("Payload locked by current thread, looking for payload key in caches first")
// in batch mode, read from cached writed first!
// case 1: deleted key
if (std::find(_collected_removals.begin(), _collected_removals.end(), k) != _collected_removals.end()) {
IPAACA_DEBUG("Key removed, returning null")
for (auto& cr : _collected_removals) {
IPAACA_DEBUG(" cached removal: " << cr)
}
return PayloadDocumentEntry::create_null();
}
// case 2: updated key - use last known state!
auto it = _collected_modifications.find(k);
if (it!=_collected_modifications.end()) {
IPAACA_DEBUG("Key updated, returning current version")
return it->second;
}
// case 3: key not in the caches yet, just continue below
}
}
if (_document_store.count(k)>0) return _document_store[k]; if (_document_store.count(k)>0) return _document_store[k];
else return PayloadDocumentEntry::create_null(); // contains Document with 'null' value else return PayloadDocumentEntry::create_null(); // contains Document with 'null' value
} }
...@@ -675,6 +719,7 @@ IPAACA_EXPORT PayloadEntryProxyMapIterator::PayloadEntryProxyMapIterator(Payload ...@@ -675,6 +719,7 @@ IPAACA_EXPORT PayloadEntryProxyMapIterator::PayloadEntryProxyMapIterator(Payload
IPAACA_EXPORT PayloadEntryProxyMapIterator& PayloadEntryProxyMapIterator::operator++() IPAACA_EXPORT PayloadEntryProxyMapIterator& PayloadEntryProxyMapIterator::operator++()
{ {
//IPAACA_DEBUG("Map iterator incrementing");
raw_iterator++; raw_iterator++;
return *this; return *this;
} }
...@@ -682,6 +727,8 @@ IPAACA_EXPORT PayloadEntryProxyMapIterator& PayloadEntryProxyMapIterator::operat ...@@ -682,6 +727,8 @@ IPAACA_EXPORT PayloadEntryProxyMapIterator& PayloadEntryProxyMapIterator::operat
IPAACA_EXPORT std::pair<std::string, PayloadEntryProxy> PayloadEntryProxyMapIterator::operator*() IPAACA_EXPORT std::pair<std::string, PayloadEntryProxy> PayloadEntryProxyMapIterator::operator*()
{ {
std::string key = raw_iterator->name.GetString(); std::string key = raw_iterator->name.GetString();
//IPAACA_DEBUG("Deref map iterator key '" << key << "'");
//IPAACA_DEBUG(" of proxy " << (off_t) proxy << "");
return std::pair<std::string, PayloadEntryProxy>(key, (*proxy)[key] ); // generates child Proxy return std::pair<std::string, PayloadEntryProxy>(key, (*proxy)[key] ); // generates child Proxy
} }
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2015 Social Cognitive Systems Group * Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group) * (formerly the Sociable Agents Group)
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
...@@ -87,28 +87,34 @@ std::string str_join(const std::vector<std::string>& vec,const std::string& sep) ...@@ -87,28 +87,34 @@ std::string str_join(const std::vector<std::string>& vec,const std::string& sep)
return tmp; return tmp;
} }
void str_split_wipe(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiters ) int str_split_wipe(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiters )
{ {
tokens.clear(); tokens.clear();
std::string::size_type lastPos = str.find_first_not_of(delimiters, 0); std::string::size_type lastPos = str.find_first_not_of(delimiters, 0);
std::string::size_type pos = str.find_first_of(delimiters, lastPos); std::string::size_type pos = str.find_first_of(delimiters, lastPos);
int count = 0;
while (std::string::npos != pos || std::string::npos != lastPos) while (std::string::npos != pos || std::string::npos != lastPos)
{ {
count++;
tokens.push_back(str.substr(lastPos, pos - lastPos)); tokens.push_back(str.substr(lastPos, pos - lastPos));
lastPos = str.find_first_not_of(delimiters, pos); lastPos = str.find_first_not_of(delimiters, pos);
pos = str.find_first_of(delimiters, lastPos); pos = str.find_first_of(delimiters, lastPos);
} }
return count;
} }
void str_split_append(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiters ) int str_split_append(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiters )
{ {
std::string::size_type lastPos = str.find_first_not_of(delimiters, 0); std::string::size_type lastPos = str.find_first_not_of(delimiters, 0);
std::string::size_type pos = str.find_first_of(delimiters, lastPos); std::string::size_type pos = str.find_first_of(delimiters, lastPos);
int count = 0;
while (std::string::npos != pos || std::string::npos != lastPos) while (std::string::npos != pos || std::string::npos != lastPos)
{ {
count++;
tokens.push_back(str.substr(lastPos, pos - lastPos)); tokens.push_back(str.substr(lastPos, pos - lastPos));
lastPos = str.find_first_not_of(delimiters, pos); lastPos = str.find_first_not_of(delimiters, pos);
pos = str.find_first_of(delimiters, lastPos); pos = str.find_first_of(delimiters, lastPos);
} }
return count;
} }
} // namespace ipaaca } // namespace ipaaca
......
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
/**
* \file ipaaca-tester.cc
*
* \brief Multifunction tester component, C++ version
*
* This file is not used in the ipaaca library, but produces
* a separate program, if enabled in CMakeLists.txt
*
* \author Ramin Yaghoubzadeh (ryaghoubzadeh@uni-bielefeld.de)
* \date January, 2017
*/
#include <ipaaca/ipaaca.h>
#include <cstdio>
#include <iomanip>
#include <thread>
#include <chrono>
#if _WIN32 || _WIN64
double get_time_as_secs() { return 0.0; } // TODO implement time function for Windows when required
#else
#include <sys/time.h>
double get_time_as_secs() {
struct timeval tv;
if (gettimeofday(&tv, NULL)) return 0.0;
return (0.001 * ((double)tv.tv_sec * 1000000.0 + tv.tv_usec));
}
#endif
class TesterCpp {
public:
void handle_iu_inbuf(std::shared_ptr<ipaaca::IUInterface> iu, ipaaca::IUEventType etype, bool local)
{
std::cout << std::fixed << std::setprecision(3) << get_time_as_secs() << " ";
std::cout << ipaaca::iu_event_type_to_str(etype) << " category=" << iu->category() << " uid=" << iu->uid() << std::endl;
//
auto links = iu->get_all_links();
if (links.size()>0) {
std::cout << "links={" << std::endl;
for (auto kv : links) {
std::cout << "\t" << kv.first << ": [";
bool first = true;
for (const auto& lnk : kv.second) {
if (first) { first=false; } else { std::cout << ", "; }
std::cout << lnk;
}
std::cout << "]";
}
std::cout << "}" << std::endl;
}
//
std::cout << "payload={" << std::endl;
for (auto kv : iu->payload()) {
std::cout << "\t'" << kv.first << "': " << ((std::string) kv.second) << ',' << std::endl;
}
std::cout << "}" << std::endl;
if (etype == IU_ADDED) {
std::cout << "Will send a modification to a received new IU" << std::endl;
int cnt=5;
while(cnt>0) {
try {
iu->payload()["seen_by_cpp"] = true;
cnt = 0;
} catch(...) {
std::cout << "... failed, but we try more than once ..." << std::endl;
cnt--;
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
}
}
}
int run()
{
ipaaca::OutputBuffer::ptr ob = ipaaca::OutputBuffer::create("testerCpp");
ipaaca::InputBuffer::ptr ib = ipaaca::InputBuffer::create("testerCpp", std::set<std::string>{"testcategory"}); // MQTT requires # as a wildcard!
ib->set_resend(true);
ib->register_handler(IPAACA_BIND_CLASS_HANDLER(&TesterCpp::handle_iu_inbuf, this));
std::cout << "Listening for all IU events and sending a Message after 1 sec ..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
auto msg = ipaaca::IU::create("testcategory");
msg->payload()["hello"] = "world";
ob->add(msg);
int i = 0;
while(true) {
std::this_thread::sleep_for(std::chrono::seconds(5));
auto msg = ipaaca::Message::create("testcate2");
msg->payload()["num"] = ++i;
ob->add(msg);
}
return 0;
}
};
int main(int argc, char** argv)
{
/*
auto config = ipaaca::get_global_config(true);
for (auto it = config->begin(); it!=config->end(); ++it ) {
std::cout << it->first << "=" << it->second << std::endl;
}
exit(1);
*/
ipaaca::__ipaaca_static_option_log_level = IPAACA_LOG_LEVEL_DEBUG;
TesterCpp tester;
tester.run();
}
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2015 Social Cognitive Systems Group * Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group) * (formerly the Sociable Agents Group)
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
...@@ -38,7 +38,7 @@ namespace ipaaca { ...@@ -38,7 +38,7 @@ namespace ipaaca {
// UUID generation // UUID generation
IPAACA_EXPORT std::string generate_uuid_string()//{{{ IPAACA_EXPORT std::string generate_uuid_string()//{{{
{ {
#ifdef WIN32 #if _WIN32 || _WIN64
// Windows // Windows
UUID uuid; UUID uuid;
RPC_STATUS stat; RPC_STATUS stat;
...@@ -51,6 +51,7 @@ IPAACA_EXPORT std::string generate_uuid_string()//{{{ ...@@ -51,6 +51,7 @@ IPAACA_EXPORT std::string generate_uuid_string()//{{{
RpcStringFree(&uuid_str); RpcStringFree(&uuid_str);
return result; return result;
} }
throw UUIDGenerationError();
} else { } else {
throw UUIDGenerationError(); throw UUIDGenerationError();
} }
...@@ -76,6 +77,11 @@ IPAACA_EXPORT std::string __ipaaca_static_option_default_payload_type("JSON"); ...@@ -76,6 +77,11 @@ IPAACA_EXPORT std::string __ipaaca_static_option_default_payload_type("JSON");
IPAACA_EXPORT std::string __ipaaca_static_option_default_channel("default"); IPAACA_EXPORT std::string __ipaaca_static_option_default_channel("default");
IPAACA_EXPORT unsigned int __ipaaca_static_option_log_level(IPAACA_LOG_LEVEL_WARNING); IPAACA_EXPORT unsigned int __ipaaca_static_option_log_level(IPAACA_LOG_LEVEL_WARNING);
IPAACA_EXPORT std::string __ipaaca_static_option_rsb_host("");
IPAACA_EXPORT std::string __ipaaca_static_option_rsb_port("");
IPAACA_EXPORT std::string __ipaaca_static_option_rsb_transport("");
IPAACA_EXPORT std::string __ipaaca_static_option_rsb_socketserver("");
} // of namespace ipaaca } // of namespace ipaaca
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2015 Social Cognitive Systems Group * Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group) * (formerly the Sociable Agents Group)
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
...@@ -122,7 +122,7 @@ void ComponentNotifier::initialize() { ...@@ -122,7 +122,7 @@ void ComponentNotifier::initialize() {
Locker locker(lock); Locker locker(lock);
if (!initialized) { if (!initialized) {
initialized = true; initialized = true;
in_buf->register_handler(boost::bind(&ComponentNotifier::handle_iu_event, this, _1, _2, _3)); in_buf->register_handler(std::bind(&ComponentNotifier::handle_iu_event, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
submit_notify(_IPAACA_COMP_NOTIF_STATE_NEW); submit_notify(_IPAACA_COMP_NOTIF_STATE_NEW);
} }
} }
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2013 Sociable Agents Group * Copyright (c) 2009-2022 Sociable Agents Group
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
* http://opensource.cit-ec.de/projects/ipaaca/ * http://opensource.cit-ec.de/projects/ipaaca/
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2013 Sociable Agents Group * Copyright (c) 2009-2022 Sociable Agents Group
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
* http://opensource.cit-ec.de/projects/ipaaca/ * http://opensource.cit-ec.de/projects/ipaaca/
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* "Incremental Processing Architecture * "Incremental Processing Architecture
* for Artificial Conversational Agents". * for Artificial Conversational Agents".
* *
* Copyright (c) 2009-2013 Sociable Agents Group * Copyright (c) 2009-2022 Sociable Agents Group
* CITEC, Bielefeld University * CITEC, Bielefeld University
* *
* http://opensource.cit-ec.de/projects/ipaaca/ * http://opensource.cit-ec.de/projects/ipaaca/
......
...@@ -6,3 +6,4 @@ dist ...@@ -6,3 +6,4 @@ dist
privateprops privateprops
.project .project
.classpath .classpath
/bin/
...@@ -6,5 +6,6 @@ run.jvmargs= -Xms128m -Xmx512m -Xss5M ...@@ -6,5 +6,6 @@ run.jvmargs= -Xms128m -Xmx512m -Xss5M
rebuild.list= rebuild.list=
publish.resolver=asap.sftp.publish publish.resolver=asap.sftp.publish
dist.dir=../../dist dist.dir=../../dist
#javac.source=1.6 javac.source=1.8
#javac.target=1.6 javac.target=1.8