Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • scs/ipaaca
  • ramin.yaghoubzadeh/ipaaca
2 results
Show changes
Commits on Source (406)
Showing with 498 additions and 2774 deletions
.DS_Store .DS_Store
*/generatedsrc generatedsrc
*/build build
*/lib docs/doxygen_generated
*/test/lib lib
*/test/report test/lib
*/dist report
*/privateprops dist
*/.project deps
*/.classpath privateprops
.project
.classpath
.setting
.settings
.pydevproject
*.pyc *.pyc
**/.*.swp *.swp
.*.sw[a-z] *.sw[a-z]
*.un~ *.un~
Session.vim Session.vim
manifest.mf
*.*~
stages:
- deploy
conda-build-job:
stage: deploy
image: dominikbattefeld/miniconda3-build:conda-22.9.0
# Some information for debugging
before_script:
- conda --version
- conda list
# The actual call to build the package
script:
- conda build conda.recipe/
# The path of job artifacts has to be relative to and be a child of our project directory.
# Thus, to upload our package as job artifact, we need to copy the output of conda build to our working directory.
after_script:
- cp -r /opt/conda/conda-bld conda-build-output
artifacts:
paths:
- conda-build-output/*
REQUIRED=""
OPTIONAL=""
GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.
# IPAACA
This repository contains the software library IPAACA developed by the Social Cognitive Systems Group at Bielefeld University. IPAACA stands for 'Incremental Processing Architecture for Artificial Conversational Agents.' The library is available in three languages: Python (Python3 compatible), C++, and Java, and for three operating systems: Linux, OS X, and Windows.
## Build dependencies
### Linux
Dependencies: Protocol Buffer (libprotobuf), Transport Protocol (**libmosquittopp**, ros, librsb)
sudo apt-get install libprotobuf-dev
sudo apt-get install libprotoc-dev
sudo apt-get install protobuf-compiler
sudo apt-get install mosquitto libmosquittopp-dev
## Build/Install instructions
### Linux
**Compiling C++ Version**
cd ipaaca/ipaacalib/cpp
mkdir build
cd build
cmake ..
make
sudo make install
**Installing Python Version**
cd ipaaca/ipaacalib/python
python3 setup.py install --user
### OS X
### Windows
## Usage
**Python:** import ipaaca
**C++:** #include "ipaaca/ipaaca.h"
## History
## Credits
Hendrik Buschmeier <hbuschme@techfak.uni-bielefeld.de>
Ramin Yaghoubzadeh <ryaghoub@techfak.uni-bielefeld.de>
## License
GNU LESSER GENERAL PUBLIC LICENSE (See LICENSE.txt in the repository)
## Further Reading
[1] IPAACA: https://scs.techfak.uni-bielefeld.de/wiki/public/ipaaca/start
[2] Schlangen et al. "Middleware for Incremental Processing in Conversational Agents," SIGDIAL 2010. https://www.aclweb.org/anthology/W10-4308
<project name="ipaaca" default="build" basedir="."> <project name="ipaaca-all" default="build" basedir=".">
<target name="resolve"> <import file="../hmibuild/build-recurse.xml" />
<subant target="resolve" genericantfile="build.xml">
<fileset dir="." includes="*/build.xml"/>
</subant>
</target>
<target name="build" depends="-pre-compilation">
<subant target="build" genericantfile="build.xml">
<fileset dir="." includes="*/build.xml"/>
</subant>
</target>
<target name="clean">
<subant target="clean" genericantfile="build.xml">
<fileset dir="." includes="*/build.xml"/>
</subant>
</target>
<target name="-pre-compilation">
<subant target="-pre-compilation" genericantfile="build.xml">
<fileset dir="." includes="*/build.xml"/>
</subant>
</target>
<target name="compile" depends="-pre-compilation">
<subant target="compile" genericantfile="build.xml">
<fileset dir="." includes="*/build.xml"/>
</subant>
</target>
</project> </project>
# this one is important
set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_PLATFORM Linux)
#this one not so much
set(CMAKE_SYSTEM_VERSION 1)
# specify the cross compiler
set(CMAKE_C_COMPILER $ENV{CC})
# where is the target environment
set(CMAKE_FIND_ROOT_PATH $ENV{PREFIX} $ENV{BUILD_PREFIX}/$ENV{HOST}/sysroot)
# search for programs in the build host directories
set(CMAKE_FIND_ROOT_PATH_MODE_PROGRAM NEVER)
# for libraries and headers in the target directories
set(CMAKE_FIND_ROOT_PATH_MODE_LIBRARY ONLY)
set(CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY)
# god-awful hack because it seems to not run correct tests to determine this:
set(__CHAR_UNSIGNED___EXITCODE 1)
\ No newline at end of file
#!/bin/bash
echo "Installing ipaaca-cpp"
declare -a CMAKE_PLATFORM_FLAGS
INCLUDE_PATH="${PREFIX}/include"
LIBRARY_PATH="${PREFIX}/lib"
export CMAKE_INCLUDE_PATH=$INCLUDE_PATH
export CMAKE_LIBRARY_PATH=$LIBRARY_PATH
if [[ "$(uname)" == "Linux" ]]; then
# Linus needs this lrt flag
export LDFLAGS="$LDFLAGS -lrt"
CMAKE_PLATFORM_FLAGS+=(-DCMAKE_TOOLCHAIN_FILE="${RECIPE_DIR}/cross-linux.cmake")
elif [[ "$(uname)" == "Darwin" ]]; then
echo "mac"
fi
# build protobuf files
mkdir -p ipaaca-cpp/build/ipaaca
cd proto
protoc ipaaca.proto --cpp_out=../ipaaca-cpp/build/ipaaca
# build ipaaca cpp
cd ../ipaaca-cpp/build
export VERBOSE=1
cmake .. -DCMAKE_INSTALL_PREFIX=${PREFIX} -DBOOST_INCLUDEDIR=${INCLUDE_PATH} -DBOOST_LIBRARYDIR=${LIBRARY_PATH} ${CMAKE_PLATFORM_FLAGS[@]}
echo "Finished cmake, trying build now"
make
make install DESTDIR=${PREFIX}
cd ../../..
\ No newline at end of file
#!/bin/bash
# Since we setup subfolders for ipaaca and proto, we want to step into the ipaaca folder here
# Note: setup.py assumes protoc file to be at ../proto from here and we do not want to change
# that for easier manual installation
cd ipaaca-py
echo "Installing ipaaca using setup.py"
$PYTHON -m pip install . -vv --no-deps
#python setup.py install --single-version-externally-managed --record=record.txt
{% set protobufversion = "3.8.0.*" %}
{% set pahomqttversion = "1.4.0.*" %} # Currently no longer needed, but you may want to build your own if you want to include Ramin's patch
package:
name: ipaaca
version: "0.1.3"
source:
- path: ../ipaacalib/cpp
folder: ipaaca-cpp
- path: ../ipaacalib/proto
folder: proto
- path: ../ipaacalib/python
folder: ipaaca-py
build:
number: 4
requirements:
run:
- ipaaca-cpp
- ipaaca-py
outputs:
- name: ipaaca-cpp
script: install_cpp.sh # [unix]
requirements:
build:
- {{ compiler('c') }}
- {{ compiler('cxx') }}
- cmake >=3.10
host:
- libprotobuf {{ protobufversion }}
- mosquitto
- boost-cpp
- libuuid
run:
- mosquitto
- libprotobuf {{ protobufversion }}
- boost-cpp
- libuuid
- name: ipaaca-py
noarch: python
script: install_python.sh
requirements:
host:
- python {{ python }}
- pip
- protobuf {{ protobufversion }}
run:
- python
- mosquitto
- paho-mqtt
- protobuf {{ protobufversion }}
{% set protobufversion = "3.8.0" %}
package:
name: ipaaca
version: "0.1.2"
source:
- path: ../ipaacalib/cpp
folder: ipaaca-cpp
- path: ../ipaacalib/proto
folder: proto
- path: ../ipaacalib/python
folder: ipaaca-py
build:
number: 9
requirements:
run:
- ipaaca-cpp
- ipaaca-py
outputs:
- name: ipaaca-cpp
script: install_cpp.sh # [unix]
requirements:
build:
- {{ compiler('c') }}
- {{ compiler('cxx') }}
- cmake >=3.10
host:
- libprotobuf {{ protobufversion }}
- mosquitto
- boost-cpp {{ boostcpp }}
run:
- mosquitto
- libprotobuf {{ protobufversion }}
- boost-cpp {{ boostcpp }}
- name: ipaaca-py
noarch: python
script: install_python.sh
requirements:
host:
- python {{ py_version }}
- pip
- setuptools
- protobuf {{ protobufversion }}
run:
- python {{ py_version }}
- mosquitto
- paho-mqtt
- protobuf {{ protobufversion }}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project name="IpaacaCpp" default="dist">
<import file="../../soashared/ant/build.xml" />
<target name="-pre-compilation">
<echo message="Compiling protobuf file" />
<exec executable="protoc">
<arg value="--proto_path=../proto" />
<arg value="../proto/ipaaca.proto" />
<arg value="--cpp_out=build/" />
</exec>
</target>
</project>
#ifndef __IPAACA_H__
#define __IPAACA_H_
/// ipaaca/IU/RSB protocol major version number
#define IPAACA_PROTOCOL_VERSION_MAJOR 1
/// ipaaca/IU/RSB protocol minor version number
#define IPAACA_PROTOCOL_VERSION_MINOR 0
/// running release number of ipaaca-c++
#define IPAACA_CPP_RELEASE_NUMBER 1
/// date of last release number increment
#define IPAACA_CPP_RELEASE_DATE "2012-04-13"
#ifdef IPAACA_DEBUG_MESSAGES
#define IPAACA_INFO(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- " << i << std::endl;
#define IPAACA_WARNING(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- WARNING: " << i << std::endl;
#define IPAACA_IMPLEMENT_ME std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- IMPLEMENT ME" << std::endl;
#define IPAACA_TODO(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- TODO: " << i << std::endl;
#else
#define IPAACA_INFO(i) ;
#define IPAACA_WARNING(i) ;
#define IPAACA_IMPLEMENT_ME ;
#define IPAACA_TODO(i) ;
#endif
/// marking pure virtual functions for extra readability
#define _IPAACA_ABSTRACT_
/// value to return when reading nonexistant payload keys
#define IPAACA_PAYLOAD_DEFAULT_STRING_VALUE ""
#include <iostream>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/pointer_cast.hpp>
#include <rsc/runtime/TypeStringTools.h>
#include <rsb/Factory.h>
#include <rsb/Handler.h>
#include <rsb/Event.h>
#include <rsb/converter/Repository.h>
#include <rsb/converter/ProtocolBufferConverter.h>
#include <rsb/converter/Converter.h>
#include <rsb/rsbexports.h>
#include <ipaaca.pb.h>
#include <pthread.h>
#include <uuid/uuid.h>
//using namespace boost;
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
namespace ipaaca {
typedef uint32_t revision_t;
/// Type of the IU event. Realized as an integer to enable bit masks for filters.
typedef uint32_t IUEventType;
#define IU_ADDED 1
#define IU_COMMITTED 2
#define IU_DELETED 4
#define IU_RETRACTED 8
#define IU_UPDATED 16
#define IU_LINKSUPDATED 32
/// Bit mask for receiving all events
#define IU_ALL_EVENTS 63
/// Convert an int event type to a human-readable string
inline std::string iu_event_type_to_str(IUEventType type)
{
switch(type) {
case IU_ADDED: return "ADDED";
case IU_COMMITTED: return "COMMITTED";
case IU_DELETED: return "DELETED";
case IU_RETRACTED: return "RETRACTED";
case IU_UPDATED: return "UPDATED";
case IU_LINKSUPDATED: return "LINKSUPDATED";
default: return "(NOT A KNOWN SINGLE IU EVENT TYPE)";
}
}
/// IU access mode: PUSH means that updates are broadcast; REMOTE means that reads are RPC calls; MESSAGE means a fire-and-forget message
enum IUAccessMode {
IU_ACCESS_PUSH,
IU_ACCESS_REMOTE,
IU_ACCESS_MESSAGE
};
class PayloadEntryProxy;
class Payload;
class IUInterface;
class IU;
class RemotePushIU;
class IULinkUpdate;
class IULinkUpdateConverter;
class IUPayloadUpdate;
class IUPayloadUpdateConverter;
class IUStore;
class FrozenIUStore;
class Buffer;
class InputBuffer;
class OutputBuffer;
/// generate a UUID as an ASCII string
std::string generate_uuid_string();
/// store for (local) IUs. TODO Stores need to be unified more
class IUStore: public std::map<std::string, boost::shared_ptr<IU> >
{
};
/// store for RemotePushIUs. TODO Stores need to be unified more
class RemotePushIUStore: public std::map<std::string, boost::shared_ptr<RemotePushIU> > // TODO genericize to all remote IU types
{
};
/// a reentrant lock/mutex
class Lock
{
protected:
pthread_mutexattr_t _attrs;
pthread_mutex_t _mutex;
public:
inline Lock() {
pthread_mutexattr_init(&_attrs);
pthread_mutexattr_settype(&_attrs, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&_mutex, &_attrs);
}
inline ~Lock() {
pthread_mutex_destroy(&_mutex);
pthread_mutexattr_destroy(&_attrs);
}
inline void lock() {
pthread_mutex_lock(&_mutex);
}
inline void unlock() {
pthread_mutex_unlock(&_mutex);
}
};
typedef std::set<std::string> LinkSet;
typedef std::map<std::string, LinkSet> LinkMap;
class SmartLinkMap {
friend std::ostream& operator<<(std::ostream& os, const SmartLinkMap& obj);
friend class IUInterface;
friend class IU;
friend class IUConverter;
public:
const LinkSet& get_links(const std::string& key);
const LinkMap& get_all_links();
protected:
LinkMap _links;
static LinkSet empty_link_set;
void _add_and_remove_links(const LinkMap& add, const LinkMap& remove);
void _replace_links(const LinkMap& links);
};
const LinkSet EMPTY_LINK_SET;
//const std::set<std::string> EMPTY_LINK_SET;
//typedef boost::function<void (const std::string&, bool, IUEventType, const std::string&)> IUEventHandlerFunction;
typedef boost::function<void (boost::shared_ptr<IUInterface>, IUEventType, bool)> IUEventHandlerFunction;
class IUEventHandler {
protected:
IUEventHandlerFunction _function;
IUEventType _event_mask;
bool _for_all_categories;
std::set<std::string> _categories;
protected:
inline bool _condition_met(IUEventType event_type, const std::string& category)
{
return ((_event_mask&event_type)!=0) && (_for_all_categories || (_categories.count(category)>0));
}
public:
IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category);
IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories);
//void call(Buffer* buffer, const std::string& uid, bool local, IUEventType event_type, const std::string& category);
void call(Buffer* buffer, boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category);
typedef boost::shared_ptr<IUEventHandler> ptr;
};
class Buffer { //: public boost::enable_shared_from_this<Buffer> {//{{{
friend class IU;
friend class RemotePushIU;
friend class CallbackIUPayloadUpdate;
friend class CallbackIULinkUpdate;
friend class CallbackIUCommission;
protected:
std::string _uuid;
std::string _basename;
std::string _unique_name;
std::string _id_prefix;
std::vector<IUEventHandler::ptr> _event_handlers;
protected:
_IPAACA_ABSTRACT_ virtual void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") = 0;
_IPAACA_ABSTRACT_ virtual void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef") = 0;
_IPAACA_ABSTRACT_ virtual void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") = 0;
void _allocate_unique_name(const std::string& basename, const std::string& function);
inline Buffer(const std::string& basename, const std::string& function) {
_allocate_unique_name(basename, function);
}
void call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category);
public:
virtual inline ~Buffer() { }
inline const std::string& unique_name() { return _unique_name; }
void register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories);
void register_handler(IUEventHandlerFunction function, IUEventType event_mask = IU_ALL_EVENTS, const std::string& category="");
//_IPAACA_ABSTRACT_ virtual void add(boost::shared_ptr<IUInterface> iu) = 0;
_IPAACA_ABSTRACT_ virtual boost::shared_ptr<IUInterface> get(const std::string& iu_uid) = 0;
_IPAACA_ABSTRACT_ virtual std::set<boost::shared_ptr<IUInterface> > get_ius() = 0;
};
//}}}
class CallbackIUPayloadUpdate: public Server::Callback<IUPayloadUpdate, int> {
protected:
Buffer* _buffer;
public:
CallbackIUPayloadUpdate(Buffer* buffer);
boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<IUPayloadUpdate> update);
};
class CallbackIULinkUpdate: public Server::Callback<IULinkUpdate, int> {
protected:
Buffer* _buffer;
public:
CallbackIULinkUpdate(Buffer* buffer);
public:
boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<IULinkUpdate> update);
};
class CallbackIUCommission: public Server::Callback<protobuf::IUCommission, int> {
protected:
Buffer* _buffer;
public:
CallbackIUCommission(Buffer* buffer);
public:
boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<protobuf::IUCommission> update);
};
class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<OutputBuffer> {//{{{
friend class IU;
friend class RemotePushIU;
protected:
protected:
std::map<std::string, Informer<AnyType>::Ptr> _informer_store;
IUStore _iu_store;
Lock _iu_id_counter_lock;
ServerPtr _server;
protected:
// informing functions
void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef");
void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef");
void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name);
// remote access functions
// _remote_update_links(IULinkUpdate)
// _remote_update_payload(IUPayloadUpdate)
// _remote_commit(protobuf::IUCommission)
protected:
void _publish_iu(boost::shared_ptr<IU> iu);
void _retract_iu(boost::shared_ptr<IU> iu);
Informer<AnyType>::Ptr _get_informer(const std::string& category);
protected:
OutputBuffer(const std::string& basename);
void _initialize_server();
public:
static boost::shared_ptr<OutputBuffer> create(const std::string& basename);
~OutputBuffer() {
IPAACA_IMPLEMENT_ME
}
void add(boost::shared_ptr<IU> iu);
boost::shared_ptr<IU> remove(const std::string& iu_uid);
boost::shared_ptr<IU> remove(boost::shared_ptr<IU> iu);
boost::shared_ptr<IUInterface> get(const std::string& iu_uid);
std::set<boost::shared_ptr<IUInterface> > get_ius();
typedef boost::shared_ptr<OutputBuffer> ptr;
};
//}}}
class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<InputBuffer> {//{{{
friend class IU;
friend class RemotePushIU;
protected:
std::map<std::string, ListenerPtr> _listener_store;
std::map<std::string, RemoteServerPtr> _remote_server_store;
RemotePushIUStore _iu_store; // TODO genericize
protected:
inline void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef")
{
IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_link_update() should never be invoked")
}
inline void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef")
{
IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_payload_update() should never be invoked")
}
inline void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef")
{
IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_commission() should never be invoked")
}
protected:
RemoteServerPtr _get_remote_server(const std::string& unique_server_name);
ListenerPtr _create_category_listener_if_needed(const std::string& category);
void _handle_iu_events(EventPtr event);
protected:
InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests);
InputBuffer(const std::string& basename, const std::string& category_interest1);
InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2);
InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3);
InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4);
public:
static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::vector<std::string>& category_interests);
static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1);
static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2);
static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3);
static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4);
~InputBuffer() {
IPAACA_IMPLEMENT_ME
}
boost::shared_ptr<IUInterface> get(const std::string& iu_uid);
std::set<boost::shared_ptr<IUInterface> > get_ius();
typedef boost::shared_ptr<InputBuffer> ptr;
};
//}}}
class IUConverter: public rsb::converter::Converter<std::string> {//{{{
public:
IUConverter();
std::string serialize(const rsb::AnnotatedData& data, std::string& wire);
rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire);
};//}}}
class IUPayloadUpdate {//{{{
public:
std::string uid;
revision_t revision;
std::string writer_name;
bool is_delta;
std::map<std::string, std::string> new_items;
std::vector<std::string> keys_to_remove;
friend std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj);
typedef boost::shared_ptr<IUPayloadUpdate> ptr;
};//}}}
class IUPayloadUpdateConverter: public rsb::converter::Converter<std::string> {//{{{
public:
IUPayloadUpdateConverter();
std::string serialize(const rsb::AnnotatedData& data, std::string& wire);
rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire);
};//}}}
class IULinkUpdate {//{{{
public:
std::string uid;
revision_t revision;
std::string writer_name;
bool is_delta;
std::map<std::string, std::set<std::string> > new_links;
std::map<std::string, std::set<std::string> > links_to_remove;
friend std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj);
typedef boost::shared_ptr<IULinkUpdate> ptr;
};//}}}
class IULinkUpdateConverter: public rsb::converter::Converter<std::string> {//{{{
public:
IULinkUpdateConverter();
std::string serialize(const rsb::AnnotatedData& data, std::string& wire);
rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire);
};//}}}
class IntConverter: public rsb::converter::Converter<std::string> {//{{{
public:
IntConverter();
std::string serialize(const rsb::AnnotatedData& data, std::string& wire);
rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire);
};//}}}
class Initializer
{
public:
static void initialize_ipaaca_rsb_if_needed();
static bool initialized();
protected:
static bool _initialized;
};
class PayloadEntryProxy//{{{
{
protected:
Payload* _payload;
std::string _key;
public:
PayloadEntryProxy(Payload* payload, const std::string& key);
PayloadEntryProxy& operator=(const std::string& value);
operator std::string();
operator long();
operator double();
inline std::string to_str() { return operator std::string(); }
inline long to_int() { return operator long(); }
inline double to_float() { return operator double(); }
};//}}}
class Payload//{{{
{
friend std::ostream& operator<<(std::ostream& os, const Payload& obj);
friend class IUInterface;
friend class IU;
friend class RemotePushIU;
friend class IUConverter;
friend class CallbackIUPayloadUpdate;
protected:
std::string _owner_name;
std::map<std::string, std::string> _store;
boost::shared_ptr<IUInterface> _iu;
protected:
void initialize(boost::shared_ptr<IUInterface> iu);
inline void _set_owner_name(const std::string& name) { _owner_name = name; }
void _remotely_enforced_wipe();
void _remotely_enforced_delitem(const std::string& k);
void _remotely_enforced_setitem(const std::string& k, const std::string& v);
void _internal_replace_all(const std::map<std::string, std::string>& new_contents, const std::string& writer_name="");
void _internal_set(const std::string& k, const std::string& v, const std::string& writer_name="");
void _internal_remove(const std::string& k, const std::string& writer_name="");
public:
inline const std::string& owner_name() { return _owner_name; }
// access
PayloadEntryProxy operator[](const std::string& key);
inline void set(const std::string& k, const std::string& v) { _internal_set(k, v); }
inline void remove(const std::string& k) { _internal_remove(k); }
std::string get(const std::string& k);
typedef boost::shared_ptr<Payload> ptr;
};//}}}
class IUInterface {//{{{
friend class IUConverter;
friend std::ostream& operator<<(std::ostream& os, const IUInterface& obj);
protected:
IUInterface();
public:
inline virtual ~IUInterface() { }
protected:
std::string _uid;
revision_t _revision;
std::string _category;
std::string _payload_type; // default is "MAP"
std::string _owner_name;
bool _committed;
IUAccessMode _access_mode;
bool _read_only;
//boost::shared_ptr<Buffer> _buffer;
Buffer* _buffer;
SmartLinkMap _links;
protected:
friend class Payload;
// Internal functions that perform the update logic,
// e.g. sending a notification across the network
_IPAACA_ABSTRACT_ virtual void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) = 0;
_IPAACA_ABSTRACT_ virtual void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) = 0;
//void _set_buffer(boost::shared_ptr<Buffer> buffer);
void _associate_with_buffer(Buffer* buffer);
void _set_buffer(Buffer* buffer);
void _set_uid(const std::string& uid);
void _set_owner_name(const std::string& owner_name);
protected:
// internal functions that do not emit update events
inline void _add_and_remove_links(const LinkMap& add, const LinkMap& remove) { _links._add_and_remove_links(add, remove); }
inline void _replace_links(const LinkMap& links) { _links._replace_links(links); }
public:
inline bool is_published() { return (_buffer != 0); }
inline const std::string& uid() const { return _uid; }
inline revision_t revision() const { return _revision; }
inline const std::string& category() const { return _category; }
inline const std::string& payload_type() const { return _payload_type; }
inline const std::string& owner_name() const { return _owner_name; }
inline bool committed() const { return _committed; }
inline IUAccessMode access_mode() const { return _access_mode; }
inline bool read_only() const { return _read_only; }
//inline boost::shared_ptr<Buffer> buffer() { return _buffer; }
inline Buffer* buffer() const { return _buffer; }
inline const LinkSet& get_links(std::string type) { return _links.get_links(type); }
inline const LinkMap& get_all_links() { return _links.get_all_links(); }
// Payload
_IPAACA_ABSTRACT_ virtual Payload& payload() = 0;
_IPAACA_ABSTRACT_ virtual const Payload& const_payload() const = 0;
// setters
_IPAACA_ABSTRACT_ virtual void commit() = 0;
// functions to modify and update links:
void add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name = "");
void remove_links(const std::string& type, const LinkSet& targets, const std::string& writer_name = "");
void modify_links(const LinkMap& add, const LinkMap& remove, const std::string& writer_name = "");
void set_links(const LinkMap& links, const std::string& writer_name = "");
// (with cpp specific convenience functions:)
void add_link(const std::string& type, const std::string& target, const std::string& writer_name = "");
void remove_link(const std::string& type, const std::string& target, const std::string& writer_name = "");
typedef boost::shared_ptr<IUInterface> ptr;
};//}}}
class IU: public IUInterface {//{{{
friend class Buffer;
friend class InputBuffer;
friend class OutputBuffer;
friend class CallbackIUPayloadUpdate;
friend class CallbackIULinkUpdate;
friend class CallbackIUCommission;
public:
Payload _payload;
protected:
Lock _revision_lock;
protected:
inline void _increase_revision_number() { _revision++; }
IU(const std::string& category, IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" );
public:
inline ~IU() {
IPAACA_IMPLEMENT_ME
}
static boost::shared_ptr<IU> create(const std::string& category, IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" );
inline Payload& payload() { return _payload; }
inline const Payload& const_payload() const { return _payload; }
void commit();
protected:
void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = "");
void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = "");
protected:
void _internal_commit(const std::string& writer_name = "");
public:
typedef boost::shared_ptr<IU> ptr;
};//}}}
class RemotePushIU: public IUInterface {//{{{
friend class Buffer;
friend class InputBuffer;
friend class OutputBuffer;
friend class IUConverter;
public:
Payload _payload;
protected:
RemotePushIU();
static boost::shared_ptr<RemotePushIU> create();
public:
inline ~RemotePushIU() {
IPAACA_IMPLEMENT_ME
}
inline Payload& payload() { return _payload; }
inline const Payload& const_payload() const { return _payload; }
void commit();
protected:
void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = "");
void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = "");
protected:
void _apply_update(IUPayloadUpdate::ptr update);
void _apply_link_update(IULinkUpdate::ptr update);
void _apply_commission();
typedef boost::shared_ptr<RemotePushIU> ptr;
};//}}}
class Exception: public std::exception//{{{
{
protected:
std::string _description;
inline Exception(const std::string& description=""): _description(description) { }
public:
inline ~Exception() throw() { }
const char* what() const throw() {
return _description.c_str();
}
};//}}}
class IUNotFoundError: public Exception//{{{
{
public:
inline ~IUNotFoundError() throw() { }
inline IUNotFoundError() { //boost::shared_ptr<IU> iu) {
_description = "IUNotFoundError";
}
};//}}}
class IUPublishedError: public Exception//{{{
{
public:
inline ~IUPublishedError() throw() { }
inline IUPublishedError() { //boost::shared_ptr<IU> iu) {
_description = "IUPublishedError";
}
};//}}}
class IUCommittedError: public Exception//{{{
{
public:
inline ~IUCommittedError() throw() { }
inline IUCommittedError() { //boost::shared_ptr<IU> iu) {
_description = "IUCommittedError";
}
};//}}}
class IUUpdateFailedError: public Exception//{{{
{
public:
inline ~IUUpdateFailedError() throw() { }
inline IUUpdateFailedError() { //boost::shared_ptr<IU> iu) {
_description = "IUUpdateFailedError";
}
};//}}}
class IUReadOnlyError: public Exception//{{{
{
public:
inline ~IUReadOnlyError() throw() { }
inline IUReadOnlyError() { //boost::shared_ptr<IU> iu) {
_description = "IUReadOnlyError";
}
};//}}}
class IUAlreadyInABufferError: public Exception//{{{
{
public:
inline ~IUAlreadyInABufferError() throw() { }
inline IUAlreadyInABufferError() { //boost::shared_ptr<IU> iu) {
_description = "IUAlreadyInABufferError";
}
};//}}}
class IUAlreadyHasAnUIDError: public Exception//{{{
{
public:
inline ~IUAlreadyHasAnUIDError() throw() { }
inline IUAlreadyHasAnUIDError() { //boost::shared_ptr<IU> iu) {
_description = "IUAlreadyHasAnUIDError";
}
};//}}}
class IUAlreadyHasAnOwnerNameError: public Exception//{{{
{
public:
inline ~IUAlreadyHasAnOwnerNameError() throw() { }
inline IUAlreadyHasAnOwnerNameError() { //boost::shared_ptr<IU> iu) {
_description = "IUAlreadyHasAnOwnerNameError";
}
};//}}}
class NotImplementedError: public Exception//{{{
{
public:
inline ~NotImplementedError() throw() { }
inline NotImplementedError() { //boost::shared_ptr<IU> iu) {
_description = "NotImplementedError";
}
};//}}}
// (snippets) //{{{
/*
class IUEventFunctionHandler: public rsb::EventFunctionHandler {
protected:
Buffer* _buffer;
public:
inline IUEventFunctionHandler(Buffer* buffer, const EventFunction& function, const std::string& method="")
: EventFunctionHandler(function, method), _buffer(buffer) { }
};
*/
//}}}
} // of namespace ipaaca
#endif
ifeq ($(WBS_ARCH),mac)
LIB_SUFFIX=.dylib
else
LIB_SUFFIX=.so
endif
CONFIG = -DIPAACA_DEBUG_MESSAGES
IPAACASOURCES = ipaaca.cc ipaaca.pb.cc
SOURCES = ${IPAACASOURCES} ipaaca-test-main.cc
TEXTSOURCES = ${IPAACASOURCES} textsender.cc
CCFLAGS=-I. -I/usr/local/include -I/opt/local/include ${CONFIG}
LIBFLAGS=-fPIC -shared
BOOSTLIBS = -L/opt/local/lib -lboost_regex-mt -lboost_date_time-mt -lboost_program_options-mt -lboost_thread-mt -lboost_filesystem-mt -lboost_signals-mt -lboost_system-mt
PROTOLIBS = -L/opt/local/lib -lprotobuf
LIBS = ${BOOSTLIBS} ${PROTOLIBS} -L/usr/local/lib -lrsc -lrsbcore
COMPILER = gfilt
all: lib
lib:
${COMPILER} ${CCFLAGS} ${IPAACASOURCES} ${LIBS} ${LIBFLAGS} -o libipaaca${LIB_SUFFIX}
receiver:
${COMPILER} ${CCFLAGS} -DMAKE_RECEIVER -o ipaaca-receiver ${SOURCES} ${LIBS}
sender:
${COMPILER} ${CCFLAGS} -DMAKE_SENDER -o ipaaca-sender ${SOURCES} ${LIBS}
main:
${COMPILER} ${CCFLAGS} -o ipaaca-main ${SOURCES} ${LIBS}
protoc:
protoc --proto_path=../../proto ../../proto/ipaaca.proto --cpp_out=.
clean:
rm -f libipaaca${LIB_SUFFIX} ipaaca-main ipaaca-sender ipaaca-receiver ipaaca.pb.h ipaaca.pb.cc
#include <ipaaca.h>
#include <cstdlib>
namespace ipaaca {
// util and init//{{{
bool Initializer::_initialized = false;
//const LinkSet EMPTY_LINK_SET = LinkSet();
//const std::set<std::string> EMPTY_LINK_SET();
bool Initializer::initialized() { return _initialized; }
void Initializer::initialize_ipaaca_rsb_if_needed()
{
if (_initialized) return;
ParticipantConfig config = ParticipantConfig::fromConfiguration();
Factory::getInstance().setDefaultParticipantConfig(config);
boost::shared_ptr<IUConverter> iu_converter(new IUConverter());
stringConverterRepository()->registerConverter(iu_converter);
boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter());
stringConverterRepository()->registerConverter(payload_update_converter);
boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter());
stringConverterRepository()->registerConverter(link_update_converter);
boost::shared_ptr<ProtocolBufferConverter<protobuf::IUCommission> > iu_commission_converter(new ProtocolBufferConverter<protobuf::IUCommission> ());
stringConverterRepository()->registerConverter(iu_commission_converter);
boost::shared_ptr<IntConverter> int_converter(new IntConverter());
stringConverterRepository()->registerConverter(int_converter);
_initialized = true;
//IPAACA_TODO("initialize all converters")
}
std::string generate_uuid_string()
{
uuid_t uuidt;
uuid_string_t uuidstr;
uuid_generate(uuidt);
uuid_unparse_lower(uuidt, uuidstr);
return uuidstr;
}
/*
void init_inprocess_too() {
//ParticipantConfig config = Factory::getInstance().getDefaultParticipantConfig();
ParticipantConfig config = ParticipantConfig::fromFile("rsb.cfg");
//ParticipantConfig::Transport inprocess = config.getTransport("inprocess");
//inprocess.setEnabled(true);
//config.addTransport(inprocess);
Factory::getInstance().setDefaultParticipantConfig(config);
}
*/
//}}}
std::ostream& operator<<(std::ostream& os, const SmartLinkMap& obj)//{{{
{
os << "{";
bool first = true;
for (LinkMap::const_iterator it=obj._links.begin(); it!=obj._links.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << it->first << "': [";
bool firstinner = true;
for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
if (firstinner) { firstinner=false; } else { os << ", "; }
os << "'" << *it2 << "'";
}
os << "]";
}
os << "}";
return os;
}
//}}}
std::ostream& operator<<(std::ostream& os, const Payload& obj)//{{{
{
os << "{";
bool first = true;
for (std::map<std::string, std::string>::const_iterator it=obj._store.begin(); it!=obj._store.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << it->first << "':'" << it->second << "'";
}
os << "}";
return os;
}
//}}}
std::ostream& operator<<(std::ostream& os, const IUInterface& obj)//{{{
{
os << "IUInterface(uid='" << obj.uid() << "'";
os << ", category='" << obj.category() << "'";
os << ", revision=" << obj.revision();
os << ", committed=" << (obj.committed()?"True":"False");
os << ", owner_name='" << obj.owner_name() << "'";
os << ", payload=";
os << obj.const_payload();
os << ", links=";
os << obj._links;
os << ")";
return os;
}
//}}}
std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj)//{{{
{
os << "PayloadUpdate(uid=" << obj.uid << ", revision=" << obj.revision;
os << ", writer_name=" << obj.writer_name << ", is_delta=" << (obj.is_delta?"True":"False");
os << ", new_items = {";
bool first = true;
for (std::map<std::string, std::string>::const_iterator it=obj.new_items.begin(); it!=obj.new_items.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << it->first << "':'" << it->second << "'";
}
os << "}, keys_to_remove = [";
first = true;
for (std::vector<std::string>::const_iterator it=obj.keys_to_remove.begin(); it!=obj.keys_to_remove.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << *it << "'";
}
os << "])";
return os;
}
//}}}
std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj)//{{{
{
os << "LinkUpdate(uid=" << obj.uid << ", revision=" << obj.revision;
os << ", writer_name=" << obj.writer_name << ", is_delta=" << (obj.is_delta?"True":"False");
os << ", new_links = {";
bool first = true;
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj.new_links.begin(); it!=obj.new_links.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << it->first << "': [";
bool ffirst = true;
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
if (ffirst) { ffirst=false; } else { os << ", "; }
os << "'" << *it2 << "'";
}
os << "]";
}
os << "}, links_to_remove = {";
first = true;
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj.links_to_remove.begin(); it!=obj.links_to_remove.end(); ++it) {
if (first) { first=false; } else { os << ", "; }
os << "'" << it->first << "': [";
bool ffirst = true;
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
if (ffirst) { ffirst=false; } else { os << ", "; }
os << "'" << *it2 << "'";
}
os << "]";
}
os << "})";
return os;
}
//}}}
// SmartLinkMap//{{{
LinkSet SmartLinkMap::empty_link_set;
void SmartLinkMap::_add_and_remove_links(const LinkMap& add, const LinkMap& remove)
{
// remove specified links
for (LinkMap::const_iterator it = remove.begin(); it != remove.end(); ++it ) {
// if link type exists
if (_links.count(it->first) > 0) {
// remove one by one
for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
_links[it->first].erase(*it2);
}
// wipe the type key if no more links are left
if (_links[it->first].size() == 0) {
_links.erase(it->first);
}
}
}
// add specified links
for (LinkMap::const_iterator it = add.begin(); it != add.end(); ++it ) {
for (LinkSet::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
_links[it->first].insert(*it2);
}
}
}
void SmartLinkMap::_replace_links(const LinkMap& links)
{
//_links.clear();
_links=links;
}
const LinkSet& SmartLinkMap::get_links(const std::string& key)
{
LinkMap::const_iterator it = _links.find(key);
if (it==_links.end()) return empty_link_set;
return it->second;
}
const LinkMap& SmartLinkMap::get_all_links()
{
return _links;
}
//}}}
// IUEventHandler//{{{
IUEventHandler::IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category)
: _function(function), _event_mask(event_mask), _for_all_categories(false)
{
if (category=="") {
_for_all_categories = true;
} else {
_categories.insert(category);
}
}
IUEventHandler::IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories)
: _function(function), _event_mask(event_mask), _for_all_categories(false)
{
if (categories.size()==0) {
_for_all_categories = true;
} else {
_categories = categories;
}
}
void IUEventHandler::call(Buffer* buffer, boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category)
{
if (_condition_met(event_type, category)) {
//IUInterface::ptr iu = buffer->get(uid);
//if (iu) {
_function(iu, event_type, local);
//}
}
}
//}}}
// Buffer//{{{
void Buffer::_allocate_unique_name(const std::string& basename, const std::string& function) {
std::string uuid = ipaaca::generate_uuid_string();
_basename = basename;
_uuid = uuid.substr(0,8);
_unique_name = "/ipaaca/component/" + _basename + "ID" + _uuid + "/" + function;
}
void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories)
{
IUEventHandler::ptr handler = IUEventHandler::ptr(new IUEventHandler(function, event_mask, categories));
_event_handlers.push_back(handler);
}
void Buffer::register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category)
{
IUEventHandler::ptr handler = IUEventHandler::ptr(new IUEventHandler(function, event_mask, category));
_event_handlers.push_back(handler);
}
void Buffer::call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category)
{
//IPAACA_INFO("handling an event " << ipaaca::iu_event_type_to_str(event_type) << " for IU " << iu->uid())
for (std::vector<IUEventHandler::ptr>::iterator it = _event_handlers.begin(); it != _event_handlers.end(); ++it) {
(*it)->call(this, iu, local, event_type, category);
}
}
//}}}
// Callbacks for OutputBuffer//{{{
CallbackIUPayloadUpdate::CallbackIUPayloadUpdate(Buffer* buffer): _buffer(buffer) { }
CallbackIULinkUpdate::CallbackIULinkUpdate(Buffer* buffer): _buffer(buffer) { }
CallbackIUCommission::CallbackIUCommission(Buffer* buffer): _buffer(buffer) { }
boost::shared_ptr<int> CallbackIUPayloadUpdate::call(const std::string& methodName, boost::shared_ptr<IUPayloadUpdate> update)
{
IUInterface::ptr iui = _buffer->get(update->uid);
if (! iui) {
IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid)
return boost::shared_ptr<int>(new int(0));
}
IU::ptr iu = boost::static_pointer_cast<IU>(iui);
iu->_revision_lock.lock();
if ((update->revision != 0) && (update->revision != iu->_revision)) {
IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid)
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0));
}
if (update->is_delta) {
for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) {
iu->payload()._internal_remove(*it, update->writer_name); //_buffer->unique_name());
}
for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) {
iu->payload()._internal_set(it->first, it->second, update->writer_name); //_buffer->unique_name());
}
} else {
iu->payload()._internal_replace_all(update->new_items, update->writer_name); //_buffer->unique_name());
}
_buffer->call_iu_event_handlers(iu, true, IU_UPDATED, iu->category());
revision_t revision = iu->revision();
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(revision));
}
boost::shared_ptr<int> CallbackIULinkUpdate::call(const std::string& methodName, boost::shared_ptr<IULinkUpdate> update)
{
IUInterface::ptr iui = _buffer->get(update->uid);
if (! iui) {
IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid)
return boost::shared_ptr<int>(new int(0));
}
IU::ptr iu = boost::static_pointer_cast<IU>(iui);
iu->_revision_lock.lock();
if ((update->revision != 0) && (update->revision != iu->_revision)) {
IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid)
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0));
}
if (update->is_delta) {
iu->modify_links(update->new_links, update->links_to_remove, update->writer_name);
} else {
iu->set_links(update->new_links, update->writer_name);
}
_buffer->call_iu_event_handlers(iu, true, IU_LINKSUPDATED, iu->category());
revision_t revision = iu->revision();
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(revision));
}
boost::shared_ptr<int> CallbackIUCommission::call(const std::string& methodName, boost::shared_ptr<protobuf::IUCommission> update)
{
IUInterface::ptr iui = _buffer->get(update->uid());
if (! iui) {
IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid())
return boost::shared_ptr<int>(new int(0));
}
IU::ptr iu = boost::static_pointer_cast<IU>(iui);
iu->_revision_lock.lock();
if ((update->revision() != 0) && (update->revision() != iu->_revision)) {
IPAACA_INFO("Remote write operation failed because request was out of date; IU " << update->uid())
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(0));
}
if (iu->committed()) {
return boost::shared_ptr<int>(new int(0));
} else {
}
iu->_internal_commit(update->writer_name());
_buffer->call_iu_event_handlers(iu, true, IU_LINKSUPDATED, iu->category());
revision_t revision = iu->revision();
iu->_revision_lock.unlock();
return boost::shared_ptr<int>(new int(revision));
}
//}}}
// OutputBuffer//{{{
OutputBuffer::OutputBuffer(const std::string& basename)
:Buffer(basename, "OB")
{
_id_prefix = _basename + "-" + _uuid + "-IU-";
_initialize_server();
}
void OutputBuffer::_initialize_server()
{
_server = Factory::getInstance().createServer( Scope( _unique_name ) );
_server->registerMethod("updatePayload", Server::CallbackPtr(new CallbackIUPayloadUpdate(this)));
_server->registerMethod("updateLinks", Server::CallbackPtr(new CallbackIULinkUpdate(this)));
_server->registerMethod("commit", Server::CallbackPtr(new CallbackIUCommission(this)));
}
OutputBuffer::ptr OutputBuffer::create(const std::string& basename)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return OutputBuffer::ptr(new OutputBuffer(basename));
}
IUInterface::ptr OutputBuffer::get(const std::string& iu_uid)
{
IUStore::iterator it = _iu_store.find(iu_uid);
if (it==_iu_store.end()) return IUInterface::ptr();
return it->second;
}
std::set<IUInterface::ptr> OutputBuffer::get_ius()
{
std::set<IUInterface::ptr> set;
for (IUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) set.insert(it->second);
return set;
}
void OutputBuffer::_send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
IULinkUpdate* lup = new ipaaca::IULinkUpdate();
Informer<ipaaca::IULinkUpdate>::DataPtr ldata(lup);
lup->uid = iu->uid();
lup->is_delta = is_delta;
lup->revision = revision;
lup->is_delta = true;
lup->new_links = new_links;
if (is_delta) lup->links_to_remove = links_to_remove;
if (writer_name=="") lup->writer_name = _unique_name;
else lup->writer_name = writer_name;
Informer<AnyType>::Ptr informer = _get_informer(iu->category());
informer->publish(ldata);
}
void OutputBuffer::_send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
IUPayloadUpdate* pup = new ipaaca::IUPayloadUpdate();
Informer<ipaaca::IUPayloadUpdate>::DataPtr pdata(pup);
pup->uid = iu->uid();
pup->is_delta = is_delta;
pup->revision = revision;
pup->new_items = new_items;
if (is_delta) pup->keys_to_remove = keys_to_remove;
if (writer_name=="") pup->writer_name = _unique_name;
else pup->writer_name = writer_name;
Informer<AnyType>::Ptr informer = _get_informer(iu->category());
informer->publish(pdata);
}
void OutputBuffer::_send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name)
{
Informer<protobuf::IUCommission>::DataPtr data(new protobuf::IUCommission());
data->set_uid(iu->uid());
data->set_revision(revision);
if (writer_name=="") data->set_writer_name(_unique_name);
else data->set_writer_name(writer_name);
Informer<AnyType>::Ptr informer = _get_informer(iu->category());
informer->publish(data);
}
void OutputBuffer::add(IU::ptr iu)
{
if (_iu_store.count(iu->uid()) > 0) {
throw IUPublishedError();
}
_iu_store[iu->uid()] = iu;
iu->_associate_with_buffer(this); //shared_from_this());
_publish_iu(iu);
}
void OutputBuffer::_publish_iu(IU::ptr iu)
{
Informer<AnyType>::Ptr informer = _get_informer(iu->_category);
Informer<ipaaca::IU>::DataPtr iu_data(iu);
informer->publish(iu_data);
}
Informer<AnyType>::Ptr OutputBuffer::_get_informer(const std::string& category)
{
if (_informer_store.count(category) > 0) {
return _informer_store[category];
} else {
//IPAACA_INFO("Making new informer for category " << category)
std::string scope_string = "/ipaaca/category/" + category;
Informer<AnyType>::Ptr informer = Factory::getInstance().createInformer<AnyType> ( Scope(scope_string));
_informer_store[category] = informer;
return informer;
}
}
boost::shared_ptr<IU> OutputBuffer::remove(const std::string& iu_uid)
{
IUStore::iterator it = _iu_store.find(iu_uid);
if (it == _iu_store.end()) throw IUNotFoundError();
IU::ptr iu = it->second;
_retract_iu(iu);
_iu_store.erase(iu_uid);
return iu;
}
boost::shared_ptr<IU> OutputBuffer::remove(IU::ptr iu)
{
return remove(iu->uid()); // to make sure it is in the store
}
void OutputBuffer::_retract_iu(IU::ptr iu)
{
Informer<protobuf::IURetraction>::DataPtr data(new protobuf::IURetraction());
data->set_uid(iu->uid());
data->set_revision(iu->revision());
Informer<AnyType>::Ptr informer = _get_informer(iu->category());
informer->publish(data);
}
//}}}
// InputBuffer//{{{
InputBuffer::InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests)
:Buffer(basename, "IB")
{
for (std::vector<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) {
_create_category_listener_if_needed(*it);
}
}
InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1)
:Buffer(basename, "IB")
{
_create_category_listener_if_needed(category_interest1);
}
InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2)
:Buffer(basename, "IB")
{
_create_category_listener_if_needed(category_interest1);
_create_category_listener_if_needed(category_interest2);
}
InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3)
:Buffer(basename, "IB")
{
_create_category_listener_if_needed(category_interest1);
_create_category_listener_if_needed(category_interest2);
_create_category_listener_if_needed(category_interest3);
}
InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4)
:Buffer(basename, "IB")
{
_create_category_listener_if_needed(category_interest1);
_create_category_listener_if_needed(category_interest2);
_create_category_listener_if_needed(category_interest3);
_create_category_listener_if_needed(category_interest4);
}
InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::vector<std::string>& category_interests)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return InputBuffer::ptr(new InputBuffer(basename, category_interests));
}
InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return InputBuffer::ptr(new InputBuffer(basename, category_interest1));
}
InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2));
}
InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3));
}
InputBuffer::ptr InputBuffer::create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4)
{
Initializer::initialize_ipaaca_rsb_if_needed();
return InputBuffer::ptr(new InputBuffer(basename, category_interest1, category_interest2, category_interest3, category_interest4));
}
IUInterface::ptr InputBuffer::get(const std::string& iu_uid)
{
RemotePushIUStore::iterator it = _iu_store.find(iu_uid); // TODO genericize
if (it==_iu_store.end()) return IUInterface::ptr();
return it->second;
}
std::set<IUInterface::ptr> InputBuffer::get_ius()
{
std::set<IUInterface::ptr> set;
for (RemotePushIUStore::iterator it=_iu_store.begin(); it!=_iu_store.end(); ++it) set.insert(it->second); // TODO genericize
return set;
}
RemoteServerPtr InputBuffer::_get_remote_server(const std::string& unique_server_name)
{
std::map<std::string, RemoteServerPtr>::iterator it = _remote_server_store.find(unique_server_name);
if (it!=_remote_server_store.end()) return it->second;
RemoteServerPtr remote_server = Factory::getInstance().createRemoteServer(Scope(unique_server_name));
_remote_server_store[unique_server_name] = remote_server;
return remote_server;
}
ListenerPtr InputBuffer::_create_category_listener_if_needed(const std::string& category)
{
std::map<std::string, ListenerPtr>::iterator it = _listener_store.find(category);
if (it!=_listener_store.end()) return it->second;
//IPAACA_INFO("Creating a new listener for category " << category)
std::string scope_string = "/ipaaca/category/" + category;
ListenerPtr listener = Factory::getInstance().createListener( Scope(scope_string) );
HandlerPtr event_handler = HandlerPtr(
new EventFunctionHandler(
boost::bind(&InputBuffer::_handle_iu_events, this, _1)
)
);
listener->addHandler(event_handler);
_listener_store[category] = listener;
return listener;
/*
'''Return (or create, store and return) a category listener.'''
if iu_category in self._listener_store: return self._informer_store[iu_category]
cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
cat_listener.addHandler(self._handle_iu_events)
self._listener_store[iu_category] = cat_listener
self._category_interests.append(iu_category)
logger.info("Added listener in scope "+"/ipaaca/category/"+iu_category)
return cat_listener
*/
}
void InputBuffer::_handle_iu_events(EventPtr event)
{
std::string type = event->getType();
if (type == "ipaaca::RemotePushIU") {
boost::shared_ptr<RemotePushIU> iu = boost::static_pointer_cast<RemotePushIU>(event->getData());
if (_iu_store.count(iu->category()) > 0) {
// already got the IU... ignore
} else {
_iu_store[iu->uid()] = iu;
iu->_set_buffer(this);
call_iu_event_handlers(iu, false, IU_ADDED, iu->category() );
}
//IPAACA_INFO( "New RemotePushIU state: " << (*iu) )
} else {
RemotePushIUStore::iterator it;
if (type == "ipaaca::IUPayloadUpdate") {
boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData());
//IPAACA_INFO("** writer name: " << update->writer_name)
if (update->writer_name == _unique_name) {
return;
}
it = _iu_store.find(update->uid);
if (it == _iu_store.end()) {
IPAACA_INFO("Ignoring UPDATED message for an IU that we did not fully receive before")
return;
}
//
it->second->_apply_update(update);
call_iu_event_handlers(it->second, false, IU_UPDATED, it->second->category() );
//
//
} else if (type == "ipaaca::IULinkUpdate") {
boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData());
if (update->writer_name == _unique_name) {
return;
}
it = _iu_store.find(update->uid);
if (it == _iu_store.end()) {
IPAACA_INFO("Ignoring LINKSUPDATED message for an IU that we did not fully receive before")
return;
}
//
it->second->_apply_link_update(update);
call_iu_event_handlers(it->second, false, IU_LINKSUPDATED, it->second->category() );
//
//
} else if (type == "ipaaca::protobuf::IUCommission") {
boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData());
if (update->writer_name() == _unique_name) {
return;
}
it = _iu_store.find(update->uid());
if (it == _iu_store.end()) {
IPAACA_INFO("Ignoring COMMITTED message for an IU that we did not fully receive before")
return;
}
//
it->second->_apply_commission();
it->second->_revision = update->revision();
call_iu_event_handlers(it->second, false, IU_COMMITTED, it->second->category() );
//
//
} else {
std::cout << "(Unhandled Event type " << type << " !)" << std::endl;
return;
}
//IPAACA_INFO( "New RemotePushIU state: " << *(it->second) )
}
}
//}}}
// IUInterface//{{{
IUInterface::IUInterface()
: _buffer(NULL), _committed(false)
{
}
void IUInterface::_set_uid(const std::string& uid) {
if (_uid != "") {
throw IUAlreadyHasAnUIDError();
}
_uid = uid;
}
void IUInterface::_set_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buffer) {
if (_buffer) {
throw IUAlreadyInABufferError();
}
_buffer = buffer;
}
void IUInterface::_set_owner_name(const std::string& owner_name) {
if (_owner_name != "") {
throw IUAlreadyHasAnOwnerNameError();
}
_owner_name = owner_name;
}
/// set the buffer pointer and the owner names of IU and Payload
void IUInterface::_associate_with_buffer(Buffer* buffer) { //boost::shared_ptr<Buffer> buffer) {
_set_buffer(buffer); // will throw if already set
_set_owner_name(buffer->unique_name());
payload()._set_owner_name(buffer->unique_name());
}
/// C++-specific convenience function to add one single link
void IUInterface::add_link(const std::string& type, const std::string& target, const std::string& writer_name)
{
LinkMap none;
LinkMap add;
add[type].insert(target);
_modify_links(true, add, none, writer_name);
_add_and_remove_links(add, none);
}
/// C++-specific convenience function to remove one single link
void IUInterface::remove_link(const std::string& type, const std::string& target, const std::string& writer_name)
{
LinkMap none;
LinkMap remove;
remove[type].insert(target);
_modify_links(true, none, remove, writer_name);
_add_and_remove_links(none, remove);
}
void IUInterface::add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
{
LinkMap none;
LinkMap add;
add[type] = targets;
_modify_links(true, add, none, writer_name);
_add_and_remove_links(add, none);
}
void IUInterface::remove_links(const std::string& type, const LinkSet& targets, const std::string& writer_name)
{
LinkMap none;
LinkMap remove;
remove[type] = targets;
_modify_links(true, none, remove, writer_name);
_add_and_remove_links(none, remove);
}
void IUInterface::modify_links(const LinkMap& add, const LinkMap& remove, const std::string& writer_name)
{
_modify_links(true, add, remove, writer_name);
_add_and_remove_links(add, remove);
}
void IUInterface::set_links(const LinkMap& links, const std::string& writer_name)
{
LinkMap none;
_modify_links(false, links, none, writer_name);
_replace_links(links);
}
//}}}
// IU//{{{
IU::ptr IU::create(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
IU::ptr iu = IU::ptr(new IU(category, access_mode, read_only, payload_type)); /* params */ //));
iu->_payload.initialize(iu);
return iu;
}
IU::IU(const std::string& category, IUAccessMode access_mode, bool read_only, const std::string& payload_type)
{
_revision = 1;
_uid = ipaaca::generate_uuid_string();
_category = category;
_payload_type = payload_type;
// payload initialization deferred to IU::create(), above
_read_only = read_only;
_access_mode = access_mode;
_committed = false;
}
void IU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
_revision_lock.lock();
if (_committed) {
_revision_lock.unlock();
throw IUCommittedError();
}
_increase_revision_number();
if (is_published()) {
_buffer->_send_iu_link_update(this, is_delta, _revision, new_links, links_to_remove, writer_name);
}
_revision_lock.unlock();
}
void IU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
_revision_lock.lock();
if (_committed) {
_revision_lock.unlock();
throw IUCommittedError();
}
_increase_revision_number();
if (is_published()) {
_buffer->_send_iu_payload_update(this, is_delta, _revision, new_items, keys_to_remove, writer_name);
}
_revision_lock.unlock();
}
void IU::commit()
{
_internal_commit();
}
void IU::_internal_commit(const std::string& writer_name)
{
_revision_lock.lock();
if (_committed) {
_revision_lock.unlock();
throw IUCommittedError();
}
_increase_revision_number();
_committed = true;
if (is_published()) {
_buffer->_send_iu_commission(this, _revision, writer_name);
}
_revision_lock.unlock();
}
//}}}
// RemotePushIU//{{{
RemotePushIU::ptr RemotePushIU::create()
{
RemotePushIU::ptr iu = RemotePushIU::ptr(new RemotePushIU(/* params */));
iu->_payload.initialize(iu);
return iu;
}
RemotePushIU::RemotePushIU()
{
// nothing
}
void RemotePushIU::_modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name)
{
if (_committed) {
throw IUCommittedError();
}
if (_read_only) {
throw IUReadOnlyError();
}
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
IULinkUpdate::ptr update = IULinkUpdate::ptr(new IULinkUpdate());
update->uid = _uid;
update->revision = _revision;
update->is_delta = is_delta;
update->writer_name = _buffer->unique_name();
update->new_links = new_links;
update->links_to_remove = links_to_remove;
boost::shared_ptr<int> result = server->call<int>("updateLinks", update, 1); // TODO 1 sec
if (*result == 0) {
throw IUUpdateFailedError();
} else {
_revision = *result;
}
}
void RemotePushIU::_modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name)
{
if (_committed) {
throw IUCommittedError();
}
if (_read_only) {
throw IUReadOnlyError();
}
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
IUPayloadUpdate::ptr update = IUPayloadUpdate::ptr(new IUPayloadUpdate());
update->uid = _uid;
update->revision = _revision;
update->is_delta = is_delta;
update->writer_name = _buffer->unique_name();
update->new_items = new_items;
update->keys_to_remove = keys_to_remove;
boost::shared_ptr<int> result = server->call<int>("updatePayload", update, 1); // TODO 1 sec
if (*result == 0) {
throw IUUpdateFailedError();
} else {
_revision = *result;
}
}
void RemotePushIU::commit()
{
if (_read_only) {
throw IUReadOnlyError();
}
if (_committed) {
// Following python version: ignoring multiple commit
return;
}
RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name);
boost::shared_ptr<protobuf::IUCommission> update = boost::shared_ptr<protobuf::IUCommission>(new protobuf::IUCommission());
update->set_uid(_uid);
update->set_revision(_revision);
update->set_writer_name(_buffer->unique_name());
boost::shared_ptr<int> result = server->call<int>("commit", update, 1); // TODO 1 sec
if (*result == 0) {
throw IUUpdateFailedError();
} else {
_revision = *result;
}
}
void RemotePushIU::_apply_link_update(IULinkUpdate::ptr update)
{
_revision = update->revision;
if (update->is_delta) {
_add_and_remove_links(update->new_links, update->links_to_remove);
} else {
_replace_links(update->new_links);
}
}
void RemotePushIU::_apply_update(IUPayloadUpdate::ptr update)
{
_revision = update->revision;
if (update->is_delta) {
for (std::vector<std::string>::const_iterator it=update->keys_to_remove.begin(); it!=update->keys_to_remove.end(); ++it) {
_payload._remotely_enforced_delitem(*it);
}
for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) {
_payload._remotely_enforced_setitem(it->first, it->second);
}
} else {
_payload._remotely_enforced_wipe();
for (std::map<std::string, std::string>::const_iterator it=update->new_items.begin(); it!=update->new_items.end(); ++it) {
_payload._remotely_enforced_setitem(it->first, it->second);
}
}
}
void RemotePushIU::_apply_commission()
{
_committed = true;
}
void Payload::_remotely_enforced_wipe()
{
_store.clear();
}
void Payload::_remotely_enforced_delitem(const std::string& k)
{
_store.erase(k);
}
void Payload::_remotely_enforced_setitem(const std::string& k, const std::string& v)
{
_store[k] = v;
}
//}}}
// PayloadEntryProxy//{{{
PayloadEntryProxy::PayloadEntryProxy(Payload* payload, const std::string& key)
: _payload(payload), _key(key)
{
}
PayloadEntryProxy& PayloadEntryProxy::operator=(const std::string& value)
{
_payload->set(_key, value);
return *this;
}
PayloadEntryProxy::operator std::string()
{
return _payload->get(_key);
}
PayloadEntryProxy::operator long()
{
return atol(operator std::string().c_str());
}
PayloadEntryProxy::operator double()
{
return atof(operator std::string().c_str());
}
//}}}
// Payload//{{{
void Payload::initialize(boost::shared_ptr<IUInterface> iu)
{
_iu = iu;
}
PayloadEntryProxy Payload::operator[](const std::string& key)
{
//boost::shared_ptr<PayloadEntryProxy> p(new PayloadEntryProxy(this, key));
return PayloadEntryProxy(this, key);
}
inline void Payload::_internal_set(const std::string& k, const std::string& v, const std::string& writer_name) {
std::map<std::string, std::string> _new;
std::vector<std::string> _remove;
_new[k]=v;
_iu->_modify_payload(true, _new, _remove, writer_name );
_store[k] = v;
}
inline void Payload::_internal_remove(const std::string& k, const std::string& writer_name) {
std::map<std::string, std::string> _new;
std::vector<std::string> _remove;
_remove.push_back(k);
_iu->_modify_payload(true, _new, _remove, writer_name );
_store.erase(k);
}
void Payload::_internal_replace_all(const std::map<std::string, std::string>& new_contents, const std::string& writer_name)
{
std::vector<std::string> _remove;
_iu->_modify_payload(false, new_contents, _remove, writer_name );
_store = new_contents;
}
inline std::string Payload::get(const std::string& k) {
if (_store.count(k)>0) return _store[k];
else return IPAACA_PAYLOAD_DEFAULT_STRING_VALUE;
}
//}}}
// IUConverter//{{{
IUConverter::IUConverter()
: Converter<std::string> ("ipaaca::IU", "ipaaca-iu", true)
{
}
std::string IUConverter::serialize(const AnnotatedData& data, std::string& wire)
{
// Ensure that DATA actually holds a datum of the data-type we expect.
assert(data.first == getDataType()); // "ipaaca::IU"
// NOTE: a dynamic_pointer_cast cannot be used from void*
boost::shared_ptr<const IU> obj = boost::static_pointer_cast<const IU> (data.second);
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
// transfer obj data to pbo
pbo->set_uid(obj->uid());
pbo->set_revision(obj->revision());
pbo->set_category(obj->category());
pbo->set_payload_type(obj->payload_type());
pbo->set_owner_name(obj->owner_name());
pbo->set_committed(obj->committed());
pbo->set_access_mode(ipaaca::protobuf::IU::PUSH); // TODO
pbo->set_read_only(obj->read_only());
for (std::map<std::string, std::string>::const_iterator it=obj->_payload._store.begin(); it!=obj->_payload._store.end(); ++it) {
protobuf::PayloadItem* item = pbo->add_payload();
item->set_key(it->first);
item->set_value(it->second);
item->set_type("str"); // FIXME other types than str (later)
}
for (LinkMap::const_iterator it=obj->_links._links.begin(); it!=obj->_links._links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
return getWireSchema();
}
AnnotatedData IUConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "ipaaca-iu"
boost::shared_ptr<protobuf::IU> pbo(new protobuf::IU());
pbo->ParseFromString(wire);
IUAccessMode mode = static_cast<IUAccessMode>(pbo->access_mode());
switch(mode) {
case IU_ACCESS_PUSH:
{
// Create a "remote push IU"
boost::shared_ptr<RemotePushIU> obj = RemotePushIU::create();
// transfer pbo data to obj
obj->_uid = pbo->uid();
obj->_revision = pbo->revision();
obj->_category = pbo->category();
obj->_payload_type = pbo->payload_type();
obj->_owner_name = pbo->owner_name();
obj->_committed = pbo->committed();
obj->_read_only = pbo->read_only();
obj->_access_mode = IU_ACCESS_PUSH;
for (int i=0; i<pbo->payload_size(); i++) {
const protobuf::PayloadItem& it = pbo->payload(i);
obj->_payload._store[it.key()] = it.value();
}
for (int i=0; i<pbo->links_size(); i++) {
const protobuf::LinkSet& pls = pbo->links(i);
LinkSet& ls = obj->_links._links[pls.type()];
for (int j=0; j<pls.targets_size(); j++) {
ls.insert(pls.targets(j));
}
}
//return std::make_pair(getDataType(), obj);
return std::make_pair("ipaaca::RemotePushIU", obj);
break;
}
default:
// other cases not handled yet! ( TODO )
throw NotImplementedError();
}
}
//}}}
// IUPayloadUpdateConverter//{{{
IUPayloadUpdateConverter::IUPayloadUpdateConverter()
: Converter<std::string> ("ipaaca::IUPayloadUpdate", "ipaaca-iu-payload-update", true)
{
}
std::string IUPayloadUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType()); // "ipaaca::IUPayloadUpdate"
boost::shared_ptr<const IUPayloadUpdate> obj = boost::static_pointer_cast<const IUPayloadUpdate> (data.second);
boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
// transfer obj data to pbo
pbo->set_uid(obj->uid);
pbo->set_revision(obj->revision);
pbo->set_writer_name(obj->writer_name);
pbo->set_is_delta(obj->is_delta);
for (std::map<std::string, std::string>::const_iterator it=obj->new_items.begin(); it!=obj->new_items.end(); ++it) {
protobuf::PayloadItem* item = pbo->add_new_items();
item->set_key(it->first);
item->set_value(it->second);
item->set_type("str"); // FIXME other types than str (later)
}
for (std::vector<std::string>::const_iterator it=obj->keys_to_remove.begin(); it!=obj->keys_to_remove.end(); ++it) {
pbo->add_keys_to_remove(*it);
}
pbo->SerializeToString(&wire);
return getWireSchema();
}
AnnotatedData IUPayloadUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "ipaaca-iu-payload-update"
boost::shared_ptr<protobuf::IUPayloadUpdate> pbo(new protobuf::IUPayloadUpdate());
pbo->ParseFromString(wire);
boost::shared_ptr<IUPayloadUpdate> obj(new IUPayloadUpdate());
// transfer pbo data to obj
obj->uid = pbo->uid();
obj->revision = pbo->revision();
obj->writer_name = pbo->writer_name();
obj->is_delta = pbo->is_delta();
for (int i=0; i<pbo->new_items_size(); i++) {
const protobuf::PayloadItem& it = pbo->new_items(i);
obj->new_items[it.key()] = it.value();
}
for (int i=0; i<pbo->keys_to_remove_size(); i++) {
obj->keys_to_remove.push_back(pbo->keys_to_remove(i));
}
return std::make_pair(getDataType(), obj);
}
//}}}
// IULinkUpdateConverter//{{{
IULinkUpdateConverter::IULinkUpdateConverter()
: Converter<std::string> ("ipaaca::IULinkUpdate", "ipaaca-iu-link-update", true)
{
}
std::string IULinkUpdateConverter::serialize(const AnnotatedData& data, std::string& wire)
{
assert(data.first == getDataType()); // "ipaaca::IULinkUpdate"
boost::shared_ptr<const IULinkUpdate> obj = boost::static_pointer_cast<const IULinkUpdate> (data.second);
boost::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate());
// transfer obj data to pbo
pbo->set_uid(obj->uid);
pbo->set_revision(obj->revision);
pbo->set_writer_name(obj->writer_name);
pbo->set_is_delta(obj->is_delta);
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->new_links.begin(); it!=obj->new_links.end(); ++it) {
protobuf::LinkSet* links = pbo->add_new_links();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
for (std::map<std::string, std::set<std::string> >::const_iterator it=obj->links_to_remove.begin(); it!=obj->links_to_remove.end(); ++it) {
protobuf::LinkSet* links = pbo->add_links_to_remove();
links->set_type(it->first);
for (std::set<std::string>::const_iterator it2=it->second.begin(); it2!=it->second.end(); ++it2) {
links->add_targets(*it2);
}
}
pbo->SerializeToString(&wire);
return getWireSchema();
}
AnnotatedData IULinkUpdateConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "ipaaca-iu-link-update"
boost::shared_ptr<protobuf::IULinkUpdate> pbo(new protobuf::IULinkUpdate());
pbo->ParseFromString(wire);
boost::shared_ptr<IULinkUpdate> obj(new IULinkUpdate());
// transfer pbo data to obj
obj->uid = pbo->uid();
obj->revision = pbo->revision();
obj->writer_name = pbo->writer_name();
obj->is_delta = pbo->is_delta();
for (int i=0; i<pbo->new_links_size(); ++i) {
const protobuf::LinkSet& it = pbo->new_links(i);
for (int j=0; j<it.targets_size(); ++j) {
obj->new_links[it.type()].insert(it.targets(j)); // = vec;
}
}
for (int i=0; i<pbo->links_to_remove_size(); ++i) {
const protobuf::LinkSet& it = pbo->links_to_remove(i);
for (int j=0; j<it.targets_size(); ++j) {
obj->links_to_remove[it.type()].insert(it.targets(j));
}
}
return std::make_pair(getDataType(), obj);
}
//}}}
// IntConverter//{{{
IntConverter::IntConverter()
: Converter<std::string> ("int", "int32", true)
{
}
std::string IntConverter::serialize(const AnnotatedData& data, std::string& wire)
{
// Ensure that DATA actually holds a datum of the data-type we expect.
assert(data.first == getDataType()); // "int"
// NOTE: a dynamic_pointer_cast cannot be used from void*
boost::shared_ptr<const int> obj = boost::static_pointer_cast<const int> (data.second);
boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage());
// transfer obj data to pbo
pbo->set_value(*obj);
pbo->SerializeToString(&wire);
return getWireSchema();
}
AnnotatedData IntConverter::deserialize(const std::string& wireSchema, const std::string& wire) {
assert(wireSchema == getWireSchema()); // "int"
boost::shared_ptr<protobuf::IntMessage> pbo(new protobuf::IntMessage());
pbo->ParseFromString(wire);
boost::shared_ptr<int> obj = boost::shared_ptr<int>(new int(pbo->value()));
return std::make_pair("int", obj);
}
//}}}
} // of namespace ipaaca
#ifndef __IPAACA_H__
#define __IPAACA_H_
/// ipaaca/IU/RSB protocol major version number
#define IPAACA_PROTOCOL_VERSION_MAJOR 1
/// ipaaca/IU/RSB protocol minor version number
#define IPAACA_PROTOCOL_VERSION_MINOR 0
/// running release number of ipaaca-c++
#define IPAACA_CPP_RELEASE_NUMBER 1
/// date of last release number increment
#define IPAACA_CPP_RELEASE_DATE "2012-04-13"
#ifdef IPAACA_DEBUG_MESSAGES
#define IPAACA_INFO(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- " << i << std::endl;
#define IPAACA_WARNING(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- WARNING: " << i << std::endl;
#define IPAACA_IMPLEMENT_ME std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- IMPLEMENT ME" << std::endl;
#define IPAACA_TODO(i) std::cout << __FILE__ << ":" << __LINE__ << ": " << __func__ << "() -- TODO: " << i << std::endl;
#else
#define IPAACA_INFO(i) ;
#define IPAACA_WARNING(i) ;
#define IPAACA_IMPLEMENT_ME(i) ;
#define IPAACA_TODO(i) ;
#endif
/// marking pure virtual functions for extra readability
#define _IPAACA_ABSTRACT_
/// value to return when reading nonexistant payload keys
#define IPAACA_PAYLOAD_DEFAULT_STRING_VALUE ""
#include <iostream>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/pointer_cast.hpp>
#include <rsc/runtime/TypeStringTools.h>
#include <rsb/Factory.h>
#include <rsb/Handler.h>
#include <rsb/Event.h>
#include <rsb/converter/Repository.h>
#include <rsb/converter/ProtocolBufferConverter.h>
#include <rsb/converter/Converter.h>
#include <rsb/rsbexports.h>
#include <ipaaca.pb.h>
#include <pthread.h>
#include <uuid/uuid.h>
//using namespace boost;
using namespace rsb;
using namespace rsb::filter;
using namespace rsb::converter;
using namespace rsb::patterns;
namespace ipaaca {
typedef uint32_t revision_t;
/// Type of the IU event. Realized as an integer to enable bit masks for filters.
typedef uint32_t IUEventType;
#define IU_ADDED 1
#define IU_COMMITTED 2
#define IU_DELETED 4
#define IU_RETRACTED 8
#define IU_UPDATED 16
#define IU_LINKSUPDATED 32
/// Bit mask for receiving all events
#define IU_ALL_EVENTS 63
/// Convert an int event type to a human-readable string
inline std::string iu_event_type_to_str(IUEventType type)
{
switch(type) {
case IU_ADDED: return "ADDED";
case IU_COMMITTED: return "COMMITTED";
case IU_DELETED: return "DELETED";
case IU_RETRACTED: return "RETRACTED";
case IU_UPDATED: return "UPDATED";
case IU_LINKSUPDATED: return "LINKSUPDATED";
default: return "(NOT A KNOWN SINGLE IU EVENT TYPE)";
}
}
/// IU access mode: PUSH means that updates are broadcast; REMOTE means that reads are RPC calls; MESSAGE means a fire-and-forget message
enum IUAccessMode {
IU_ACCESS_PUSH,
IU_ACCESS_REMOTE,
IU_ACCESS_MESSAGE
};
class PayloadEntryProxy;
class Payload;
class IUInterface;
class IU;
class RemotePushIU;
class IULinkUpdate;
class IULinkUpdateConverter;
class IUPayloadUpdate;
class IUPayloadUpdateConverter;
class IUStore;
class FrozenIUStore;
class Buffer;
class InputBuffer;
class OutputBuffer;
/// generate a UUID as an ASCII string
std::string generate_uuid_string();
/// store for (local) IUs. TODO Stores need to be unified more
class IUStore: public std::map<std::string, boost::shared_ptr<IU> >
{
};
/// store for RemotePushIUs. TODO Stores need to be unified more
class RemotePushIUStore: public std::map<std::string, boost::shared_ptr<RemotePushIU> > // TODO genericize to all remote IU types
{
};
/// a reentrant lock/mutex
class Lock
{
protected:
pthread_mutexattr_t _attrs;
pthread_mutex_t _mutex;
public:
inline Lock() {
pthread_mutexattr_init(&_attrs);
pthread_mutexattr_settype(&_attrs, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&_mutex, &_attrs);
}
inline ~Lock() {
pthread_mutex_destroy(&_mutex);
pthread_mutexattr_destroy(&_attrs);
}
inline void lock() {
pthread_mutex_lock(&_mutex);
}
inline void unlock() {
pthread_mutex_unlock(&_mutex);
}
};
typedef std::set<std::string> LinkSet;
typedef std::map<std::string, LinkSet> LinkMap;
class SmartLinkMap {
friend std::ostream& operator<<(std::ostream& os, const SmartLinkMap& obj);
friend class IUInterface;
friend class IU;
friend class IUConverter;
public:
const LinkSet& get_links(const std::string& key);
const LinkMap& get_all_links();
protected:
LinkMap _links;
static LinkSet empty_link_set;
void _add_and_remove_links(const LinkMap& add, const LinkMap& remove);
void _replace_links(const LinkMap& links);
};
const LinkSet EMPTY_LINK_SET;
//const std::set<std::string> EMPTY_LINK_SET;
//typedef boost::function<void (const std::string&, bool, IUEventType, const std::string&)> IUEventHandlerFunction;
typedef boost::function<void (boost::shared_ptr<IUInterface>, IUEventType, bool)> IUEventHandlerFunction;
class IUEventHandler {
protected:
IUEventHandlerFunction _function;
IUEventType _event_mask;
bool _for_all_categories;
std::set<std::string> _categories;
protected:
inline bool _condition_met(IUEventType event_type, const std::string& category)
{
return ((_event_mask&event_type)!=0) && (_for_all_categories || (_categories.count(category)>0));
}
public:
IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::string& category);
IUEventHandler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories);
//void call(Buffer* buffer, const std::string& uid, bool local, IUEventType event_type, const std::string& category);
void call(Buffer* buffer, boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category);
typedef boost::shared_ptr<IUEventHandler> ptr;
};
class Buffer { //: public boost::enable_shared_from_this<Buffer> {//{{{
friend class IU;
friend class RemotePushIU;
friend class CallbackIUPayloadUpdate;
friend class CallbackIULinkUpdate;
friend class CallbackIUCommission;
protected:
std::string _uuid;
std::string _basename;
std::string _unique_name;
std::string _id_prefix;
std::vector<IUEventHandler::ptr> _event_handlers;
protected:
_IPAACA_ABSTRACT_ virtual void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") = 0;
_IPAACA_ABSTRACT_ virtual void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef") = 0;
_IPAACA_ABSTRACT_ virtual void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") = 0;
void _allocate_unique_name(const std::string& basename, const std::string& function);
inline Buffer(const std::string& basename, const std::string& function) {
_allocate_unique_name(basename, function);
}
void call_iu_event_handlers(boost::shared_ptr<IUInterface> iu, bool local, IUEventType event_type, const std::string& category);
public:
virtual inline ~Buffer() { }
inline const std::string& unique_name() { return _unique_name; }
void register_handler(IUEventHandlerFunction function, IUEventType event_mask, const std::set<std::string>& categories);
void register_handler(IUEventHandlerFunction function, IUEventType event_mask = IU_ALL_EVENTS, const std::string& category="");
//_IPAACA_ABSTRACT_ virtual void add(boost::shared_ptr<IUInterface> iu) = 0;
_IPAACA_ABSTRACT_ virtual boost::shared_ptr<IUInterface> get(const std::string& iu_uid) = 0;
_IPAACA_ABSTRACT_ virtual std::set<boost::shared_ptr<IUInterface> > get_ius() = 0;
};
//}}}
class CallbackIUPayloadUpdate: public Server::Callback<IUPayloadUpdate, int> {
protected:
Buffer* _buffer;
public:
CallbackIUPayloadUpdate(Buffer* buffer);
boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<IUPayloadUpdate> update);
};
class CallbackIULinkUpdate: public Server::Callback<IULinkUpdate, int> {
protected:
Buffer* _buffer;
public:
CallbackIULinkUpdate(Buffer* buffer);
public:
boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<IULinkUpdate> update);
};
class CallbackIUCommission: public Server::Callback<protobuf::IUCommission, int> {
protected:
Buffer* _buffer;
public:
CallbackIUCommission(Buffer* buffer);
public:
boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<protobuf::IUCommission> update);
};
class OutputBuffer: public Buffer { //, public boost::enable_shared_from_this<OutputBuffer> {//{{{
friend class IU;
friend class RemotePushIU;
protected:
protected:
std::map<std::string, Informer<AnyType>::Ptr> _informer_store;
IUStore _iu_store;
Lock _iu_id_counter_lock;
ServerPtr _server;
protected:
// informing functions
void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef");
void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef");
void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name);
// remote access functions
// _remote_update_links(IULinkUpdate)
// _remote_update_payload(IUPayloadUpdate)
// _remote_commit(protobuf::IUCommission)
protected:
void _publish_iu(boost::shared_ptr<IU> iu);
void _retract_iu(boost::shared_ptr<IU> iu);
Informer<AnyType>::Ptr _get_informer(const std::string& category);
protected:
OutputBuffer(const std::string& basename);
void _initialize_server();
public:
static boost::shared_ptr<OutputBuffer> create(const std::string& basename);
~OutputBuffer() {
IPAACA_IMPLEMENT_ME
}
void add(boost::shared_ptr<IU> iu);
boost::shared_ptr<IU> remove(const std::string& iu_uid);
boost::shared_ptr<IU> remove(boost::shared_ptr<IU> iu);
boost::shared_ptr<IUInterface> get(const std::string& iu_uid);
std::set<boost::shared_ptr<IUInterface> > get_ius();
typedef boost::shared_ptr<OutputBuffer> ptr;
};
//}}}
class InputBuffer: public Buffer { //, public boost::enable_shared_from_this<InputBuffer> {//{{{
friend class IU;
friend class RemotePushIU;
protected:
std::map<std::string, ListenerPtr> _listener_store;
std::map<std::string, RemoteServerPtr> _remote_server_store;
RemotePushIUStore _iu_store; // TODO genericize
protected:
inline void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef")
{
IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_link_update() should never be invoked")
}
inline void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef")
{
IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_payload_update() should never be invoked")
}
inline void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef")
{
IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_commission() should never be invoked")
}
protected:
RemoteServerPtr _get_remote_server(const std::string& unique_server_name);
ListenerPtr _create_category_listener_if_needed(const std::string& category);
void _handle_iu_events(EventPtr event);
protected:
InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests);
InputBuffer(const std::string& basename, const std::string& category_interest1);
InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2);
InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3);
InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4);
public:
static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::vector<std::string>& category_interests);
static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1);
static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2);
static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3);
static boost::shared_ptr<InputBuffer> create(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4);
~InputBuffer() {
IPAACA_IMPLEMENT_ME
}
boost::shared_ptr<IUInterface> get(const std::string& iu_uid);
std::set<boost::shared_ptr<IUInterface> > get_ius();
typedef boost::shared_ptr<InputBuffer> ptr;
};
//}}}
class IUConverter: public rsb::converter::Converter<std::string> {//{{{
public:
IUConverter();
std::string serialize(const rsb::AnnotatedData& data, std::string& wire);
rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire);
};//}}}
class IUPayloadUpdate {//{{{
public:
std::string uid;
revision_t revision;
std::string writer_name;
bool is_delta;
std::map<std::string, std::string> new_items;
std::vector<std::string> keys_to_remove;
friend std::ostream& operator<<(std::ostream& os, const IUPayloadUpdate& obj);
typedef boost::shared_ptr<IUPayloadUpdate> ptr;
};//}}}
class IUPayloadUpdateConverter: public rsb::converter::Converter<std::string> {//{{{
public:
IUPayloadUpdateConverter();
std::string serialize(const rsb::AnnotatedData& data, std::string& wire);
rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire);
};//}}}
class IULinkUpdate {//{{{
public:
std::string uid;
revision_t revision;
std::string writer_name;
bool is_delta;
std::map<std::string, std::set<std::string> > new_links;
std::map<std::string, std::set<std::string> > links_to_remove;
friend std::ostream& operator<<(std::ostream& os, const IULinkUpdate& obj);
typedef boost::shared_ptr<IULinkUpdate> ptr;
};//}}}
class IULinkUpdateConverter: public rsb::converter::Converter<std::string> {//{{{
public:
IULinkUpdateConverter();
std::string serialize(const rsb::AnnotatedData& data, std::string& wire);
rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire);
};//}}}
class IntConverter: public rsb::converter::Converter<std::string> {//{{{
public:
IntConverter();
std::string serialize(const rsb::AnnotatedData& data, std::string& wire);
rsb::AnnotatedData deserialize(const std::string& wireSchema, const std::string& wire);
};//}}}
class Initializer
{
public:
static void initialize_ipaaca_rsb_if_needed();
static bool initialized();
protected:
static bool _initialized;
};
class PayloadEntryProxy//{{{
{
protected:
Payload* _payload;
std::string _key;
public:
PayloadEntryProxy(Payload* payload, const std::string& key);
PayloadEntryProxy& operator=(const std::string& value);
operator std::string();
operator long();
operator double();
inline std::string to_str() { return operator std::string(); }
inline long to_int() { return operator long(); }
inline double to_float() { return operator double(); }
};//}}}
class Payload//{{{
{
friend std::ostream& operator<<(std::ostream& os, const Payload& obj);
friend class IUInterface;
friend class IU;
friend class RemotePushIU;
friend class IUConverter;
friend class CallbackIUPayloadUpdate;
protected:
std::string _owner_name;
std::map<std::string, std::string> _store;
boost::shared_ptr<IUInterface> _iu;
protected:
void initialize(boost::shared_ptr<IUInterface> iu);
inline void _set_owner_name(const std::string& name) { _owner_name = name; }
void _remotely_enforced_wipe();
void _remotely_enforced_delitem(const std::string& k);
void _remotely_enforced_setitem(const std::string& k, const std::string& v);
void _internal_replace_all(const std::map<std::string, std::string>& new_contents, const std::string& writer_name="");
void _internal_set(const std::string& k, const std::string& v, const std::string& writer_name="");
void _internal_remove(const std::string& k, const std::string& writer_name="");
public:
inline const std::string& owner_name() { return _owner_name; }
// access
PayloadEntryProxy operator[](const std::string& key);
inline void set(const std::string& k, const std::string& v) { _internal_set(k, v); }
inline void remove(const std::string& k) { _internal_remove(k); }
std::string get(const std::string& k);
typedef boost::shared_ptr<Payload> ptr;
};//}}}
class IUInterface {//{{{
friend class IUConverter;
friend std::ostream& operator<<(std::ostream& os, const IUInterface& obj);
protected:
IUInterface();
public:
inline virtual ~IUInterface() { }
protected:
std::string _uid;
revision_t _revision;
std::string _category;
std::string _payload_type; // default is "MAP"
std::string _owner_name;
bool _committed;
IUAccessMode _access_mode;
bool _read_only;
//boost::shared_ptr<Buffer> _buffer;
Buffer* _buffer;
SmartLinkMap _links;
protected:
friend class Payload;
// Internal functions that perform the update logic,
// e.g. sending a notification across the network
_IPAACA_ABSTRACT_ virtual void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name) = 0;
_IPAACA_ABSTRACT_ virtual void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name) = 0;
//void _set_buffer(boost::shared_ptr<Buffer> buffer);
void _associate_with_buffer(Buffer* buffer);
void _set_buffer(Buffer* buffer);
void _set_uid(const std::string& uid);
void _set_owner_name(const std::string& owner_name);
protected:
// internal functions that do not emit update events
inline void _add_and_remove_links(const LinkMap& add, const LinkMap& remove) { _links._add_and_remove_links(add, remove); }
inline void _replace_links(const LinkMap& links) { _links._replace_links(links); }
public:
inline bool is_published() { return (_buffer != 0); }
inline const std::string& uid() const { return _uid; }
inline revision_t revision() const { return _revision; }
inline const std::string& category() const { return _category; }
inline const std::string& payload_type() const { return _payload_type; }
inline const std::string& owner_name() const { return _owner_name; }
inline bool committed() const { return _committed; }
inline IUAccessMode access_mode() const { return _access_mode; }
inline bool read_only() const { return _read_only; }
//inline boost::shared_ptr<Buffer> buffer() { return _buffer; }
inline Buffer* buffer() const { return _buffer; }
inline const LinkSet& get_links(std::string type) { return _links.get_links(type); }
inline const LinkMap& get_all_links() { return _links.get_all_links(); }
// Payload
_IPAACA_ABSTRACT_ virtual Payload& payload() = 0;
_IPAACA_ABSTRACT_ virtual const Payload& const_payload() const = 0;
// setters
_IPAACA_ABSTRACT_ virtual void commit() = 0;
// functions to modify and update links:
void add_links(const std::string& type, const LinkSet& targets, const std::string& writer_name = "");
void remove_links(const std::string& type, const LinkSet& targets, const std::string& writer_name = "");
void modify_links(const LinkMap& add, const LinkMap& remove, const std::string& writer_name = "");
void set_links(const LinkMap& links, const std::string& writer_name = "");
// (with cpp specific convenience functions:)
void add_link(const std::string& type, const std::string& target, const std::string& writer_name = "");
void remove_link(const std::string& type, const std::string& target, const std::string& writer_name = "");
typedef boost::shared_ptr<IUInterface> ptr;
};//}}}
class IU: public IUInterface {//{{{
friend class Buffer;
friend class InputBuffer;
friend class OutputBuffer;
friend class CallbackIUPayloadUpdate;
friend class CallbackIULinkUpdate;
friend class CallbackIUCommission;
public:
Payload _payload;
protected:
Lock _revision_lock;
protected:
inline void _increase_revision_number() { _revision++; }
IU(const std::string& category, IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" );
public:
inline ~IU() {
IPAACA_IMPLEMENT_ME
}
static boost::shared_ptr<IU> create(const std::string& category, IUAccessMode access_mode=IU_ACCESS_PUSH, bool read_only=false, const std::string& payload_type="MAP" );
inline Payload& payload() { return _payload; }
inline const Payload& const_payload() const { return _payload; }
void commit();
protected:
void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = "");
void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = "");
protected:
void _internal_commit(const std::string& writer_name = "");
public:
typedef boost::shared_ptr<IU> ptr;
};//}}}
class RemotePushIU: public IUInterface {//{{{
friend class Buffer;
friend class InputBuffer;
friend class OutputBuffer;
friend class IUConverter;
public:
Payload _payload;
protected:
RemotePushIU();
static boost::shared_ptr<RemotePushIU> create();
public:
inline ~RemotePushIU() {
IPAACA_IMPLEMENT_ME
}
inline Payload& payload() { return _payload; }
inline const Payload& const_payload() const { return _payload; }
void commit();
protected:
void _modify_links(bool is_delta, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name = "");
void _modify_payload(bool is_delta, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name = "");
protected:
void _apply_update(IUPayloadUpdate::ptr update);
void _apply_link_update(IULinkUpdate::ptr update);
void _apply_commission();
typedef boost::shared_ptr<RemotePushIU> ptr;
};//}}}
class Exception: public std::exception//{{{
{
protected:
std::string _description;
inline Exception(const std::string& description=""): _description(description) { }
public:
inline ~Exception() throw() { }
const char* what() const throw() {
return _description.c_str();
}
};//}}}
class IUNotFoundError: public Exception//{{{
{
public:
inline ~IUNotFoundError() throw() { }
inline IUNotFoundError() { //boost::shared_ptr<IU> iu) {
_description = "IUNotFoundError";
}
};//}}}
class IUPublishedError: public Exception//{{{
{
public:
inline ~IUPublishedError() throw() { }
inline IUPublishedError() { //boost::shared_ptr<IU> iu) {
_description = "IUPublishedError";
}
};//}}}
class IUCommittedError: public Exception//{{{
{
public:
inline ~IUCommittedError() throw() { }
inline IUCommittedError() { //boost::shared_ptr<IU> iu) {
_description = "IUCommittedError";
}
};//}}}
class IUUpdateFailedError: public Exception//{{{
{
public:
inline ~IUUpdateFailedError() throw() { }
inline IUUpdateFailedError() { //boost::shared_ptr<IU> iu) {
_description = "IUUpdateFailedError";
}
};//}}}
class IUReadOnlyError: public Exception//{{{
{
public:
inline ~IUReadOnlyError() throw() { }
inline IUReadOnlyError() { //boost::shared_ptr<IU> iu) {
_description = "IUReadOnlyError";
}
};//}}}
class IUAlreadyInABufferError: public Exception//{{{
{
public:
inline ~IUAlreadyInABufferError() throw() { }
inline IUAlreadyInABufferError() { //boost::shared_ptr<IU> iu) {
_description = "IUAlreadyInABufferError";
}
};//}}}
class IUAlreadyHasAnUIDError: public Exception//{{{
{
public:
inline ~IUAlreadyHasAnUIDError() throw() { }
inline IUAlreadyHasAnUIDError() { //boost::shared_ptr<IU> iu) {
_description = "IUAlreadyHasAnUIDError";
}
};//}}}
class IUAlreadyHasAnOwnerNameError: public Exception//{{{
{
public:
inline ~IUAlreadyHasAnOwnerNameError() throw() { }
inline IUAlreadyHasAnOwnerNameError() { //boost::shared_ptr<IU> iu) {
_description = "IUAlreadyHasAnOwnerNameError";
}
};//}}}
class NotImplementedError: public Exception//{{{
{
public:
inline ~NotImplementedError() throw() { }
inline NotImplementedError() { //boost::shared_ptr<IU> iu) {
_description = "NotImplementedError";
}
};//}}}
// (snippets) //{{{
/*
class IUEventFunctionHandler: public rsb::EventFunctionHandler {
protected:
Buffer* _buffer;
public:
inline IUEventFunctionHandler(Buffer* buffer, const EventFunction& function, const std::string& method="")
: EventFunctionHandler(function, method), _buffer(buffer) { }
};
*/
//}}}
} // of namespace ipaaca
#endif
[transport.spread]
host = localhost
port = 4803
enabled = 1
#include <ipaaca.h>
#include <typeinfo>
using namespace ipaaca;
const char RECV_CATEGORY[] = "WORD";
const char SEND_CATEGORY[] = "TEXT";
class TextSender {
protected:
OutputBuffer::ptr _ob;
InputBuffer::ptr _ib;
public:
TextSender();
void outbuffer_handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local);
void inbuffer_handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local);
IUInterface::ptr find_last_iu();
void publish_text_to_print(const std::string& text, const std::string& parent_iu_uid="");
};
TextSender::TextSender() {
_ob = OutputBuffer::create("TextSenderOut");
_ob->register_handler(boost::bind(&TextSender::outbuffer_handle_iu_event, this, _1, _2, _3));
_ib = InputBuffer::create("TextSenderIn", RECV_CATEGORY);
_ib->register_handler(boost::bind(&TextSender::inbuffer_handle_iu_event, this, _1, _2, _3));
}
void TextSender::outbuffer_handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local)
{
std::cout << "(own IU event " << iu_event_type_to_str(event_type) << " " << iu->uid() << ")" << std::endl;
if (event_type == IU_UPDATED) {
std::set<std::string> parent_uids = iu->get_links("GRIN");
if (parent_uids.size() > 0) {
std::string parent_uid = *(parent_uids.begin());
std::cout << "updating parent ..." << std::endl;
std::set<std::string> next_uids = iu->get_links("SUCCESSOR");
if (next_uids.size() > 0) {
std::string next_uid = *(next_uids.begin());
IUInterface::ptr next_iu = _ob->get(next_uid);
std::set<std::string> next_letter_grin_links = next_iu->get_links("GRIN");
if (next_letter_grin_links.count(parent_uid) == 0) {
// next letter belongs to new word
IUInterface::ptr parent_iu = _ib->get(parent_uid);
parent_iu->payload()["STATE"] = "REALIZED";
} else {
IUInterface::ptr parent_iu = _ib->get(parent_uid);
parent_iu->payload()["STATE"] = "STARTED";
}
} else {
// there are no more letters, this is the end of the final word
IUInterface::ptr parent_iu = _ib->get(parent_uid);
parent_iu->payload()["STATE"] = "REALIZED";
}
std::cout << " ... done." << std::endl;
}
} else {
}
}
void TextSender::inbuffer_handle_iu_event(IUInterface::ptr iu, IUEventType event_type, bool local)
{
if (event_type == IU_LINKSUPDATED) {
std::cout << "links updated" << std::endl;
} else if (event_type == IU_ADDED) {
std::string word = iu->payload()["WORD"];
std::cout << "Received new word: " << word << std::endl;
publish_text_to_print(word, iu->uid());
} else if (event_type == IU_RETRACTED) {
std::string retracted_uid = iu->uid();
} else {
std::cout << "(IU event " << iu_event_type_to_str(event_type) << " " << iu->uid() << ")" << std::endl;
}
}
IUInterface::ptr TextSender::find_last_iu() {
std::set<IUInterface::ptr> ius = _ob->get_ius();
for (std::set<IUInterface::ptr>::iterator it = ius.begin(); it!=ius.end(); ++it) {
if ((*it)->get_links("SUCCESSOR").size() == 0) return *it;
}
return IUInterface::ptr();
}
void TextSender::publish_text_to_print(const std::string& text, const std::string& parent_iu_uid) {
IUInterface::ptr previous_iu = find_last_iu();
if (previous_iu) {
// insert a blank if we already have words in the buffer
IU::ptr iu = IU::create( SEND_CATEGORY );
iu->payload()["CONTENT"] = " ";
_ob->add(iu);
previous_iu->add_link( "SUCCESSOR", iu->uid() );
iu->add_link( "PREDECESSOR", previous_iu->uid() );
if (parent_iu_uid != "") iu->add_link( "GRIN", parent_iu_uid );
previous_iu = iu;
}
for (int i=0; i<text.size(); ++i) {
IU::ptr iu = IU::create( SEND_CATEGORY );
iu->payload()["CONTENT"] = std::string(1, text.at(i));
_ob->add(iu);
if (previous_iu) {
previous_iu->add_link( "SUCCESSOR", iu->uid() );
iu->add_link( "PREDECESSOR", previous_iu->uid() );
if (parent_iu_uid != "") iu->add_link( "GRIN", parent_iu_uid );
}
if (previous_iu) std::cout << "previous IU: " << *previous_iu << std::endl;
previous_iu = iu;
}
}
int main() {
TextSender sender;
sleep(1);
sender.publish_text_to_print("(INIT)");
std::cout << "Press Ctrl-C to cancel..." << std::endl;
while (true) sleep(1);
}
doxygen_generated
.*.sw?
Instructions for building ipaaca on Windows
Install Visual Studio 2017 (with C++), including CMake
Install GoW (Gnu on Windows) to have some POSIX-like utilities, or double-check the commands below
Open terminal via Visual Studio's "x64 Native Tools Command Prompt", for 64-bit builds
The following instructions are for Debug mode, you can use Release, too
(replace all occurrences below).
BUILD DEPS:
protobuf (latest test with 3.9.0):
cd cmake
mkdir build
cd build
cmake -Dprotobuf_BUILD_TESTS=OFF -DCMAKE_INSTALL_PREFIX="C:/Libs/protobuf/x64" -G "Visual Studio 15 2017 Win64" /p:Configuration=Debug ..
cmake --build . --target INSTALL --config Debug -- /nologo /verbosity:minimal /maxcpucount
mosquitto (latest test with 1.5.4):
mkdir build
cd build
# threading=off below disables a nasty library dependency, our dispatcher is threaded anyway
cmake -DWITH_THREADING=OFF -DCMAKE_INSTALL_PREFIX=C:/Libs/mosquitto/x64 -G "Visual Studio 15 2017 Win64" /p:Configuration=Debug ..
cmake --build . --target INSTALL --config Debug -- /nologo /verbosity:minimal /maxcpucount
cp lib\Debug\*.lib lib\cpp\Debug\*.lib \Libs\mosquitto\x64\
BUILD IPAACA (ipaaca4):
cd ......\repo\ipaaca\ipaacalib\proto
mkdir ..\cpp\build\ipaaca
\Libs\protobuf\x64\bin\protoc.exe ipaaca.proto --cpp_out=..\cpp\build\ipaaca
cd ..\cpp\build\ipaaca
cmake -DCMAKE_INSTALL_PREFIX="C:/Libs/ipaaca/x64" -G "Visual Studio 15 2017 Win64" /p:Configuration=Debug ..
cmake --build . --target INSTALL --config Debug -- /nologo /verbosity:minimal /maxcpucount
SETUP FOR VISUAL STUDIO PROJECTS:
Project -> Properties ->
Debugging: add the runtime paths to the environment variables (Path=%Path%;C:\Libs\ipaaca\x64\bin;C:\Libs\mosquitto\x64\bin)
C++: add the include directories (C:\Libs\ipaaca\x64\include;C:\Libs\protobuf\x64\include)
Linker: add ipaaca.lib to the dependencies and C:\Libs\ipaaca\x64\lib to the additional lib dirs
Also add /wd4146 to compiler options if you get errors in protobuf headers (like "unary negative on unsigned value").