From 2d0f6f50f80cfddcbfeeafc0cd6ae62ce6cf0f75 Mon Sep 17 00:00:00 2001
From: Patrick Jentsch <p.jentsch@uni-bielefeld.de>
Date: Fri, 7 May 2021 15:49:47 +0200
Subject: [PATCH] Fix edge case problems in corpus analysis

---
 web/app/corpora/events.py                     | 195 +++++++++---------
 web/app/corpora/views.py                      |   7 -
 .../corpus_analysis/view/ResultsView.js       |  35 ++--
 3 files changed, 113 insertions(+), 124 deletions(-)

diff --git a/web/app/corpora/events.py b/web/app/corpora/events.py
index c09b7cf4..b7ee925d 100644
--- a/web/app/corpora/events.py
+++ b/web/app/corpora/events.py
@@ -25,39 +25,93 @@ corpus_analysis_sessions = {}
 corpus_analysis_clients = {}
 
 
-@socketio.on('export_corpus')
+@socketio.on('corpus_analysis_init')
 @socketio_login_required
-def export_corpus(corpus_id):
-    # TODO: This should not be get_or_404 here - Socket.IO != HTTP request
+def init_corpus_analysis(corpus_id):
     corpus = Corpus.query.get(corpus_id)
     if corpus is None:
-        response = {'code': 404, 'msg': 'Not found'}
-        socketio.emit('export_corpus', response, room=request.sid)
+        response = {'code': 404, 'desc': None, 'msg': 'Not Found'}
+        socketio.emit('corpus_analysis_init', response, room=request.sid)
         return
-    if corpus.status not in ['prepared', 'start analysis', 'stop analysis']:
-        response = {'code': 412, 'msg': 'Precondition Failed'}
-        socketio.emit('export_corpus', response, room=request.sid)
+    if not (corpus.creator == current_user or current_user.is_administrator()):  # noqa
+        response = {'code': 403, 'desc': None, 'msg': 'Forbidden'}
+        socketio.emit('corpus_analysis_init', response, room=request.sid)
         return
-    # delete old corpus archive if it exists/has been build before
-    if corpus.archive_file is not None and os.path.isfile(corpus.archive_file):
-        os.remove(corpus.archive_file)
-    zip_name = corpus.title
-    zip_path = os.path.join(current_user.path, 'corpora', zip_name)
-    corpus.archive_file = os.path.join(corpus.path, zip_name) + '.zip'
-    db.session.commit()
-    shutil.make_archive(zip_path, 'zip', corpus.path)
-    shutil.move(zip_path + '.zip', corpus.archive_file)
-    socketio.emit('export_corpus_' + str(corpus.id), room=request.sid)
-
-
-@socketio.on('corpus_analysis_init')
-@socketio_login_required
-def init_corpus_analysis(corpus_id):
+    if corpus.status not in ['prepared', 'start analysis', 'analysing']:
+        response = {'code': 424, 'desc': 'Corpus status is not "prepared", "start analysis" or "analying"', 'msg': 'Failed Dependency'}  # noqa
+        socketio.emit('corpus_analysis_init', response, room=request.sid)
+        return
+    if corpus.status == 'prepared':
+        corpus.status = 'start analysis'
+        db.session.commit()
+        event = 'user_{}_patch'.format(current_user.id)
+        jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}]  # noqa
+        room = 'user_{}'.format(corpus.user_id)
+        socketio.emit(event, jsonpatch, room=room)
     socketio.start_background_task(corpus_analysis_session_handler,
                                    current_app._get_current_object(),
                                    corpus_id, current_user.id, request.sid)
 
 
