Skip to content
Snippets Groups Projects
InputBuffer.java 17.70 KiB
/*
 * This file is part of IPAACA, the
 *  "Incremental Processing Architecture
 *   for Artificial Conversational Agents".
 *
 * Copyright (c) 2009-2013 Sociable Agents Group
 *                         CITEC, Bielefeld University
 *
 * http://opensource.cit-ec.de/projects/ipaaca/
 * http://purl.org/net/ipaaca
 *
 * This file may be licensed under the terms of of the
 * GNU Lesser General Public License Version 3 (the ``LGPL''),
 * or (at your option) any later version.
 *
 * Software distributed under the License is distributed
 * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
 * express or implied. See the LGPL for the specific language
 * governing rights and limitations.
 *
 * You should have received a copy of the LGPL along with this
 * program. If not, go to http://www.gnu.org/licenses/lgpl.html
 * or write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 *
 * The development of this software was supported by the
 * Excellence Cluster EXC 277 Cognitive Interaction Technology.
 * The Excellence Cluster EXC 277 is a grant of the Deutsche
 * Forschungsgemeinschaft (DFG) in the context of the German
 * Excellence Initiative.
 */

package ipaaca;

import ipaaca.protobuf.Ipaaca.IUCommission;
import ipaaca.protobuf.Ipaaca.IUResendRequest;
import ipaaca.protobuf.Ipaaca.IULinkUpdate;
import ipaaca.protobuf.Ipaaca.IUPayloadUpdate;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import lombok.extern.slf4j.Slf4j;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rsb.Event;
import rsb.Factory;
import rsb.Handler;
import rsb.InitializeException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import rsb.Listener;
import rsb.RSBException;
import rsb.Scope;
import rsb.patterns.RemoteServer;

/**
 * An InputBuffer that holds remote IUs.
 * @author hvanwelbergen
 */
@Slf4j
public class InputBuffer extends Buffer
{
    private Map<String, RemoteServer> remoteServerStore = new HashMap<String, RemoteServer>();
    private Map<String, Listener> listenerStore = new HashMap<String, Listener>();
    private Set<String> categoryInterests = new HashSet<String>();
    private final static Logger logger = LoggerFactory.getLogger(InputBuffer.class.getName());
    private IUStore<RemotePushIU> iuStore = new IUStore<RemotePushIU>();
    private IUStore<RemoteMessageIU> messageStore = new IUStore<RemoteMessageIU>();

    private String channel;
    private boolean resendActive;

