Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • android_compatible_0.14
  • ipaaca-json
  • ipaaca-rsb
  • ipaaca3-dev
  • ipaaca4
  • ipaaca4py3
  • kompass
  • legacy-str
  • master
  • mqtt_port
  • rsb0.11
  • rsb0.14
  • ryt-fullport
  • softwareweek2019
  • windows-compatibility
15 results

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Select Git revision
  • ipaaca-rsb
  • ipaaca3-dev
  • ipaaca4
  • kompass
  • master
  • rsb0.14
6 results
Show changes
Showing
with 451 additions and 758 deletions
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
#include <ipaaca/ipaaca.h>
namespace ipaaca {
// static library Initializer
IPAACA_EXPORT bool Initializer::_initialized = false;
IPAACA_EXPORT bool Initializer::initialized() { return _initialized; }
IPAACA_EXPORT void Initializer::initialize_ipaaca_rsb_if_needed()
{
initialize_backend();
}
IPAACA_EXPORT void Initializer::initialize_backend()//{{{
{
if (_initialized) return;
override_env_with_cmdline_set_variables();
_initialized = true;
}//}}}
IPAACA_EXPORT void Initializer::dump_current_default_config()//{{{
{
IPAACA_INFO("--- Dumping current global configuration ---")
auto cfg = ipaaca::get_global_config();
for (auto it = cfg->data_cbegin(); it != cfg->data_cend(); ++it) {
IPAACA_INFO("--- " << it->first << " = \"" << it->second << "\"")
}
IPAACA_INFO("-------------- End of config ---------------")
}//}}}
IPAACA_EXPORT void Initializer::override_env_with_cmdline_set_variables()//{{{
{
// set RSB host and port iff provided using cmdline arguments
if (__ipaaca_static_option_rsb_host!="") {
IPAACA_INFO("Overriding RSB host with " << __ipaaca_static_option_rsb_host)
IPAACA_SETENV("RSB_TRANSPORT_SPREAD_HOST", __ipaaca_static_option_rsb_host.c_str())
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_HOST", __ipaaca_static_option_rsb_host.c_str());
}
if (__ipaaca_static_option_rsb_port!="") {
IPAACA_INFO("Overriding RSB port with " << __ipaaca_static_option_rsb_port)
IPAACA_SETENV("RSB_TRANSPORT_SPREAD_PORT", __ipaaca_static_option_rsb_port.c_str());
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_PORT", __ipaaca_static_option_rsb_port.c_str());
}
if (__ipaaca_static_option_rsb_transport!="") {
if (__ipaaca_static_option_rsb_transport == "spread") {
IPAACA_INFO("Overriding RSB transport mode - using 'spread' ")
IPAACA_SETENV("RSB_TRANSPORT_SPREAD_ENABLED", "1");
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_ENABLED", "0");
} else if (__ipaaca_static_option_rsb_transport == "socket") {
IPAACA_INFO("Overriding RSB transport mode - using 'socket' ")
IPAACA_SETENV("RSB_TRANSPORT_SPREAD_ENABLED", "0");
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_ENABLED", "1");
if (__ipaaca_static_option_rsb_socketserver!="") {
const std::string& srv = __ipaaca_static_option_rsb_socketserver;
if ((srv=="1")||(srv=="0")||(srv=="auto")) {
IPAACA_INFO("Overriding RSB transport.socket.server with " << srv)
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_SERVER", srv.c_str());
} else {
IPAACA_INFO("Unknown RSB transport.socket.server mode " << srv << " - using config default ")
}
}
} else {
IPAACA_INFO("Unknown RSB transport mode " << __ipaaca_static_option_rsb_transport << " - using config default ")
}
}
}//}}}
} // of namespace ipaaca
/*
* This file is part of IPAACA, the
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
* http://purl.org/net/ipaaca
*
* This file may be licensed under the terms of of the
* GNU Lesser General Public License Version 3 (the ``LGPL''),
* or (at your option) any later version.
*
* Software distributed under the License is distributed
* on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
* express or implied. See the LGPL for the specific language
* governing rights and limitations.
*
* You should have received a copy of the LGPL along with this
* program. If not, go to http://www.gnu.org/licenses/lgpl.html
* or write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* The development of this software was supported by the
* Excellence Cluster EXC 277 Cognitive Interaction Technology.
* The Excellence Cluster EXC 277 is a grant of the Deutsche
* Forschungsgemeinschaft (DFG) in the context of the German
* Excellence Initiative.
*/
#include <ipaaca/ipaaca.h>
namespace ipaaca {
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
// static library Initializer
IPAACA_EXPORT bool Initializer::_initialized = false;
IPAACA_EXPORT bool Initializer::initialized() { return _initialized; }
IPAACA_EXPORT void Initializer::initialize_ipaaca_rsb_if_needed()
{
initialize_backend();
}
IPAACA_EXPORT void Initializer::initialize_backend()//{{{
{
if (_initialized) return;
auto_configure_rsb();
IPAACA_DEBUG("Creating and registering Converters")
boost::shared_ptr<IUConverter> iu_converter(new IUConverter());
converterRepository<std::string>()->registerConverter(iu_converter);
boost::shared_ptr<MessageConverter> message_converter(new MessageConverter());
converterRepository<std::string>()->registerConverter(message_converter);
boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter());
converterRepository<std::string>()->registerConverter(payload_update_converter);
boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter());
converterRepository<std::string>()->registerConverter(link_update_converter);
boost::shared_ptr<ProtocolBufferConverter<protobuf::IUCommission> > iu_commission_converter(new ProtocolBufferConverter<protobuf::IUCommission> ());
converterRepository<std::string>()->registerConverter(iu_commission_converter);
// dlw
boost::shared_ptr<ProtocolBufferConverter<protobuf::IUResendRequest> > iu_resendrequest_converter(new ProtocolBufferConverter<protobuf::IUResendRequest> ());
converterRepository<std::string>()->registerConverter(iu_resendrequest_converter);
boost::shared_ptr<ProtocolBufferConverter<protobuf::IURetraction> > iu_retraction_converter(new ProtocolBufferConverter<protobuf::IURetraction> ());
converterRepository<std::string>()->registerConverter(iu_retraction_converter);
// boost::shared_ptr<IntConverter> int_converter(new IntConverter());
// converterRepository<std::string>()->registerConverter(int_converter);
IPAACA_DEBUG("Backend / converter initialization complete.")
_initialized = true;
}//}}}
IPAACA_EXPORT void Initializer::dump_current_default_config()//{{{
{
IPAACA_INFO("--- Dumping current default participant configuration ---")
rsb::ParticipantConfig config = getFactory().getDefaultParticipantConfig();
std::set<rsb::ParticipantConfig::Transport> transports = config.getTransports();
for (std::set<rsb::ParticipantConfig::Transport>::const_iterator it=transports.begin(); it!=transports.end(); ++it) {
IPAACA_INFO( "Active transport: " << it->getName() )
}
IPAACA_INFO("--- End of configuration dump ---")
//ParticipantConfig::Transport inprocess = config.getTransport("inprocess");
//inprocess.setEnabled(true);
//config.addTransport(inprocess);
}//}}}
IPAACA_EXPORT void Initializer::auto_configure_rsb()//{{{
{
// set RSB host and port iff provided using cmdline arguments
if (__ipaaca_static_option_rsb_host!="") {
IPAACA_INFO("Overriding RSB host with " << __ipaaca_static_option_rsb_host)
IPAACA_SETENV("RSB_TRANSPORT_SPREAD_HOST", __ipaaca_static_option_rsb_host.c_str())
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_HOST", __ipaaca_static_option_rsb_host.c_str());
}
if (__ipaaca_static_option_rsb_port!="") {
IPAACA_INFO("Overriding RSB port with " << __ipaaca_static_option_rsb_port)
IPAACA_SETENV("RSB_TRANSPORT_SPREAD_PORT", __ipaaca_static_option_rsb_port.c_str());
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_PORT", __ipaaca_static_option_rsb_port.c_str());
}
if (__ipaaca_static_option_rsb_transport!="") {
if (__ipaaca_static_option_rsb_transport == "spread") {
IPAACA_INFO("Overriding RSB transport mode - using 'spread' ")
IPAACA_SETENV("RSB_TRANSPORT_SPREAD_ENABLED", "1");
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_ENABLED", "0");
} else if (__ipaaca_static_option_rsb_transport == "socket") {
IPAACA_INFO("Overriding RSB transport mode - using 'socket' ")
IPAACA_SETENV("RSB_TRANSPORT_SPREAD_ENABLED", "0");
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_ENABLED", "1");
if (__ipaaca_static_option_rsb_socketserver!="") {
const std::string& srv = __ipaaca_static_option_rsb_socketserver;
if ((srv=="1")||(srv=="0")||(srv=="auto")) {
IPAACA_INFO("Overriding RSB transport.socket.server with " << srv)
IPAACA_SETENV("RSB_TRANSPORT_SOCKET_SERVER", srv.c_str());
} else {
IPAACA_INFO("Unknown RSB transport.socket.server mode " << srv << " - using config default ")
}
}
} else {
IPAACA_INFO("Unknown RSB transport mode " << __ipaaca_static_option_rsb_transport << " - using config default ")
}
}
const char* plugin_path = getenv("RSB_PLUGINS_CPP_PATH");
if (!plugin_path) {
#if _WIN32 || _WIN64
const char* soa_repo_dir = getenv("SOA_REPO_DIR");
if (!soa_repo_dir) {
IPAACA_WARNING("WARNING: RSB_PLUGINS_CPP_PATH not set (and SOA_REPO_DIR not set) - in Windows, no further plugin search is made!")
} else {
std::stringstream path_s;
for (const char c: std::string(soa_repo_dir)) {
path_s << c;
if (c=='\\') path_s << c; // reduplicate backslashes for the RSB env var
}
#if _WIN64
path_s << "\\\\rsb-win-scs_64bit\\\\bin";
#else
path_s << "\\\\rsb-win-scs_32bit\\\\bin";
#endif
IPAACA_WARNING("Caution: RSB_PLUGINS_CPP_PATH not set, inferred from SOA_REPO_DIR: " << path_s.str())
IPAACA_SETENV("RSB_PLUGINS_CPP_PATH", path_s.str().c_str());
}
//throw NotImplementedError();
#else
IPAACA_INFO("RSB_PLUGINS_CPP_PATH not set; looking here and up to 7 dirs up.")
std::string pathstr = "./";
for (int i=0; i< 8 /* depth EIGHT (totally arbitrary..) */ ; i++) {
std::string where_str = pathstr+"deps/lib/rsb*/plugins";
const char* where = where_str.c_str();
glob_t g;
glob(where, 0, NULL, &g);
if (g.gl_pathc>0) {
const char* found_path = g.gl_pathv[0];
IPAACA_INFO("Found an RSB plugin dir which will be used automatically: " << found_path)
IPAACA_SETENV("RSB_PLUGINS_CPP_PATH", found_path);
break;
} // else keep going
globfree(&g);
pathstr += "../";
}
#endif
} else {
IPAACA_INFO("RSB_PLUGINS_CPP_PATH already defined: " << plugin_path)
}
}//}}}
// RSB backend Converters
// IUConverter//{{{
IPAACA_EXPORT IUConverter::IUConverter()
: Converter<std::string> (IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IU"), "ipaaca-iu", true)
{
}
IPAACA_EXPORT std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType()); // "ipaaca::IU"
boost::shared_ptr<const IU> obj = boost::static_pointer_cast<const IU> (data.second);
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
// transfer obj data to pbo
pbo->set_uid(obj->uid());
pbo->set_revision(obj->revision());
pbo->set_category(obj->category());
pbo->set_payload_type(obj->payload_type());
pbo->set_owner_name(obj->owner_name());
pbo->set_committed(obj->committed());
ipaaca::protobuf::IU_AccessMode a_m;
switch(obj->access_mode()) {
case IU_ACCESS_PUSH:
a_m = ipaaca::protobuf::IU_AccessMode_PUSH;
break;
case IU_ACCESS_REMOTE:
a_m = ipaaca::protobuf::IU_AccessMode_REMOTE;
break;
case IU_ACCESS_MESSAGE:
a_m = ipaaca::protobuf::IU_AccessMode_MESSAGE;
break;
}
pbo->set_access_mode(a_m);
pbo->set_read_only(obj->read_only());
for (auto& kv: obj->_payload._document_store) {
protobuf::PayloadItem* item = pbo->add_payload();
item->set_key(kv.first);
IPAACA_DEBUG("Payload type: " << obj->_payload_type)
if (obj->_payload_type=="JSON") {
item->set_value( kv.second->to_json_string_representation() );
item->set_type("JSON");
} else if ((obj->_payload_type=="MAP") || (obj->_payload_type=="STR")) {
// legacy mode
item->set_value( json_value_cast<std::string>(kv.second->document));
item->set_type("STR");
}
}
for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
switch(obj->access_mode()) {
case IU_ACCESS_PUSH:
return "ipaaca-iu";
case IU_ACCESS_MESSAGE:
return "ipaaca-messageiu";
default:
return getWireSchema();
}
}
IPAACA_EXPORT AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "ipaaca-iu"
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
pbo->ParseFromString(wire);
IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode());
switch(mode) {
case IU_ACCESS_PUSH:
{
// Create a "remote push IU"
boost::shared_ptr<RemotePushIU> obj = RemotePushIU::create();
// transfer pbo data to obj
obj->_uid = pbo->uid();
obj->_revision = pbo->revision();
obj->_category = pbo->category();
obj->_payload_type = pbo->payload_type();
obj->_owner_name = pbo->owner_name();
obj->_committed = pbo->committed();
obj->_read_only = pbo->read_only();
obj->_access_mode = IU_ACCESS_PUSH;
for (int i=0; i<pbo->payload_size(); i++) {
const protobuf::PayloadItem& it = pbo->payload(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
obj->_payload._document_store[it.key()] = entry;
}
for (int i=0; i<pbo->links_size(); i++) {
const protobuf::LinkSet& pls = pbo->links(i);
LinkSet& ls = obj->_links._links[pls.type()];
for (int j=0; j<pls.targets_size(); j++) {
ls.insert(pls.targets(j));
}
}
return std::make_pair("ipaaca::RemotePushIU", obj);
break;
}
case IU_ACCESS_MESSAGE:
{
// Create a "Message-type IU"
boost::shared_ptr<RemoteMessage> obj = RemoteMessage::create();
//std::cout << "REFCNT after create: " << obj.use_count() << std::endl;
// transfer pbo data to obj
obj->_uid = pbo->uid();
obj->_revision = pbo->revision();
obj->_category = pbo->category();
obj->_payload_type = pbo->payload_type();
obj->_owner_name = pbo->owner_name();
obj->_committed = pbo->committed();
obj->_read_only = pbo->read_only();
obj->_access_mode = IU_ACCESS_MESSAGE;
for (int i=0; i<pbo->payload_size(); i++) {
const protobuf::PayloadItem& it = pbo->payload(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
obj->_payload._document_store[it.key()] = entry;
}
for (int i=0; i<pbo->links_size(); i++) {
const protobuf::LinkSet& pls = pbo->links(i);
LinkSet& ls = obj->_links._links[pls.type()];
for (int j=0; j<pls.targets_size(); j++) {
ls.insert(pls.targets(j));
}
}
return std::make_pair("ipaaca::RemoteMessage", obj);
break;
}
default:
// no other cases (yet)
throw NotImplementedError();
}
}
//}}}
// MessageConverter//{{{
IPAACA_EXPORT MessageConverter::MessageConverter()
: Converter<std::string> (IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::Message"), "ipaaca-messageiu", true)
{
}
IPAACA_EXPORT std::string MessageConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType()); // "ipaaca::Message"
boost::shared_ptr<const Message> obj = boost::static_pointer_cast<const Message> (data.second);
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
// transfer obj data to pbo
pbo->set_uid(obj->uid());
pbo->set_revision(obj->revision());
pbo->set_category(obj->category());
pbo->set_payload_type(obj->payload_type());
pbo->set_owner_name(obj->owner_name());
pbo->set_committed(obj->committed());
ipaaca::protobuf::IU_AccessMode a_m;
switch(obj->access_mode()) {
case IU_ACCESS_PUSH:
a_m = ipaaca::protobuf::IU_AccessMode_PUSH;
break;
case IU_ACCESS_REMOTE:
a_m = ipaaca::protobuf::IU_AccessMode_REMOTE;
break;
case IU_ACCESS_MESSAGE:
a_m = ipaaca::protobuf::IU_AccessMode_MESSAGE;
break;
}
pbo->set_access_mode(a_m);
pbo->set_read_only(obj->read_only());
for (auto& kv: obj->_payload._document_store) {
protobuf::PayloadItem* item = pbo->add_payload();
item->set_key(kv.first);
if (obj->_payload_type=="JSON") {
item->set_value( kv.second->to_json_string_representation() );
item->set_type("JSON");
} else if ((obj->_payload_type=="MAP") || (obj->_payload_type=="STR")) {
// legacy mode
item->set_value( json_value_cast<std::string>(kv.second->document));
item->set_type("STR");
}
}
for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
switch(obj->access_mode()) {
case IU_ACCESS_PUSH:
return "ipaaca-iu";
case IU_ACCESS_MESSAGE:
return "ipaaca-messageiu";
default:
return getWireSchema();
}
}
IPAACA_EXPORT AnnotatedData MessageConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "ipaaca-iu"
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
pbo->ParseFromString(wire);
IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode());
switch(mode) {
case IU_ACCESS_PUSH:
{
// Create a "remote push IU"
boost::shared_ptr<RemotePushIU> obj = RemotePushIU::create();
// transfer pbo data to obj
obj->_uid = pbo->uid();
obj->_revision = pbo->revision();
obj->_category = pbo->category();
obj->_payload_type = pbo->payload_type();
obj->_owner_name = pbo->owner_name();
obj->_committed = pbo->committed();
obj->_read_only = pbo->read_only();
obj->_access_mode = IU_ACCESS_PUSH;
for (int i=0; i<pbo->payload_size(); i++) {
const protobuf::PayloadItem& it = pbo->payload(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
obj->_payload._document_store[it.key()] = entry;
}
for (int i=0; i<pbo->links_size(); i++) {
const protobuf::LinkSet& pls = pbo->links(i);
LinkSet& ls = obj->_links._links[pls.type()];
for (int j=0; j<pls.targets_size(); j++) {
ls.insert(pls.targets(j));
}
}
return std::make_pair("ipaaca::RemotePushIU", obj);
break;
}
case IU_ACCESS_MESSAGE:
{
// Create a "Message-type IU"
boost::shared_ptr<RemoteMessage> obj = RemoteMessage::create();
// transfer pbo data to obj
obj->_uid = pbo->uid();
obj->_revision = pbo->revision();
obj->_category = pbo->category();
obj->_payload_type = pbo->payload_type();
obj->_owner_name = pbo->owner_name();
obj->_committed = pbo->committed();
obj->_read_only = pbo->read_only();
obj->_access_mode = IU_ACCESS_MESSAGE;
for (int i=0; i<pbo->payload_size(); i++) {
const protobuf::PayloadItem& it = pbo->payload(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
obj->_payload._document_store[it.key()] = entry;
}
for (int i=0; i<pbo->links_size(); i++) {
const protobuf::LinkSet& pls = pbo->links(i);
LinkSet& ls = obj->_links._links[pls.type()];
for (int j=0; j<pls.targets_size(); j++) {
ls.insert(pls.targets(j));
}
}
return std::make_pair("ipaaca::RemoteMessage", obj);
break;
}
default:
// no other cases (yet)
throw NotImplementedError();
}
}
//}}}
// IUPayloadUpdateConverter//{{{
IPAACA_EXPORT IUPayloadUpdateConverter::IUPayloadUpdateConverter()
: Converter<std::string> (IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IUPayloadUpdate"), "ipaaca-iu-payload-update", true)
{
}
IPAACA_EXPORT std::string IUPayloadUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType()); // "ipaaca::IUPayloadUpdate"
boost::shared_ptr<const IUPayloadUpdate> obj = boost::static_pointer_cast<const IUPayloadUpdate> (data.second);
boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
// transfer obj data to pbo
pbo->set_uid(obj->uid);
pbo->set_revision(obj->revision);
pbo->set_writer_name(obj->writer_name);
pbo->set_is_delta(obj->is_delta);
for (auto& kv: obj->new_items) {
protobuf::PayloadItem* item = pbo->add_new_items();
item->set_key(kv.first);
if (obj->payload_type=="JSON") {
item->set_value( kv.second->to_json_string_representation() );
item->set_type("JSON");
} else if ((obj->payload_type=="MAP") || (obj->payload_type=="STR")) {
// legacy mode
item->set_value( json_value_cast<std::string>(kv.second->document));
item->set_type("STR");
} else {
IPAACA_ERROR("Uninitialized payload update type!")
throw NotImplementedError();
}
IPAACA_DEBUG("Adding updated item (type " << item->type() << "): " << item->key() << " -> " << item->value() )
}
for (auto& key: obj->keys_to_remove) {
pbo->add_keys_to_remove(key);
IPAACA_DEBUG("Adding removed key: " << key)
}
pbo->SerializeToString(&wire);
return getWireSchema();
}
AnnotatedData IUPayloadUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "ipaaca-iu-payload-update"
boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
pbo->ParseFromString(wire);
boost::shared_ptr<IUPayloadUpdate> obj(new IUPayloadUpdate());
// transfer pbo data to obj
obj->uid = pbo->uid();
obj->revision = pbo->revision();
obj->writer_name = pbo->writer_name();
obj->is_delta = pbo->is_delta();
for (int i=0; i<pbo->new_items_size(); i++) {
const protobuf::PayloadItem& it = pbo->new_items(i);
PayloadDocumentEntry::ptr entry;
if (it.type() == "JSON") {
// fully parse json text
entry = PayloadDocumentEntry::from_json_string_representation( it.value() );
IPAACA_INFO("New/updated payload entry: " << it.key() << " -> " << it.value() )
} else {
// assuming legacy "str" -> just copy value to raw string in document
entry = std::make_shared<PayloadDocumentEntry>();
entry->document.SetString(it.value(), entry->document.GetAllocator());
}
obj->new_items[it.key()] = entry;
}
for (int i=0; i<pbo->keys_to_remove_size(); i++) {
obj->keys_to_remove.push_back(pbo->keys_to_remove(i));
}
return std::make_pair(getDataType(), obj);
}
//}}}
// IULinkUpdateConverter//{{{
IPAACA_EXPORT IULinkUpdateConverter::IULinkUpdateConverter()
: Converter<std::string> (IPAACA_SYSTEM_DEPENDENT_CLASS_NAME("ipaaca::IULinkUpdate"), "ipaaca-iu-link-update", true)
{
}
IPAACA_EXPORT std::string IULinkUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType());
boost::shared_ptr<const IULinkUpdate> obj = boost::static_pointer_cast<const IULinkUpdate> (data.second);
boost::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate());
// transfer obj data to pbo
pbo->set_uid(obj->uid);
pbo->set_revision(obj->revision);
pbo->set_writer_name(obj->writer_name);
pbo->set_is_delta(obj->is_delta);
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->new_links.begin(); it!=obj->new_links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_new_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->links_to_remove.begin(); it!=obj->links_to_remove.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links_to_remove();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
return getWireSchema();
}
AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "ipaaca-iu-link-update"
boost::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate());
pbo->ParseFromString(wire);
boost::shared_ptr<IULinkUpdate> obj(new IULinkUpdate());
// transfer pbo data to obj
obj->uid = pbo->uid();
obj->revision = pbo->revision();
obj->writer_name = pbo->writer_name();
obj->is_delta = pbo->is_delta();
for (int i=0; i<pbo->new_links_size(); ++i) {
const protobuf::LinkSet& it = pbo->new_links(i);
for (int j=0; j<it.targets_size(); ++j) {
obj->new_links[it.type()].insert(it.targets(j)); // = vec;
}
}
for (int i=0; i<pbo->links_to_remove_size(); ++i) {
const protobuf::LinkSet& it = pbo->links_to_remove(i);
for (int j=0; j<it.targets_size(); ++j) {
obj->links_to_remove[it.type()].insert(it.targets(j));
}
}
return std::make_pair(getDataType(), obj);
}
//}}}
/*
// IntConverter//{{{
IPAACA_EXPORT IntConverter::IntConverter()
: Converter<std::string> ("int", "int32", true)
{
}
IPAACA_EXPORT std::string IntConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType());
// NOTE: a dynamic_pointer_cast cannot be used from void*
boost::shared_ptr<const int> obj = boost::static_pointer_cast<const int> (data.second);
boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage());
// transfer obj data to pbo
pbo->set_value(*obj);
pbo->SerializeToString(&wire);
return getWireSchema();
}
IPAACA_EXPORT AnnotatedData IntConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "int"
boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage());
pbo->ParseFromString(wire);
boost::shared_ptr<int> obj = boost::shared_ptr<int>(new int(pbo->value()));
return std::make_pair("int", obj);
}
//}}}
*/
} // of namespace ipaaca
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
......@@ -35,11 +35,6 @@
namespace ipaaca {
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const IUInterface& obj)//{{{
{
os << "IUInterface(uid='" << obj.uid() << "'";
......@@ -142,7 +137,7 @@ IPAACA_EXPORT void IUInterface::set_links(const LinkMap& links, const std::strin
_replace_links(links);
}
IPAACA_HEADER_EXPORT const std::string& IUInterface::channel()
IPAACA_EXPORT const std::string& IUInterface::channel()
{
if (_buffer == NULL)
throw IUUnpublishedError();
......
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
......@@ -35,11 +35,6 @@
namespace ipaaca {
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
// IU//{{{
IPAACA_EXPORT IU::ptr IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
......@@ -84,7 +79,7 @@ IPAACA_EXPORT void IU::_modify_links(bool is_delta, const LinkMap& new_links, co
IPAACA_EXPORT void IU::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
IPAACA_INFO("")
//IPAACA_INFO("")
_revision_lock.lock();
if (_committed) {
_revision_lock.unlock();
......@@ -191,7 +186,7 @@ IPAACA_EXPORT void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new
} else if (_read_only) {
throw IUReadOnlyError();
}
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
auto server = ((InputBuffer*)_buffer)->_get_remote_server(_owner_name);
IULinkUpdate::ptr update = IULinkUpdate::ptr(new IULinkUpdate());
update->uid = _uid;
update->revision = _revision;
......@@ -199,11 +194,11 @@ IPAACA_EXPORT void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new
update->writer_name = _buffer->unique_name();
update->new_links = new_links;
update->links_to_remove = links_to_remove;
boost::shared_ptr<int> result = server->call<int>("updateLinks", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO
if (*result == 0) {
int result = server->request_remote_link_update(update); // TODO
if (result == 0) {
throw IUUpdateFailedError();
} else {
_revision = *result;
_revision = result;
}
}
IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<std::string, PayloadDocumentEntry::ptr>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
......@@ -216,7 +211,7 @@ IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<s
} else if (_read_only) {
throw IUReadOnlyError();
}
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
auto server = ((InputBuffer*)_buffer)->_get_remote_server(_owner_name);
IUPayloadUpdate::ptr update = IUPayloadUpdate::ptr(new IUPayloadUpdate());
update->uid = _uid;
update->revision = _revision;
......@@ -225,11 +220,11 @@ IPAACA_EXPORT void RemotePushIU::_modify_payload(bool is_delta, const std::map<s
update->new_items = new_items;
update->keys_to_remove = keys_to_remove;
update->payload_type = _payload_type;
boost::shared_ptr<int> result = server->call<int>("updatePayload", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO
if (*result == 0) {
int result = server->request_remote_payload_update(update);
if (result == 0) {
throw IUUpdateFailedError();
} else {
_revision = *result;
_revision = result;
}
}
......@@ -244,16 +239,16 @@ IPAACA_EXPORT void RemotePushIU::commit()
// Following python version: ignoring multiple commit
return;
}
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
boost::shared_ptr<protobuf::IUCommission> update = boost::shared_ptr<protobuf::IUCommission>(new protobuf::IUCommission());
auto server = ((InputBuffer*)_buffer)->_get_remote_server(_owner_name);
std::shared_ptr<protobuf::IUCommission> update = std::shared_ptr<protobuf::IUCommission>(new protobuf::IUCommission());
update->set_uid(_uid);
update->set_revision(_revision);
update->set_writer_name(_buffer->unique_name());
boost::shared_ptr<int> result = server->call<int>("commit", update, IPAACA_REMOTE_SERVER_TIMEOUT); // TODO
if (*result == 0) {
int result = server->request_remote_commission(update);
if (result == 0) {
throw IUUpdateFailedError();
} else {
_revision = *result;
_revision = result;
}
}
......
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
......
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
......@@ -35,11 +35,6 @@
namespace ipaaca {
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
IPAACA_EXPORT std::ostream& operator<<(std::ostream& os, const SmartLinkMap& obj)//{{{
{
os << "{";
......
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
......
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
......@@ -33,15 +33,12 @@
#include <ipaaca/ipaaca.h>
#include <sstream>
namespace ipaaca {
using namespace rapidjson;
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
// temporary helper to show rapidjson internal type
std::string value_diagnosis(rapidjson::Value* val)
{
......@@ -279,7 +276,7 @@ IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(Payload* payload, const std::
: _payload(payload), _key(key), parent(nullptr)
{
document_entry = _payload->get_entry(key);
json_value = &(document_entry->document);
set_json_value(&(document_entry->document), "construction");
}
IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(PayloadEntryProxy* parent_, const std::string& addr_key_)
: parent(parent_), addressed_key(addr_key_), addressed_as_array(false)
......@@ -289,10 +286,10 @@ IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(PayloadEntryProxy* parent_, c
document_entry = parent->document_entry;
auto it = parent->json_value->FindMember(addr_key_.c_str());
if (it != parent->json_value->MemberEnd()) {
json_value = &(parent->json_value->operator[](addr_key_.c_str()));
set_json_value(&(parent->json_value->operator[](addr_key_.c_str())), std::string("construction from str addressing with ")+addr_key_);
existent = true;
} else {
json_value = nullptr; // avoid heap construction here
set_json_value(nullptr, std::string("null-construction from failed str addressing with ")+addr_key_); // avoid heap construction here
existent = false;
}
}
......@@ -302,7 +299,7 @@ IPAACA_EXPORT PayloadEntryProxy::PayloadEntryProxy(PayloadEntryProxy* parent_, s
_payload = parent->_payload;
_key = parent->_key;
document_entry = parent->document_entry;
json_value = &(parent->json_value->operator[](addr_idx_));
set_json_value(&(parent->json_value->operator[](addr_idx_)), std::string("construction from int addressing with ")+std::to_string(addr_idx_));
existent = true;
}
......@@ -316,8 +313,10 @@ IPAACA_EXPORT PayloadEntryProxy PayloadEntryProxy::operator[](const std::string&
IPAACA_INFO("Invalid json_value")
throw PayloadAddressingError();
}
//IPAACA_DEBUG("string addressing with '" << addr_key_ << "' of json object " << (off_t) json_value )
if (! json_value->IsObject()) {
IPAACA_INFO("Expected Object for operator[](string)")
//IPAACA_DEBUG(" But type is: " << value_diagnosis(json_value) )
throw PayloadAddressingError();
}
return PayloadEntryProxy(this, addr_key_);
......@@ -364,6 +363,10 @@ IPAACA_EXPORT PayloadEntryProxy::operator std::string()
{
return json_value_cast<std::string>(json_value);
}
IPAACA_EXPORT PayloadEntryProxy::operator int()
{
return json_value_cast<int>(json_value);
}
IPAACA_EXPORT PayloadEntryProxy::operator long()
{
return json_value_cast<long>(json_value);
......@@ -380,6 +383,10 @@ IPAACA_EXPORT std::string PayloadEntryProxy::to_str()
{
return json_value_cast<std::string>(json_value);
}
IPAACA_EXPORT int PayloadEntryProxy::to_int()
{
return json_value_cast<int>(json_value);
}
IPAACA_EXPORT long PayloadEntryProxy::to_long()
{
return json_value_cast<long>(json_value);
......@@ -388,6 +395,10 @@ IPAACA_EXPORT double PayloadEntryProxy::to_float()
{
return json_value_cast<double>(json_value);
}
IPAACA_EXPORT double PayloadEntryProxy::to_double()
{
return json_value_cast<double>(json_value);
}
IPAACA_EXPORT bool PayloadEntryProxy::to_bool()
{
return json_value_cast<bool>(json_value);
......@@ -451,7 +462,7 @@ IPAACA_EXPORT void Payload::on_lock()
IPAACA_DEBUG("Starting payload batch update mode ...")
_update_on_every_change = false;
std::stringstream ss;
ss << boost::this_thread::get_id();
ss << std::this_thread::get_id();
_writing_thread_id = ss.str();
}
IPAACA_EXPORT void Payload::on_unlock()
......@@ -467,9 +478,9 @@ IPAACA_EXPORT void Payload::on_unlock()
_writing_thread_id = "";
}
IPAACA_EXPORT void Payload::initialize(boost::shared_ptr<IUInterface> iu)
IPAACA_EXPORT void Payload::initialize(std::shared_ptr<IUInterface> iu)
{
_iu = boost::weak_ptr<IUInterface>(iu);
_iu = std::weak_ptr<IUInterface>(iu);
}
IPAACA_EXPORT PayloadEntryProxy Payload::operator[](const std::string& key)
......@@ -594,7 +605,7 @@ IPAACA_EXPORT void Payload::_internal_merge_and_remove(const std::map<std::strin
IPAACA_EXPORT PayloadDocumentEntry::ptr Payload::get_entry(const std::string& k) {
if (! _update_on_every_change) {
std::stringstream ss;
ss << boost::this_thread::get_id();
ss << std::this_thread::get_id();
if (_writing_thread_id == ss.str()) {
IPAACA_DEBUG("Payload locked by current thread, looking for payload key in caches first")
// in batch mode, read from cached writed first!
......@@ -708,6 +719,7 @@ IPAACA_EXPORT PayloadEntryProxyMapIterator::PayloadEntryProxyMapIterator(Payload
IPAACA_EXPORT PayloadEntryProxyMapIterator& PayloadEntryProxyMapIterator::operator++()
{
//IPAACA_DEBUG("Map iterator incrementing");
raw_iterator++;
return *this;
}
......@@ -715,6 +727,8 @@ IPAACA_EXPORT PayloadEntryProxyMapIterator& PayloadEntryProxyMapIterator::operat
IPAACA_EXPORT std::pair<std::string, PayloadEntryProxy> PayloadEntryProxyMapIterator::operator*()
{
std::string key = raw_iterator->name.GetString();
//IPAACA_DEBUG("Deref map iterator key '" << key << "'");
//IPAACA_DEBUG(" of proxy " << (off_t) proxy << "");
return std::pair<std::string, PayloadEntryProxy>(key, (*proxy)[key] ); // generates child Proxy
}
......
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
......@@ -87,28 +87,34 @@ std::string str_join(const std::vector<std::string>& vec,const std::string& sep)
return tmp;
}
void str_split_wipe(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiters )
int str_split_wipe(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiters )
{
tokens.clear();
std::string::size_type lastPos = str.find_first_not_of(delimiters, 0);
std::string::size_type pos = str.find_first_of(delimiters, lastPos);
int count = 0;
while (std::string::npos != pos || std::string::npos != lastPos)
{
count++;
tokens.push_back(str.substr(lastPos, pos - lastPos));
lastPos = str.find_first_not_of(delimiters, pos);
pos = str.find_first_of(delimiters, lastPos);
}
return count;
}
void str_split_append(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiters )
int str_split_append(const std::string& str, std::vector<std::string>& tokens, const std::string& delimiters )
{
std::string::size_type lastPos = str.find_first_not_of(delimiters, 0);
std::string::size_type pos = str.find_first_of(delimiters, lastPos);
int count = 0;
while (std::string::npos != pos || std::string::npos != lastPos)
{
count++;
tokens.push_back(str.substr(lastPos, pos - lastPos));
lastPos = str.find_first_not_of(delimiters, pos);
pos = str.find_first_of(delimiters, lastPos);
}
return count;
}
} // namespace ipaaca
......
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
......@@ -48,6 +48,9 @@
#include <cstdio>
#include <iomanip>
#include <thread>
#include <chrono>
#if _WIN32 || _WIN64
double get_time_as_secs() { return 0.0; } // TODO implement time function for Windows when required
#else
......@@ -61,7 +64,7 @@
class TesterCpp {
public:
void handle_iu_inbuf(boost::shared_ptr<ipaaca::IUInterface> iu, ipaaca::IUEventType etype, bool local)
void handle_iu_inbuf(std::shared_ptr<ipaaca::IUInterface> iu, ipaaca::IUEventType etype, bool local)
{
std::cout << std::fixed << std::setprecision(3) << get_time_as_secs() << " ";
std::cout << ipaaca::iu_event_type_to_str(etype) << " category=" << iu->category() << " uid=" << iu->uid() << std::endl;
......@@ -86,17 +89,39 @@ class TesterCpp {
std::cout << "\t'" << kv.first << "': " << ((std::string) kv.second) << ',' << std::endl;
}
std::cout << "}" << std::endl;
if (etype == IU_ADDED) {
std::cout << "Will send a modification to a received new IU" << std::endl;
int cnt=5;
while(cnt>0) {
try {
iu->payload()["seen_by_cpp"] = true;
cnt = 0;
} catch(...) {
std::cout << "... failed, but we try more than once ..." << std::endl;
cnt--;
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
}
}
}
int run()
{
ipaaca::OutputBuffer::ptr ob = ipaaca::OutputBuffer::create("testerCpp");
ipaaca::InputBuffer::ptr ib = ipaaca::InputBuffer::create("testerCpp", std::set<std::string>{""});
ipaaca::InputBuffer::ptr ib = ipaaca::InputBuffer::create("testerCpp", std::set<std::string>{"testcategory"}); // MQTT requires # as a wildcard!
ib->set_resend(true);
ib->register_handler(boost::bind(&TesterCpp::handle_iu_inbuf, this, _1, _2, _3));
std::cout << "Listening for all IU events ..." << std::endl;
while(true) {
sleep(5);
ib->register_handler(IPAACA_BIND_CLASS_HANDLER(&TesterCpp::handle_iu_inbuf, this));
std::cout << "Listening for all IU events and sending a Message after 1 sec ..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
auto msg = ipaaca::IU::create("testcategory");
msg->payload()["hello"] = "world";
ob->add(msg);
int i = 0;
while(true) {
std::this_thread::sleep_for(std::chrono::seconds(5));
auto msg = ipaaca::Message::create("testcate2");
msg->payload()["num"] = ++i;
ob->add(msg);
}
return 0;
}
......@@ -105,8 +130,17 @@ class TesterCpp {
int main(int argc, char** argv)
{
/*
auto config = ipaaca::get_global_config(true);
for (auto it = config->begin(); it!=config->end(); ++it ) {
std::cout << it->first << "=" << it->second << std::endl;
}
exit(1);
*/
ipaaca::__ipaaca_static_option_log_level = IPAACA_LOG_LEVEL_DEBUG;
TesterCpp tester;
tester.run();
}
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
......@@ -51,6 +51,7 @@ IPAACA_EXPORT std::string generate_uuid_string()//{{{
RpcStringFree(&uuid_str);
return result;
}
throw UUIDGenerationError();
} else {
throw UUIDGenerationError();
}
......
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2015 Social Cognitive Systems Group
* Copyright (c) 2009-2022 Social Cognitive Systems Group
* (formerly the Sociable Agents Group)
* CITEC, Bielefeld University
*
......@@ -122,7 +122,7 @@ void ComponentNotifier::initialize() {
Locker locker(lock);
if (!initialized) {
initialized = true;
in_buf->register_handler(boost::bind(&ComponentNotifier::handle_iu_event, this, _1, _2, _3));
in_buf->register_handler(std::bind(&ComponentNotifier::handle_iu_event, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
submit_notify(_IPAACA_COMP_NOTIF_STATE_NEW);
}
}
......
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* Copyright (c) 2009-2022 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* Copyright (c) 2009-2022 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -3,7 +3,7 @@
* "Incremental Processing Architecture
* for Artificial Conversational Agents".
*
* Copyright (c) 2009-2013 Sociable Agents Group
* Copyright (c) 2009-2022 Sociable Agents Group
* CITEC, Bielefeld University
*
* http://opensource.cit-ec.de/projects/ipaaca/
......
......@@ -6,7 +6,7 @@
<dependencies>
<dependency org="slf4j" name="slf4j-api" rev="latest.release" />
<dependency org="google" name="guava" rev="latest.release" />
<dependency org="google" name="protobuf-java" rev="latest.release" />
<dependency org="google" name="protobuf-java" rev="2.6.1" />
<dependency org="rsb" name="rsb" rev="latest.release" />
<dependency org="lombok" name="lombok" rev="latest.release" />
<dependency org="apache" name="commons-lang" rev="latest.release" />
......
......@@ -2,7 +2,7 @@
// "Incremental Processing Architecture
// for Artificial Conversational Agents".
//
// Copyright (c) 2009-2014 Social Cognitive Systems Group
// Copyright (c) 2009-2022 Social Cognitive Systems Group
// CITEC, Bielefeld University
//
// http://opensource.cit-ec.de/projects/ipaaca/
......@@ -28,72 +28,150 @@
// Forschungsgemeinschaft (DFG) in the context of the German
// Excellence Initiative.
syntax = "proto2";
package ipaaca.protobuf;
enum TransportMessageType {
WireTypeRESERVED = 0;
WireTypeIntMessage = 1;
WireTypeRemoteRequestResult = 2;
WireTypeIU = 3;
WireTypeMessageIU = 4; // special case on the wire (use other converter)
WireTypeIUPayloadUpdate = 5;
WireTypeIULinkUpdate = 6;
WireTypeIURetraction = 7;
WireTypeIUCommission = 8;
WireTypeIUResendRequest = 9;
WireTypeIUPayloadUpdateRequest = 100;
WireTypeIUCommissionRequest = 101;
WireTypeIULinkUpdateRequest = 102;
}
message TransportLevelWrapper {
required TransportMessageType transport_message_type = 1;
required bytes raw_message = 2;
}
message IntMessage {
required sint32 value = 1;
required sint32 value = 1;
}
message LinkSet {
required string type = 1;
repeated string targets = 2;
required string type = 1;
repeated string targets = 2;
}
message PayloadItem {
required string key = 1;
required string value = 2;
required string type = 3 [default = "str"];
required string key = 1;
required string value = 2;
required string type = 3 [default = "str"];
}
message IU {
enum AccessMode {
PUSH = 0;
REMOTE = 1;
MESSAGE = 2;
}
required string uid = 1;
required uint32 revision = 2;
required string category = 3 [default = "undef"];
required string payload_type = 4 [default = "MAP"];
required string owner_name = 5;
required bool committed = 6 [default = false];
required AccessMode access_mode = 7 [default = PUSH];
required bool read_only = 8 [default = false];
repeated PayloadItem payload = 9;
repeated LinkSet links = 10;
enum AccessMode {
PUSH = 0;
REMOTE = 1;
MESSAGE = 2;
}
required string uid = 1;
required uint32 revision = 2;
required string category = 3 [default = "undef"];
required string payload_type = 4 [default = "MAP"];
required string owner_name = 5;
required bool committed = 6 [default = false];
required AccessMode access_mode = 7 [default = PUSH];
required bool read_only = 8 [default = false];
repeated PayloadItem payload = 9;
repeated LinkSet links = 10;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUPayloadUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IURetraction {
required string uid = 1;
required uint32 revision = 2;
required string uid = 1;
required uint32 revision = 2;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUCommission {
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUResendRequest {
required string uid = 1;
required string hidden_scope_name = 2;
required string uid = 1;
required string hidden_scope_name = 2;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdate {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
// Result for remote operations (below).
// Used to send a raw int, which was problematic.
// Usually: 0 = Failed, >0 = new revision of successfully modified resource.
message RemoteRequestResult {
required uint32 result = 1;
optional string request_uid = 100 [default = ""];
//optional string request_endpoint = 101 [default = ""];
}
// Remote / request versions of buffer setters:
// they just go with a dedicated ID
message IUPayloadUpdateRequest {
required string uid = 1;
required uint32 revision = 2;
repeated PayloadItem new_items = 3;
repeated string keys_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IUCommissionRequest {
required string uid = 1;
required uint32 revision = 2;
required string writer_name = 3;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
message IULinkUpdateRequest {
required string uid = 1;
required uint32 revision = 2;
repeated LinkSet new_links = 3;
repeated LinkSet links_to_remove = 4;
required bool is_delta = 5 [default = false];
required string writer_name = 6;
optional string request_uid = 100 [default = ""];
optional string request_endpoint = 101 [default = ""];
}
......@@ -5,8 +5,6 @@
</publications>
<dependencies>
<dependency org="google" name="protobuf-python" rev="latest.release"/>
<dependency org="rsb" name="rsb-python" rev="latest.release"/>
<dependency org="spread" name="spread" rev="latest.release"/>
</dependencies>
</ivy-module>
......
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 9 14:12:05 2016
@author: jpoeppel
"""
from setuptools import setup
import os
import sys
import subprocess
from os import path as op
from distutils.spawn import find_executable
from setuptools.command.build_py import build_py
from setuptools.command.bdist_egg import bdist_egg
from distutils.command.build import build
from distutils.command.sdist import sdist
class ProtoBuild(build_py):
"""
This command automatically compiles all .proto files with `protoc` compiler
and places generated files near them -- i.e. in the same directory.
"""
def find_protoc(self):
"Locates protoc executable"
if 'PROTOC' in os.environ and os.path.exists(os.environ['PROTOC']):
protoc = os.environ['PROTOC']
else:
protoc = find_executable('protoc')
if protoc is None:
sys.stderr.write('protoc not found. Is protobuf-compiler installed? \n'
'Alternatively, you can point the PROTOC environment variable at a local version.')
sys.exit(1)
return protoc
def run(self):
#TODO determine path automaticall
packagedir = "../proto"
print("running build proto")
for protofile in filter(lambda x: x.endswith('.proto'), os.listdir(packagedir)):
source = op.join(packagedir, protofile)
output = source.replace('.proto', '_pb2.py')
if (not op.exists(output) or (op.getmtime(source) > op.getmtime(output))):
sys.stderr.write('Protobuf-compiling ' + source + '\n')
subprocess.check_call([self.find_protoc(), "-I={}".format(packagedir),'--python_out=./src/ipaaca', source])
class BDist_egg(bdist_egg):
'''
Simple wrapper around the normal bdist_egg command to require
protobuf build before normal build.
.. codeauthor:: jwienke
'''
def run(self):
self.run_command('build_proto')
bdist_egg.run(self)
class Build(build):
'''
Simple wrapper around the normal build command to require protobuf build
before normal build.
.. codeauthor:: jwienke
'''
def run(self):
self.run_command('build_proto')
build.run(self)
class Sdist(sdist):
'''
Simple wrapper around the normal sdist command to require protobuf build
before generating the source distribution..
.. codeauthor:: jwienke
'''
def run(self):
# fetch the protocol before building the source distribution so that
# we have a cached version and each user can rebuild the protocol
# with his own protobuf version
self.run_command('build_proto')
sdist.run(self)
version = "0.1.3" #TODO determine correct version! ideally from git, maybe do something similar to rsb/setup.py
setup(name="ipaaca",
version=version,
author="Hendrik Buschmeier, Ramin Yaghoubzadeh, Sören Klett",
author_email="hbuschme@uni-bielefeld.de,ryaghoubzadeh@uni-bielefeld.de,sklett@techfak.uni-bielefeld.de",
license='LGPLv3+',
url='https://opensource.cit-ec.de/projects/ipaaca',
install_requires=["paho-mqtt", "six", "protobuf"],
packages=["ipaaca", "ipaaca.util"],
package_dir={"ipaaca":"src/ipaaca"},
# TODO Do we want to add ipaaca_pb2.py to the egg or as separate package?
# data_files=[("./ipaaca", ["ipaaca_pb2.py"])],
# dependency_links=[
# 'http://www.spread.org/files/'
# 'SpreadModule-1.5spread4.tgz#egg=SpreadModule-1.5spread4'],
cmdclass ={
"build_proto": ProtoBuild,
"sdist": Sdist,
"build": Build,
"bdist_egg":BDist_egg
}
)