+def corpus_analysis_session_handler(app, corpus_id, user_id, session_id):
+    with app.app_context():
+        ''' Setup analysis session '''
+        corpus = Corpus.query.get(corpus_id)
+        retry_counter = 15
+        while corpus.status != 'analysing':
+            db.session.refresh(corpus)
+            retry_counter -= 1
+            if retry_counter == 0:
+                response = {'code': 408, 'desc': 'Corpus analysis session took to long to start', 'msg': 'Request Timeout'}  # noqa
+                socketio.emit('corpus_analysis_init', response, room=request.sid)  # noqa
+            socketio.sleep(3)
+        client = cqi.CQiClient('cqpserver_{}'.format(corpus_id))
+        try:
+            connect_status = client.connect()
+            payload = {'code': connect_status, 'msg': cqi.api.specification.lookup[connect_status]}  # noqa
+        except cqi.errors.CQiException as e:
+            payload = {'code': e.code, 'desc': e.description, 'msg': e.name}
+            response = {'code': 500, 'desc': None,
+                        'msg': 'Internal Server Error', 'payload': payload}
+            socketio.emit('corpus_analysis_init', response, room=session_id)
+            return
+        except gaierror:
+            response = {'code': 500, 'desc': None,
+                        'msg': 'Internal Server Error'}
+            socketio.emit('corpus_analysis_init', response, room=session_id)
+            return
+        corpus_analysis_clients[session_id] = client
+        if corpus_id in corpus_analysis_sessions:
+            corpus_analysis_sessions[corpus_id].append(session_id)
+        else:
+            corpus_analysis_sessions[corpus_id] = [session_id]
+        client.status = 'ready'
+        response = {'code': 200, 'desc': None, 'msg': 'OK', 'payload': payload}
+        socketio.emit('corpus_analysis_init', response, room=session_id)
+        ''' Observe analysis session '''
+        while session_id in socketio_sessions:
+            socketio.sleep(3)
+        ''' Teardown analysis session '''
+        if client.status == 'running':
+            client.status = 'abort'
+            while client.status != 'ready':
+                socketio.sleep(0.1)
+        try:
+            client.disconnect()
+        except cqi.errors.CQiException:
+            pass
+        corpus_analysis_clients.pop(session_id, None)
+        corpus_analysis_sessions[corpus_id].remove(session_id)
+        if not corpus_analysis_sessions[corpus_id]:
+            corpus_analysis_sessions.pop(corpus_id, None)
+            corpus.status = 'stop analysis'
+            db.session.commit()
+            event = 'user_{}_patch'.format(corpus.user_id)
+            jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}]  # noqa
+            room = 'user_{}'.format(corpus.user_id)
+            socketio.emit(event, jsonpatch, room=room)
+
+
 @socketio.on('corpus_analysis_meta_data')
 @socketio_login_required
 def corpus_analysis_get_meta_data(corpus_id):
@@ -137,6 +191,7 @@ def corpus_analysis_query(query):
         client.status = 'abort'
         while client.status != 'ready':
             socketio.sleep(0.1)
+    client.status = 'running'
     try:
         corpus = client.corpora.get('CORPUS')
         query_status = corpus.query(query)
@@ -156,13 +211,11 @@ def corpus_analysis_query(query):
     chunk_start = 0
     context = 50
     progress = 0
-    client.status = 'running'
     while chunk_start <= results.attrs['size']:
         if client.status == 'abort':
             break
         chunk = results.export(context=context, cutoff=chunk_size,
-                               expand_lists=False, offset=chunk_start)
-        chunk['cpos_ranges'] = True
+                               offset=chunk_start)
         if (results.attrs['size'] == 0):
             progress = 100
         else:
@@ -205,7 +258,6 @@ def corpus_analysis_get_match_with_full_context(payload):
         payload['matches'] = []
         payload['cpos_lookup'] = {}
         payload['text_lookup'] = {}
-        payload['cpos_ranges'] = True
         payload['progress'] = 0
         i = 0
         # Send data one match at a time.
@@ -241,70 +293,25 @@ def corpus_analysis_get_match_with_full_context(payload):
     client.status = 'ready'
 
 