    public void close()
    {
        for (Listener listener : listenerStore.values())
        {
            try
            {
                listener.deactivate();
            }
            catch (RSBException e)
            {
                log.warn("RSB Exception on deactive {}", e, listener.toString());
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
        for (RemoteServer remServer : remoteServerStore.values())
        {
            try
            {
                remServer.deactivate();
            }
            catch (RSBException e)
            {
                log.warn("RSB Exception on RemoteServer deactivate {}", e, remServer.toString());
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    // def __init__(self, owning_component_name, category_interests=None, participant_config=None):
    // '''Create an InputBuffer.
    //
    // Keyword arguments:
    // owning_compontent_name -- name of the entity that owns this InputBuffer
    // category_interests -- list of IU categories this Buffer is interested in
    // participant_config = RSB configuration
    // '''
    // super(InputBuffer, self).__init__(owning_component_name, participant_config)
    // self._unique_name = '/ipaaca/component/'+str(owning_component_name)+'ID'+self._uuid+'/IB'
    // self._listener_store = {} # one per IU category
    // self._remote_server_store = {} # one per remote-IU-owning Component
    // self._category_interests = []
    // if category_interests is not None:
    // for cat in category_interests:
    // self._create_category_listener_if_needed(cat)
    public InputBuffer(String owningComponentName, Set<String> categoryInterests)
    {
        this(owningComponentName, categoryInterests, "default");
    }


    public InputBuffer(String owningComponentName, Set<String> categoryInterests, String ipaaca_channel)
    {
        super(owningComponentName);
        resendActive = false;
        String shortIDName = getUniqueShortName();
        uniqueName = "/ipaaca/component/" + shortIDName + "/IB";

        this.channel = ipaaca_channel;
        
        for (String cat : categoryInterests)
        {
            createCategoryListenerIfNeeded(cat);
        }

    // add own uuid as identifier for hidden channel. (dlw)
    createCategoryListenerIfNeeded(shortIDName);
    }

    /** Pass resendActive to toggle resendRequest-functionality. */
    public InputBuffer(String owningComponentName, Set<String> categoryInterests, boolean resendActive)
    {
        super(owningComponentName);
        this.resendActive = resendActive;
        String shortIDName = getUniqueShortName();
        uniqueName = "/ipaaca/component/" + shortIDName + "/IB";

        for (String cat : categoryInterests)
        {
            createCategoryListenerIfNeeded(cat);
        }
        // add own uuid as identifier for hidden channel. (dlw)
        createCategoryListenerIfNeeded(shortIDName);
    }

    public boolean isResendActive() {
        return this.resendActive;
    }

    public void setResendActive(boolean active) {
        this.resendActive = active;
    }

    // def _get_remote_server(self, iu):
    // '''Return (or create, store and return) a remote server.'''
    // if iu.owner_name in self._remote_server_store:
    // return self._remote_server_store[iu.owner_name]
    // remote_server = rsb.createRemoteServer(rsb.Scope(str(iu.owner_name)))
    // self._remote_server_store[iu.owner_name] = remote_server
    // return remote_server
    protected RemoteServer getRemoteServer(AbstractIU iu)
    {
        if (remoteServerStore.containsKey(iu.getOwnerName()))
        {
            return remoteServerStore.get(iu.getOwnerName());
        }
        logger.debug("Getting remote server for {}", iu.getOwnerName());
        RemoteServer remoteServer = Factory.getInstance().createRemoteServer(new Scope(iu.getOwnerName()));
        try
        {
            remoteServer.activate();
        }
        catch (InitializeException e)
        {
            throw new RuntimeException(e);
        }
        catch (RSBException e)
        {
            throw new RuntimeException(e);
        }
        remoteServerStore.put(iu.getOwnerName(), remoteServer);
        return remoteServer;
    }

    protected RemoteServer getRemoteServer(String ownerName)
    {
        if (remoteServerStore.containsKey(ownerName))
        {
            return remoteServerStore.get(ownerName);
        }
        logger.debug("Getting remote server for {}", ownerName);
        RemoteServer remoteServer = Factory.getInstance().createRemoteServer(new Scope(ownerName));
        try
        {
            remoteServer.activate();
        }
        catch (InitializeException e)
        {
            throw new RuntimeException(e);
        }
        catch (RSBException e)
        {
            throw new RuntimeException(e);
        }
        remoteServerStore.put(ownerName, remoteServer);
        return remoteServer;
    }

    // def _create_category_listener_if_needed(self, iu_category):
    // '''Return (or create, store and return) a category listener.'''
    // if iu_category in self._listener_store: return self._informer_store[iu_category]
    // cat_listener = rsb.createListener(rsb.Scope("/ipaaca/category/"+str(iu_category)), config=self._participant_config)
    // cat_listener.addHandler(self._handle_iu_events)
    // self._listener_store[iu_category] = cat_listener
    // self._category_interests.append(iu_category)
    // logger.info("Added category listener for "+iu_category)
    // return cat_listener
    private Listener createCategoryListenerIfNeeded(String category)
    {
        if (listenerStore.containsKey(category))
        {
            return listenerStore.get(category);
        }
        Listener listener;
        try
        {
            listener = Factory.getInstance().createListener(new Scope("/ipaaca/channel/" + this.channel + "/category/" + category));
        }
        catch (InitializeException e1)
        {
            throw new RuntimeException(e1);
        }
        listenerStore.put(category, listener);
        categoryInterests.add(category);
        try
        {
            listener.addHandler(new InputHandler(), true);
        }
        catch (InterruptedException e1)
        {
            Thread.currentThread().interrupt();
        }
        logger.info("Added category listener for {}", category);
        try
        {
            listener.activate();
        }
        catch (InitializeException e)
        {
            throw new RuntimeException(e);
        }
        catch (RSBException e)
        {
            throw new RuntimeException(e);
        }
        return listener;
    }

    class InputHandler implements Handler
    {

        @Override
        public void internalNotify(Event ev)
        {
            handleIUEvents(ev);
        }

    }

    // def _handle_iu_events(self, event):
    // '''Dispatch incoming IU events.
    //
    // Adds incoming IU's to the store, applies payload and commit updates to
    // IU, calls IU event handlers.'
    //
    // Keyword arguments:
    // event -- a converted RSB event
    // '''
    // if type(event.data) is RemotePushIU:
    // # a new IU
    // if event.data.uid in self._iu_store:
    // # already in our store
    // pass
    // else:
    // self._iu_store[ event.data.uid ] = event.data
    // event.data.buffer = self
    // self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.ADDED, category=event.data.category)
    // else:
    // # an update to an existing IU
    // if event.data.writer_name == self.unique_name:
    // # Discard updates that originate from this buffer
    // return
    // if event.data.uid not in self._iu_store:
    // # TODO: we should request the IU's owner to send us the IU
    // logger.warning("Update message for IU which we did not fully receive before.")
    // return
    // if type(event.data) is iuProtoBuf_pb2.IUCommission:
    // # IU commit
    // iu = self._iu_store[event.data.uid]
    // iu._apply_commission()
    // iu._revision = event.data.revision
    // self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.COMMITTED, category=iu.category)
    // elif type(event.data) is IUPayloadUpdate:
    // # IU payload update
    // iu = self._iu_store[event.data.uid]
    // iu._apply_update(event.data)
    // self.call_iu_event_handlers(event.data.uid, local=False, event_type=IUEventType.UPDATED, category=iu.category)
    /**
     * Dispatch incoming IU events.
     */
    private void handleIUEvents(Event event)
    {
        if (event.getData() instanceof RemoteMessageIU)
        {
            RemoteMessageIU rm = (RemoteMessageIU) event.getData();
            if (messageStore.containsKey(rm.getUid())) {
                logger.warn("Spurious RemoteMessage event: already got this UID: "+rm.getUid());
                return;
            }

            //logger.info("Adding Message "+rm.getUid());
            messageStore.put(rm.getUid(), rm);
            //logger.info("Calling handlers for Message "+rm.getUid());
            callIuEventHandlers(rm.getUid(),false, IUEventType.MESSAGE, rm.getCategory());
            //logger.info("Removing Message "+rm.getUid());
            messageStore.remove(rm.getUid());
        }
        else if (event.getData() instanceof RemotePushIU)
        {
            RemotePushIU rp = (RemotePushIU) event.getData();
            // a new IU
            if (iuStore.containsKey(rp.getUid()))
            {
                // already in our store
                return;
            }
            else
            {
                iuStore.put(rp.getUid(), rp);
                rp.setBuffer(this);
                this.callIuEventHandlers(rp.getUid(), false, IUEventType.ADDED, rp.getCategory());
            }
        }
        else
        {
            if (event.getData() instanceof IULinkUpdate)
            {
                IULinkUpdate iuLinkUpdate = (IULinkUpdate) event.getData();
                if (iuLinkUpdate.getWriterName().equals(this.getUniqueName()))
                {
                    // Discard updates that originate from this buffer
                    return;
                }
                if (!iuStore.containsKey(iuLinkUpdate.getUid()))
                {
                    if (resendActive)
                    {
                        triggerResendRequest(event.getData(), getUniqueShortName());
                    } else {
                        logger.warn("Link update message for IU which we did not fully receive before.");
                    }
                    return;
                }
                RemotePushIU iu = this.iuStore.get(iuLinkUpdate.getUid());
                iu.applyLinkUpdate(iuLinkUpdate);
                callIuEventHandlers(iu.getUid(), false, IUEventType.LINKSUPDATED, iu.category);
            }
            if (event.getData() instanceof IUPayloadUpdate)
            {
                IUPayloadUpdate iuUpdate = (IUPayloadUpdate) event.getData();
                logger.debug("handleIUEvents invoked with an IUPayloadUpdate: {}", iuUpdate);
                if (iuUpdate.getWriterName().equals(this.getUniqueName()))
                {
                    // Discard updates that originate from this buffer
                    return;
                }
                if (!iuStore.containsKey(iuUpdate.getUid()))
                {
                    if (resendActive)
                    {
                        triggerResendRequest(event.getData(), getUniqueShortName());
                    } else {
                        logger.warn("Update message for IU which we did not fully receive before.");
                    }
                    return;
                }
                RemotePushIU iu = this.iuStore.get(iuUpdate.getUid());
                iu.applyUpdate(iuUpdate);
                callIuEventHandlers(iu.getUid(), false, IUEventType.UPDATED, iu.category);
            }
            if (event.getData() instanceof IUCommission)
            {
                IUCommission iuc = (IUCommission) event.getData();
                logger.debug("handleIUEvents invoked with an IUCommission: {}", iuc);
                logger.debug("{}, {}", iuc.getWriterName(), this.getUniqueName());

                if (iuc.getWriterName().equals(this.getUniqueName()))
                {
                    // Discard updates that originate from this buffer
                    return;
                }
                if (!iuStore.containsKey(iuc.getUid()))
                {
                    if (resendActive)
                    {
                        triggerResendRequest(event.getData(), getUniqueShortName());
                    } else {
                        logger.warn("Update message for IU which we did not fully receive before.");
                    }
                    return;
                }
                RemotePushIU iu = this.iuStore.get(iuc.getUid());
                iu.applyCommmision();
                iu.setRevision(iuc.getRevision());
                callIuEventHandlers(iuc.getUid(), false, IUEventType.COMMITTED, iu.getCategory());
            }
        }
    }

    private void triggerResendRequest(Object aiuObj, String hiddenScopeName)
    {
        String uid = null;
        String writerName = null;
        if (aiuObj instanceof IULinkUpdate) {
            IULinkUpdate tmp = (IULinkUpdate)aiuObj;
            uid = tmp.getUid();
            writerName = tmp.getWriterName();
        } else if (aiuObj instanceof IUPayloadUpdate) {
            IUPayloadUpdate tmp = (IUPayloadUpdate)aiuObj;
            uid = tmp.getUid();
            writerName = tmp.getWriterName();
        } else if (aiuObj instanceof IUCommission) {
            IUCommission tmp = (IUCommission)aiuObj;
            uid = tmp.getUid();
            writerName = tmp.getWriterName();
        }
        RemoteServer rServer = null;
        if (writerName != null)
            rServer = getRemoteServer(writerName);
        if ((rServer != null)&&(uid != null)) {
            IUResendRequest iurr = IUResendRequest.newBuilder().setUid(uid).setHiddenScopeName(hiddenScopeName).build();
            int rRevision = 0;
            try
                {
                    rRevision = (Integer) rServer.call("resendRequest", iurr);
                }
                catch (RSBException e)
                {
                    throw new RuntimeException(e);
                }
                catch (ExecutionException e)
                {
                    throw new RuntimeException(e);
                }
                catch (TimeoutException e)
                {
                    throw new RuntimeException(e);
                }
            if (rRevision == 0)
            {
                //throw new IUResendFailedException(aiu); // TODO
            }
        }
    }

    public InputBuffer(String owningComponentName)
    {
        super(owningComponentName);
    }

    @Override
    public AbstractIU getIU(String iuid)
    {
        if (iuStore.get(iuid) != null)
        {
            return iuStore.get(iuid);
        }
        else
        {
            return messageStore.get(iuid);
        }
    }

    public Collection<RemotePushIU> getIUs()
    {
        return iuStore.values();
    }

}