diff --git a/ipaacalib/cpp/include/ipaaca/ipaaca.h b/ipaacalib/cpp/include/ipaaca/ipaaca.h index 6bcdd8d137d372ce249b99aaa2f279e723084d16..37ce8d14d6a15612e88220e341c6cc69aa5e2389 100644 --- a/ipaacalib/cpp/include/ipaaca/ipaaca.h +++ b/ipaacalib/cpp/include/ipaaca/ipaaca.h @@ -1,10 +1,10 @@ /* * This file is part of IPAACA, the * "Incremental Processing Architecture - * for Artificial Conversational Agents". + * for Artificial Conversational Agents". * * Copyright (c) 2009-2013 Sociable Agents Group - * CITEC, Bielefeld University + * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ * http://purl.org/net/ipaaca @@ -21,7 +21,7 @@ * You should have received a copy of the LGPL along with this * program. If not, go to http://www.gnu.org/licenses/lgpl.html * or write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * The development of this software was supported by the * Excellence Cluster EXC 277 Cognitive Interaction Technology. @@ -192,6 +192,7 @@ class OutputBuffer; class CallbackIUPayloadUpdate; class CallbackIULinkUpdate; class CallbackIUCommission; +class CallbackIUResendRequest; class CallbackIURetraction; class IUConverter; @@ -330,7 +331,7 @@ IPAACA_HEADER_EXPORT class SmartLinkMap { public: IPAACA_HEADER_EXPORT const LinkSet& get_links(const std::string& key); IPAACA_HEADER_EXPORT const LinkMap& get_all_links(); - + protected: IPAACA_MEMBER_VAR_EXPORT LinkMap _links; IPAACA_MEMBER_VAR_EXPORT static LinkSet empty_link_set; @@ -369,6 +370,7 @@ IPAACA_HEADER_EXPORT class Buffer { //: public boost::enable_shared_from_this<Bu friend class CallbackIUPayloadUpdate; friend class CallbackIULinkUpdate; friend class CallbackIUCommission; + friend class CallbackIUResendRequest; protected: //Lock _handler_lock; IPAACA_MEMBER_VAR_EXPORT std::string _uuid; @@ -380,6 +382,7 @@ IPAACA_HEADER_EXPORT class Buffer { //: public boost::enable_shared_from_this<Bu IPAACA_HEADER_EXPORT _IPAACA_ABSTRACT_ virtual void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") = 0; IPAACA_HEADER_EXPORT _IPAACA_ABSTRACT_ virtual void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef") = 0; IPAACA_HEADER_EXPORT _IPAACA_ABSTRACT_ virtual void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") = 0; + IPAACA_HEADER_EXPORT _IPAACA_ABSTRACT_ virtual void _send_iu_resendrequest(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") = 0; IPAACA_HEADER_EXPORT void _allocate_unique_name(const std::string& basename, const std::string& function); IPAACA_HEADER_EXPORT inline Buffer(const std::string& basename, const std::string& function) { _allocate_unique_name(basename, function); @@ -416,6 +419,7 @@ IPAACA_HEADER_EXPORT class OutputBuffer: public Buffer { //, public boost::enabl IPAACA_HEADER_EXPORT void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef"); IPAACA_HEADER_EXPORT void _send_iu_payload_update(IUInterface* iu, bool is_delta, revision_t revision, const std::map<std::string, std::string>& new_items, const std::vector<std::string>& keys_to_remove, const std::string& writer_name="undef"); IPAACA_HEADER_EXPORT void _send_iu_commission(IUInterface* iu, revision_t revision, const std::string& writer_name); + IPAACA_HEADER_EXPORT void _send_iu_resendrequest(IUInterface* iu, revision_t revision, const std::string& writer_name); // remote access functions // _remote_update_links(IULinkUpdate) // _remote_update_payload(IUPayloadUpdate) @@ -452,6 +456,7 @@ IPAACA_HEADER_EXPORT class InputBuffer: public Buffer { //, public boost::enable IPAACA_HEADER_EXPORT rsb::patterns::RemoteServerPtr _get_remote_server(const std::string& unique_server_name); IPAACA_HEADER_EXPORT rsb::ListenerPtr _create_category_listener_if_needed(const std::string& category); IPAACA_HEADER_EXPORT void _handle_iu_events(rsb::EventPtr event); + IPAACA_HEADER_EXPORT void _trigger_resend_request(rsb::EventPtr event); #endif protected: IPAACA_HEADER_EXPORT inline void _send_iu_link_update(IUInterface* iu, bool is_delta, revision_t revision, const LinkMap& new_links, const LinkMap& links_to_remove, const std::string& writer_name="undef") @@ -466,6 +471,10 @@ IPAACA_HEADER_EXPORT class InputBuffer: public Buffer { //, public boost::enable { IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_commission() should never be invoked") } + IPAACA_HEADER_EXPORT inline void _send_iu_resendrequest(IUInterface* iu, revision_t revision, const std::string& writer_name="undef") + { + IPAACA_WARNING("(ERROR) InputBuffer::_send_iu_resendrequest() should never be invoked") + } protected: IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::set<std::string>& category_interests); IPAACA_HEADER_EXPORT InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests); @@ -659,6 +668,7 @@ IPAACA_HEADER_EXPORT class IU: public IUInterface {//{{{ friend class CallbackIUPayloadUpdate; friend class CallbackIULinkUpdate; friend class CallbackIUCommission; + friend class CallbackIUResendRequest; public: IPAACA_MEMBER_VAR_EXPORT Payload _payload; protected: @@ -689,6 +699,7 @@ IPAACA_HEADER_EXPORT class Message: public IU {//{{{ friend class CallbackIUPayloadUpdate; friend class CallbackIULinkUpdate; friend class CallbackIUCommission; + friend class CallbackIUResendRequest; protected: IPAACA_HEADER_EXPORT Message(const std::string& category, IUAccessMode access_mode=IU_ACCESS_MESSAGE, bool read_only=true, const std::string& payload_type="MAP" ); public: @@ -794,6 +805,14 @@ IPAACA_HEADER_EXPORT class IUUpdateFailedError: public Exception//{{{ _description = "IUUpdateFailedError"; } };//}}} +IPAACA_HEADER_EXPORT class IUResendRequestFailedError: public Exception//{{{ +{ + public: + IPAACA_HEADER_EXPORT inline ~IUResendRequestFailedError() throw() { } + IPAACA_HEADER_EXPORT inline IUResendRequestFailedError() { //boost::shared_ptr<IU> iu) { + _description = "IUResendRequestFailedError"; + } +};//}}} IPAACA_HEADER_EXPORT class IUReadOnlyError: public Exception//{{{ { public: @@ -867,6 +886,14 @@ IPAACA_HEADER_EXPORT class CallbackIUCommission: public rsb::patterns::Server::C public: IPAACA_HEADER_EXPORT boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<protobuf::IUCommission> update); };//}}} +IPAACA_HEADER_EXPORT class CallbackIUResendRequest: public rsb::patterns::Server::Callback<protobuf::IUResendRequest, int> {//{{{ + protected: + IPAACA_MEMBER_VAR_EXPORT Buffer* _buffer; + public: + IPAACA_HEADER_EXPORT CallbackIUResendRequest(Buffer* buffer); + public: + IPAACA_HEADER_EXPORT boost::shared_ptr<int> call(const std::string& methodName, boost::shared_ptr<protobuf::IUResendRequest> update); +};//}}} IPAACA_HEADER_EXPORT class CallbackIURetraction: public rsb::patterns::Server::Callback<protobuf::IURetraction, int> {//{{{ protected: IPAACA_MEMBER_VAR_EXPORT Buffer* _buffer; diff --git a/ipaacalib/cpp/src/ipaaca.cc b/ipaacalib/cpp/src/ipaaca.cc index 879e76c377e3ad9f45582cd81e300b85b8c74c4a..d96f5e9d6fdc7d7da8b38246028f418cd4cd0685 100644 --- a/ipaacalib/cpp/src/ipaaca.cc +++ b/ipaacalib/cpp/src/ipaaca.cc @@ -1,10 +1,10 @@ /* * This file is part of IPAACA, the * "Incremental Processing Architecture - * for Artificial Conversational Agents". + * for Artificial Conversational Agents". * * Copyright (c) 2009-2013 Sociable Agents Group - * CITEC, Bielefeld University + * CITEC, Bielefeld University * * http://opensource.cit-ec.de/projects/ipaaca/ * http://purl.org/net/ipaaca @@ -21,7 +21,7 @@ * You should have received a copy of the LGPL along with this * program. If not, go to http://www.gnu.org/licenses/lgpl.html * or write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * * The development of this software was supported by the * Excellence Cluster EXC 277 Cognitive Interaction Technology. @@ -50,8 +50,8 @@ using namespace rsb::patterns; #define VERBOSE_HANDLERS 0 Lock& logger_lock() { - static Lock lock; - return lock; + static Lock lock; + return lock; } // util and init//{{{ @@ -64,36 +64,40 @@ IPAACA_EXPORT bool Initializer::initialized() { return _initialized; } IPAACA_EXPORT void Initializer::initialize_ipaaca_rsb_if_needed() { if (_initialized) return; - + //IPAACA_INFO("Calling initialize_updated_default_config()") initialize_updated_default_config(); // RYT FIXME This configuration stuff has been simply removed in rsb! //ParticipantConfig config = ParticipantConfig::fromConfiguration(); //getFactory().setDefaultParticipantConfig(config); - + //IPAACA_INFO("Creating and registering Converters") boost::shared_ptr<IUConverter> iu_converter(new IUConverter()); converterRepository<std::string>()->registerConverter(iu_converter); - + boost::shared_ptr<MessageConverter> message_converter(new MessageConverter()); converterRepository<std::string>()->registerConverter(message_converter); - + boost::shared_ptr<IUPayloadUpdateConverter> payload_update_converter(new IUPayloadUpdateConverter()); converterRepository<std::string>()->registerConverter(payload_update_converter); - + boost::shared_ptr<IULinkUpdateConverter> link_update_converter(new IULinkUpdateConverter()); converterRepository<std::string>()->registerConverter(link_update_converter); - + boost::shared_ptr<ProtocolBufferConverter<protobuf::IUCommission> > iu_commission_converter(new ProtocolBufferConverter<protobuf::IUCommission> ()); converterRepository<std::string>()->registerConverter(iu_commission_converter); - + + // dlw + boost::shared_ptr<ProtocolBufferConverter<protobuf::IUResendRequest> > iu_resendrequest_converter(new ProtocolBufferConverter<protobuf::IUResendRequest> ()); + converterRepository<std::string>()->registerConverter(iu_resendrequest_converter); + boost::shared_ptr<ProtocolBufferConverter<protobuf::IURetraction> > iu_retraction_converter(new ProtocolBufferConverter<protobuf::IURetraction> ()); converterRepository<std::string>()->registerConverter(iu_retraction_converter); - + boost::shared_ptr<IntConverter> int_converter(new IntConverter()); converterRepository<std::string>()->registerConverter(int_converter); - + //IPAACA_INFO("Initialization complete.") _initialized = true; //IPAACA_TODO("initialize all converters") @@ -402,6 +406,8 @@ IPAACA_EXPORT void Buffer::call_iu_event_handlers(boost::shared_ptr<IUInterface> IPAACA_EXPORT CallbackIUPayloadUpdate::CallbackIUPayloadUpdate(Buffer* buffer): _buffer(buffer) { } IPAACA_EXPORT CallbackIULinkUpdate::CallbackIULinkUpdate(Buffer* buffer): _buffer(buffer) { } IPAACA_EXPORT CallbackIUCommission::CallbackIUCommission(Buffer* buffer): _buffer(buffer) { } +// dlw +IPAACA_EXPORT CallbackIUResendRequest::CallbackIUResendRequest(Buffer* buffer): _buffer(buffer) { } IPAACA_EXPORT boost::shared_ptr<int> CallbackIUPayloadUpdate::call(const std::string& methodName, boost::shared_ptr<IUPayloadUpdate> update) { @@ -484,7 +490,29 @@ IPAACA_EXPORT boost::shared_ptr<int> CallbackIUCommission::call(const std::strin iu->_revision_lock.unlock(); return boost::shared_ptr<int>(new int(revision)); } - +/** dlw */ +IPAACA_EXPORT boost::shared_ptr<int> CallbackIUResendRequest::call(const std::string& methodName, boost::shared_ptr<protobuf::IUResendRequest> update) +{ + IUInterface::ptr iui = _buffer->get(update->uid()); + if (! iui) { + IPAACA_WARNING("Remote InBuffer tried to spuriously write non-existent IU " << update->uid()) + return boost::shared_ptr<int>(new int(0)); + } + IU::ptr iu = boost::static_pointer_cast<IU>(iui); + if ((update->has_hidden_scope_name() == true)&&(update->hidden_scope_name().compare("") != 0)){// dlw TODO + //Informer<AnyType>::Ptr informer = _buffer->_get_informer(update->hidden_scope_name()); + //Informer<ipaaca::IU>::DataPtr iu_data(iu); + //informer->publish(iu_data); + //_buffer->call_iu_event_handlers(iu, true, IU_ŔESENDREQUEST, update->getHiddenScopeName()); + //_buffer->_publish_resend_update(iui, update->hidden_scope_name()); + _buffer->call_iu_event_handlers(iu, true, IU_UPDATED, update->hidden_scope_name()); + revision_t revision = iu->revision(); + return boost::shared_ptr<int>(new int(revision)); + } else { + revision_t revision = 0; + return boost::shared_ptr<int>(new int(revision)); + } +} //}}} // OutputBuffer//{{{ @@ -506,6 +534,9 @@ IPAACA_EXPORT void OutputBuffer::_initialize_server() _server->registerMethod("updatePayload", Server::CallbackPtr(new CallbackIUPayloadUpdate(this))); _server->registerMethod("updateLinks", Server::CallbackPtr(new CallbackIULinkUpdate(this))); _server->registerMethod("commit", Server::CallbackPtr(new CallbackIUCommission(this))); + // dlw + _server->registerMethod("resendRequest", Server::CallbackPtr(new CallbackIUResendRequest(this))); + //IPAACA_INFO("... exiting.") } IPAACA_EXPORT OutputBuffer::ptr OutputBuffer::create(const std::string& basename) @@ -564,7 +595,7 @@ IPAACA_EXPORT void OutputBuffer::_send_iu_commission(IUInterface* iu, revision_t data->set_revision(revision); if (writer_name=="") data->set_writer_name(_unique_name); else data->set_writer_name(writer_name); - + Informer<AnyType>::Ptr informer = _get_informer(iu->category()); informer->publish(data); } @@ -640,6 +671,7 @@ IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::s for (std::set<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) { _create_category_listener_if_needed(*it); } + _create_category_listener_if_needed(_uuid); } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::vector<std::string>& category_interests) :Buffer(basename, "IB") @@ -647,17 +679,20 @@ IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::v for (std::vector<std::string>::const_iterator it=category_interests.begin(); it!=category_interests.end(); ++it) { _create_category_listener_if_needed(*it); } + _create_category_listener_if_needed(_uuid); } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1) :Buffer(basename, "IB") { _create_category_listener_if_needed(category_interest1); + _create_category_listener_if_needed(_uuid); } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2) :Buffer(basename, "IB") { _create_category_listener_if_needed(category_interest1); _create_category_listener_if_needed(category_interest2); + _create_category_listener_if_needed(_uuid); } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3) :Buffer(basename, "IB") @@ -665,6 +700,7 @@ IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::s _create_category_listener_if_needed(category_interest1); _create_category_listener_if_needed(category_interest2); _create_category_listener_if_needed(category_interest3); + _create_category_listener_if_needed(_uuid); } IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::string& category_interest1, const std::string& category_interest2, const std::string& category_interest3, const std::string& category_interest4) :Buffer(basename, "IB") @@ -673,6 +709,7 @@ IPAACA_EXPORT InputBuffer::InputBuffer(const std::string& basename, const std::s _create_category_listener_if_needed(category_interest2); _create_category_listener_if_needed(category_interest3); _create_category_listener_if_needed(category_interest4); + _create_category_listener_if_needed(_uuid); } @@ -752,6 +789,40 @@ IPAACA_EXPORT ListenerPtr InputBuffer::_create_category_listener_if_needed(const //IPAACA_INFO("... exiting.") return listener; } +IPAACA_EXPORT void InputBuffer::_trigger_resend_request(EventPtr event) { + std::string type = event->getType(); + std::string uid = NULL; + std::string writerName = NULL; + if (type == "ipaaca::IUPayloadUpdate") { + boost::shared_ptr<IUPayloadUpdate> update = boost::static_pointer_cast<IUPayloadUpdate>(event->getData()); + uid = update->uid; + writerName = update->writer_name; + } else if (type == "ipaaca::IULinkUpdate") { + boost::shared_ptr<IULinkUpdate> update = boost::static_pointer_cast<IULinkUpdate>(event->getData()); + uid = update->uid; + writerName = update->writer_name; + } else if (type == "ipaaca::protobuf::IUCommission") { + boost::shared_ptr<protobuf::IUCommission> update = boost::static_pointer_cast<protobuf::IUCommission>(event->getData()); + uid = update->uid(); + writerName = update->writer_name(); + } + + if (!writerName.empty()) { + //RemoteServerPtr server = boost::static_pointer_cast<InputBuffer>(_buffer)->_get_remote_server(_owner_name); + RemoteServerPtr server = _get_remote_server(writerName); + if (!uid.empty()) { + boost::shared_ptr<protobuf::IUResendRequest> update = boost::shared_ptr<protobuf::IUResendRequest>(new protobuf::IUResendRequest()); + update->set_uid(uid); + update->set_hidden_scope_name(_uuid); + boost::shared_ptr<int> result = server->call<int>("resendRequest", update, IPAACA_REMOTE_SERVER_TIMEOUT); + if (*result == 0) { + throw IUResendRequestFailedError(); + } else { + //revision = *result; //TODO return dlw TODO + } + } + } +} IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) { std::string type = event->getType(); @@ -782,6 +853,7 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) } it = _iu_store.find(update->uid); if (it == _iu_store.end()) { + _trigger_resend_request(event); IPAACA_INFO("Ignoring UPDATED message for an IU that we did not fully receive before") return; } @@ -797,6 +869,7 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) } it = _iu_store.find(update->uid); if (it == _iu_store.end()) { + _trigger_resend_request(event); IPAACA_INFO("Ignoring LINKSUPDATED message for an IU that we did not fully receive before") return; } @@ -812,6 +885,7 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) } it = _iu_store.find(update->uid()); if (it == _iu_store.end()) { + _trigger_resend_request(event); IPAACA_INFO("Ignoring COMMITTED message for an IU that we did not fully receive before") return; } @@ -825,6 +899,7 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) boost::shared_ptr<protobuf::IURetraction> update = boost::static_pointer_cast<protobuf::IURetraction>(event->getData()); it = _iu_store.find(update->uid()); if (it == _iu_store.end()) { + _trigger_resend_request(event); IPAACA_INFO("Ignoring RETRACTED message for an IU that we did not fully receive before") return; } @@ -843,7 +918,6 @@ IPAACA_EXPORT void InputBuffer::_handle_iu_events(EventPtr event) //IPAACA_INFO( "New RemotePushIU state: " << *(it->second) ) } } - //}}} @@ -867,7 +941,7 @@ IPAACA_EXPORT void IUInterface::_set_buffer(Buffer* buffer) { //boost::shared_pt throw IUAlreadyInABufferError(); } _buffer = buffer; - + } IPAACA_EXPORT void IUInterface::_set_owner_name(const std::string& owner_name) { @@ -1035,7 +1109,7 @@ void Message::_internal_commit(const std::string& writer_name) if (is_published()) { IPAACA_INFO("Info: committing to a Message after sending has no global effects") } - + } //}}} @@ -1468,7 +1542,7 @@ IPAACA_EXPORT AnnotatedData IUConverter::deserialize(const std::string& wireSche //return std::make_pair(getDataType(), obj); return std::make_pair("ipaaca::RemoteMessage", obj); break; - } + } default: // other cases not handled yet! ( TODO ) throw NotImplementedError(); diff --git a/ipaacalib/java/src/ipaaca/InputBuffer.java b/ipaacalib/java/src/ipaaca/InputBuffer.java index f0d036a527461ef7edb7d20db6f7c217559d8d45..c58049aa098611c4961313df0a318acf9a993911 100644 --- a/ipaacalib/java/src/ipaaca/InputBuffer.java +++ b/ipaacalib/java/src/ipaaca/InputBuffer.java @@ -72,6 +72,7 @@ public class InputBuffer extends Buffer private final static Logger logger = LoggerFactory.getLogger(InputBuffer.class.getName()); private IUStore<RemotePushIU> iuStore = new IUStore<RemotePushIU>(); private IUStore<RemoteMessageIU> messageStore = new IUStore<RemoteMessageIU>(); + private boolean resendActive; public void close() { @@ -126,7 +127,8 @@ public class InputBuffer extends Buffer public InputBuffer(String owningComponentName, Set<String> categoryInterests) { super(owningComponentName); - String shortIDName = getUniqueShortName(); + resendActive = false; + String shortIDName = getUniqueShortName(); uniqueName = "/ipaaca/component/" + shortIDName + "/IB"; for (String cat : categoryInterests) @@ -137,6 +139,30 @@ public class InputBuffer extends Buffer createCategoryListenerIfNeeded(shortIDName); } + /** Pass resendActive to toggle resendRequest-functionality. */ + public InputBuffer(String owningComponentName, Set<String> categoryInterests, boolean resendActive) + { + super(owningComponentName); + this.resendActive = resendActive; + String shortIDName = getUniqueShortName(); + uniqueName = "/ipaaca/component/" + shortIDName + "/IB"; + + for (String cat : categoryInterests) + { + createCategoryListenerIfNeeded(cat); + } + // add own uuid as identifier for hidden channel. (dlw) + createCategoryListenerIfNeeded(shortIDName); + } + + public boolean isResendActive() { + return this.resendActive; + } + + public void setResendActive(boolean active) { + this.resendActive = active; + } + // def _get_remote_server(self, iu): // '''Return (or create, store and return) a remote server.''' // if iu.owner_name in self._remote_server_store: @@ -339,8 +365,12 @@ public class InputBuffer extends Buffer } if (!iuStore.containsKey(iuLinkUpdate.getUid())) { - triggerResendRequest(event.getData(), getUniqueShortName()); - //logger.warn("Link update message for IU which we did not fully receive before."); + if (resendActive) + { + triggerResendRequest(event.getData(), getUniqueShortName()); + } else { + logger.warn("Link update message for IU which we did not fully receive before."); + } return; } RemotePushIU iu = this.iuStore.get(iuLinkUpdate.getUid()); @@ -358,8 +388,12 @@ public class InputBuffer extends Buffer } if (!iuStore.containsKey(iuUpdate.getUid())) { - triggerResendRequest(event.getData(), getUniqueShortName()); - //logger.warn("Update message for IU which we did not fully receive before."); + if (resendActive) + { + triggerResendRequest(event.getData(), getUniqueShortName()); + } else { + logger.warn("Update message for IU which we did not fully receive before."); + } return; } RemotePushIU iu = this.iuStore.get(iuUpdate.getUid()); @@ -379,8 +413,12 @@ public class InputBuffer extends Buffer } if (!iuStore.containsKey(iuc.getUid())) { - triggerResendRequest(event.getData(), getUniqueShortName()); - //logger.warn("Update message for IU which we did not fully receive before."); + if (resendActive) + { + triggerResendRequest(event.getData(), getUniqueShortName()); + } else { + logger.warn("Update message for IU which we did not fully receive before."); + } return; } RemotePushIU iu = this.iuStore.get(iuc.getUid()); diff --git a/ipaacalib/python/src/ipaaca/__init__.py b/ipaacalib/python/src/ipaaca/__init__.py index 180cf21a9d8d91ace9a8b765015d68839a003aec..41b88f612e93e2afd7b02221952dbe9b05050978 100644 --- a/ipaacalib/python/src/ipaaca/__init__.py +++ b/ipaacalib/python/src/ipaaca/__init__.py @@ -1240,7 +1240,7 @@ class InputBuffer(Buffer): """An InputBuffer that holds remote IUs.""" - def __init__(self, owning_component_name, category_interests=None, participant_config=None): + def __init__(self, owning_component_name, category_interests=None, participant_config=None, resend_active = False ): '''Create an InputBuffer. Keyword arguments: @@ -1249,6 +1249,7 @@ class InputBuffer(Buffer): participant_config = RSB configuration ''' super(InputBuffer, self).__init__(owning_component_name, participant_config) + self._resend_active = resend_active self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB' self._listener_store = {} # one per IU category self._remote_server_store = {} # one per remote-IU-owning Component @@ -1310,15 +1311,18 @@ class InputBuffer(Buffer): del self._iu_store[ event.data.uid ] else: if event.data.uid not in self._iu_store: # TODO switch default off - logger.warning("Resend message for IU which we did not fully receive before.") - # send resend request to remote server (dlw). - remote_server = self._get_remote_server(event.data) - resend_request = ipaaca_pb2.IUResendRequest() - resend_request.uid = event.data.uid # target iu - resend_request.hidden_scope_name = str(self._uuid) # hidden channel name - rRevision = remote_server.resendRequest(resend_request) - if rRevision == 0: - raise IUResendFailedError(self) + if self._resend_active == True: + logger.warning("Resend message for IU which we did not fully receive before.") + # send resend request to remote server (dlw). + remote_server = self._get_remote_server(event.data) + resend_request = ipaaca_pb2.IUResendRequest() + resend_request.uid = event.data.uid # target iu + resend_request.hidden_scope_name = str(self._uuid) # hidden channel name + rRevision = remote_server.resendRequest(resend_request) + if rRevision == 0: + raise IUResendFailedError(self) + else: + logger.warning("Update message for IU which we did not fully receive before.") return # an update to an existing IU if type_ is ipaaca_pb2.IURetraction: @@ -1362,6 +1366,12 @@ class InputBuffer(Buffer): for interest in category_interests: self._add_category_listener(interest) + def is_resend_active(): + return self._resend_active + + def set_resend_active(active): + self._resend_active = active + class OutputBuffer(Buffer):