-def corpus_analysis_session_handler(app, corpus_id, user_id, session_id):
-    with app.app_context():
-        ''' Setup analysis session '''
-        corpus = Corpus.query.get(corpus_id)
-        user = User.query.get(user_id)
-        if corpus is None:
-            response = {'code': 404, 'desc': None, 'msg': 'Not Found'}
-            socketio.emit('corpus_analysis_init', response, room=session_id)
-            return
-        elif not (corpus.creator == user or user.is_administrator()):
-            response = {'code': 403, 'desc': None, 'msg': 'Forbidden'}
-            socketio.emit('corpus_analysis_init', response, room=session_id)
-            return
-        elif corpus.status == 'unprepared':
-            response = {'code': 424, 'desc': 'Corpus is not prepared',
-                        'msg': 'Failed Dependency'}
-            socketio.emit('corpus_analysis_init', response, room=request.sid)
-            return
-        while corpus.status != 'analysing':
-            db.session.refresh(corpus)
-            socketio.sleep(3)
-        client = cqi.CQiClient('cqpserver_{}'.format(corpus_id))
-        try:
-            connect_status = client.connect()
-            payload = {'code': connect_status,
-                       'msg': cqi.api.specification.lookup[connect_status]}
-        except cqi.errors.CQiException as e:
-            payload = {'code': e.code, 'desc': e.description, 'msg': e.name}
-            response = {'code': 500, 'desc': None,
-                        'msg': 'Internal Server Error', 'payload': payload}
-            socketio.emit('corpus_analysis_init', response, room=session_id)
-            return
-        except gaierror:
-            response = {'code': 500, 'desc': None,
-                        'msg': 'Internal Server Error'}
-            socketio.emit('corpus_analysis_init', response, room=session_id)
-            return
-        corpus_analysis_clients[session_id] = client
-        if corpus_id not in corpus_analysis_sessions:
-            corpus_analysis_sessions[corpus_id] = [session_id]
-        else:
-            corpus_analysis_sessions[corpus_id].append(session_id)
-        client.status = 'ready'
-        response = {'code': 200, 'desc': None, 'msg': 'OK', 'payload': payload}
-        socketio.emit('corpus_analysis_init', response, room=session_id)
-        ''' Observe analysis session '''
-        while session_id in socketio_sessions:
-            socketio.sleep(3)
-        ''' Teardown analysis session '''
-        if client.status == 'running':
-            client.status = 'abort'
-            while client.status != 'ready':
-                socketio.sleep(0.1)
-        try:
-            client.disconnect()
-        except cqi.errors.CQiException:
-            pass
-        corpus_analysis_clients.pop(session_id, None)
-        corpus_analysis_sessions[corpus_id].remove(session_id)
-        if not corpus_analysis_sessions[corpus_id]:
-            corpus_analysis_sessions.pop(corpus_id, None)
-            corpus.status = 'stop analysis'
-            db.session.commit()
-            event = 'user_{}_patch'.format(corpus.user_id)
-            jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}]  # noqa
-            room = 'user_{}'.format(corpus.user_id)
-            socketio.emit(event, jsonpatch, room=room)
+@socketio.on('export_corpus')
+@socketio_login_required
+def export_corpus(corpus_id):
+    corpus = Corpus.query.get(corpus_id)
+    if corpus is None:
+        response = {'code': 404, 'msg': 'Not found'}
+        socketio.emit('export_corpus', response, room=request.sid)
+        return
+    if corpus.status not in ['prepared', 'start analysis', 'stop analysis']:
+        response = {'code': 412, 'msg': 'Precondition Failed'}
+        socketio.emit('export_corpus', response, room=request.sid)
+        return
+    # delete old corpus archive if it exists/has been build before
+    if corpus.archive_file is not None and os.path.isfile(corpus.archive_file):
+        os.remove(corpus.archive_file)
+    zip_name = corpus.title
+    zip_path = os.path.join(current_user.path, 'corpora', zip_name)
+    corpus.archive_file = os.path.join(corpus.path, zip_name) + '.zip'
+    db.session.commit()
+    shutil.make_archive(zip_path, 'zip', corpus.path)
+    shutil.move(zip_path + '.zip', corpus.archive_file)
+    socketio.emit('export_corpus_' + str(corpus.id), room=request.sid)
diff --git a/web/app/corpora/views.py b/web/app/corpora/views.py
index 3b2899d0..46bb4aea 100644
--- a/web/app/corpora/views.py
+++ b/web/app/corpora/views.py
@@ -150,13 +150,6 @@ def download_corpus(corpus_id):
 @login_required
 def analyse_corpus(corpus_id):
     corpus = Corpus.query.get_or_404(corpus_id)
-    if corpus.status == 'prepared':
-        corpus.status = 'start analysis'
-        db.session.commit()
-        event = 'user_{}_patch'.format(corpus.user_id)
-        jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}]  # noqa
-        room = 'user_{}'.format(corpus.user_id)
-        socketio.emit(event, jsonpatch, room=room)
     display_options_form = DisplayOptionsForm(
         prefix='display-options-form',
         result_context=request.args.get('context', 20),
diff --git a/web/app/static/js/modules/corpus_analysis/view/ResultsView.js b/web/app/static/js/modules/corpus_analysis/view/ResultsView.js
index 9394208e..45c5ee17 100644
--- a/web/app/static/js/modules/corpus_analysis/view/ResultsView.js
+++ b/web/app/static/js/modules/corpus_analysis/view/ResultsView.js
@@ -161,24 +161,15 @@ class ResultsList extends List {
   /**
    * Creates cpos either from ranges or not.
    */
-  helperCreateCpos(cpos_ranges, cpos_values) {
-    let lc;
-    let c;
-    let rc;
-    if (cpos_ranges) {
-      /**
-       * Python range like function from MDN
-       * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/from#Sequence_generator_(range)
-       */
-      const range = (start, stop, step) => Array.from({ length: (stop - start) / step + 1}, (_, i) => start + (i * step));
-      lc = range(cpos_values.lc[0], cpos_values.lc[1], 1)
-      c = range(cpos_values.c[0], cpos_values.c[1], 1)
-      rc = range(cpos_values.rc[0], cpos_values.rc[1], 1)
-    } else {
-      lc = cpos_values.lc;
-      c = cpos_values.c;
-      rc = cpos_values.rc;
-    }
+  helperCreateCpos(cpos_values) {
+    let lc, c, rc;
+    /**
+     * Python range like function from MDN: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/from#Sequence_generator_(range)
+     */
+    const range = (start, stop, step) => Array.from({ length: (stop - start) / step + 1}, (_, i) => start + (i * step));
+    lc = cpos_values.lc ? range(cpos_values.lc[0], cpos_values.lc[1], 1) : [];
+    c = range(cpos_values.c[0], cpos_values.c[1], 1);
+    rc = cpos_values.rc ? range(cpos_values.rc[0], cpos_values.rc[1], 1) : [];
     return {lc: lc, c: c, rc: rc};
   }
 
@@ -398,8 +389,7 @@ class ResultsList extends List {
     ])
     let uniqueS = new Set();
     let uniqueContextS = new Set();
-    let {lc, c, rc} = this.helperCreateCpos(results.inspectResultsData.cpos_ranges,
-                                            results.inspectResultsData.matches[0]);
+    let {lc, c, rc} = this.helperCreateCpos(results.inspectResultsData.matches[0]);
     // Create sentence strings as tokens.
     let tokenHTMLArray = [];
     let htmlTokenStr = ``;
@@ -680,8 +670,7 @@ class ResultsList extends List {
   createResultRowElement(item, chunk, client, imported=false) {
     // Gather values from item.
     let values = item.values();
-    let {lc, c, rc} = this.helperCreateCpos(chunk.cpos_ranges,
-                                            values)
+    let {lc, c, rc} = this.helperCreateCpos(values)
     // Get infos for full match row.
     let matchRowElement = document.createElement("tr");
     matchRowElement.setAttribute("data-index", values.index)
@@ -845,4 +834,4 @@ class ResultsList extends List {
 export {
   ViewEventListener,
   ResultsList
-};
\ No newline at end of file
+};
-- 
GitLab