From 98a43ec86f9d31cf8cadd65064b04ece1056df7c Mon Sep 17 00:00:00 2001
From: Patrick Jentsch <p.jentsch@uni-bielefeld.de>
Date: Wed, 18 Aug 2021 15:11:11 +0200
Subject: [PATCH] Use sqlalchemy events to emit jsonpatches to the client.

---
 app/__init__.py                       |   3 +-
 app/corpora/events.py                 |   2 +-
 app/jobs/tasks.py                     |  11 +-
 app/services/views.py                 |   6 +-
 app/{events.py => socketio_events.py} |   6 +-
 app/sqlalchemy_events.py              | 161 ++++++++++++++++++++++++++
 app/tasks/job_utils.py                |  42 -------
 7 files changed, 169 insertions(+), 62 deletions(-)
 rename app/{events.py => socketio_events.py} (100%)
 create mode 100644 app/sqlalchemy_events.py

diff --git a/app/__init__.py b/app/__init__.py
index 75be4213..8a6b4325 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -32,7 +32,8 @@ def create_app(config_name):
         app, message_queue=app.config['NOPAQUE_SOCKETIO_MESSAGE_QUEUE_URI'])
 
     with app.app_context():
-        from . import events
+        from . import socketio_events
+        from . import sqlalchemy_events
         from .admin import admin as admin_blueprint
         from .auth import auth as auth_blueprint
         from .corpora import corpora as corpora_blueprint
diff --git a/app/corpora/events.py b/app/corpora/events.py
index 323bab74..8b4dd2ef 100644
--- a/app/corpora/events.py
+++ b/app/corpora/events.py
@@ -4,7 +4,7 @@ from flask_login import current_user
 from socket import gaierror
 from .. import db, socketio
 from ..decorators import socketio_login_required
-from ..events import socketio_sessions
+from ..socketio_events import socketio_sessions
 from ..models import Corpus
 import cqi
 import math
diff --git a/app/jobs/tasks.py b/app/jobs/tasks.py
index fb4223b8..08c2aa22 100644
--- a/app/jobs/tasks.py
+++ b/app/jobs/tasks.py
@@ -1,4 +1,4 @@
-from .. import db, socketio
+from .. import db
 from ..decorators import background
 from ..models import Job
 
@@ -9,12 +9,8 @@ def delete_job(job_id, *args, **kwargs):
         job = Job.query.get(job_id)
         if job is None:
             raise Exception('Job {} not found'.format(job_id))
-        event = 'user_{}_patch'.format(job.user_id)
-        jsonpatch = [{'op': 'remove', 'path': '/jobs/{}'.format(job.id)}]
-        room = 'user_{}'.format(job.user_id)
         job.delete()
         db.session.commit()
-        socketio.emit(event, jsonpatch, room=room)
 
 
 @background
@@ -29,8 +25,3 @@ def restart_job(job_id, *args, **kwargs):
             pass
         else:
             db.session.commit()
-            event = 'user_{}_patch'.format(job.user_id)
-            jsonpatch = [{'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()},  # noqa
-                         {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}]  # noqa
-            room = 'user_{}'.format(job.user_id)
-            socketio.emit(event, jsonpatch, room=room)
diff --git a/app/services/views.py b/app/services/views.py
index 62af71f9..75712b34 100644
--- a/app/services/views.py
+++ b/app/services/views.py
@@ -4,7 +4,7 @@ from flask_login import current_user, login_required
 from werkzeug.utils import secure_filename
 from . import services
 from . import SERVICES
-from .. import db, socketio
+from .. import db
 from .forms import AddJobForms
 from ..models import Job, JobInput
 import json
@@ -69,10 +69,6 @@ def service(service):
             job.status = 'submitted'
             db.session.commit()
             flash('Job "{}" added'.format(job.title), 'job')
-            event = 'user_{}_patch'.format(job.user_id)
-            jsonpatch = [{'op': 'add', 'path': '/jobs/{}'.format(job.id), 'value': job.to_dict()}]  # noqa
-            room = 'user_{}'.format(job.user_id)
-            socketio.emit(event, jsonpatch, room=room)
             return make_response(
                 {'redirect_url': url_for('jobs.job', job_id=job.id)}, 201)
     return render_template('services/{}.html.j2'.format(service.replace('-', '_')),
diff --git a/app/events.py b/app/socketio_events.py
similarity index 100%
rename from app/events.py
rename to app/socketio_events.py
index 72cdef66..f03e194c 100644
--- a/app/events.py
+++ b/app/socketio_events.py
@@ -6,9 +6,6 @@ from .decorators import socketio_login_required
 from .models import User
 
 
-###############################################################################
-# Socket.IO event handlers                                                    #
-###############################################################################
 '''
 ' A list containing session ids of connected Socket.IO sessions, to keep track
 ' of all connected sessions, which can be used to determine the runtimes of
@@ -17,6 +14,9 @@ from .models import User
 socketio_sessions = []
 
 
+###############################################################################
+# Socket.IO event handlers                                                    #
+###############################################################################
 @socketio.on('connect')
 @socketio_login_required
 def socketio_connect():
diff --git a/app/sqlalchemy_events.py b/app/sqlalchemy_events.py
new file mode 100644
index 00000000..ea5e6003
--- /dev/null
+++ b/app/sqlalchemy_events.py
@@ -0,0 +1,161 @@
+from . import db, socketio
+from .models import Job, JobInput, JobResult
+import logging
+
+
+###############################################################################
+# SQLAlchemy event handlers                                                   #
+###############################################################################
+
+###############################################################################
+## Job events                                                                 #
+###############################################################################
+@db.event.listens_for(Job, 'after_update')
+def after_job_update(mapper, connection, job):
+    jsonpatch = []
+    for attr in db.inspect(job).attrs:
+        # We don't want to emit changes about relationship fields
+        if attr.key in ['inputs', 'results']:
+            continue
+        history = attr.load_history()
+        if not history.has_changes():
+            continue
+        new_value = history.added[0]
+        # DateTime attributes must be converted to a timestamp
+        if attr.key in ['creation_date', 'end_date']:
+            new_value = None if new_value is None else new_value.timestamp()
+        jsonpatch.append(
+            {
+                'op': 'replace',
+                'path': '/jobs/{}/{}'.format(job.id, attr.key),
+                'value': new_value
+            }
+        )
+    if jsonpatch:
+        event = 'user_{}_patch'.format(job.user_id)
+        room = 'user_{}'.format(job.user_id)
+        socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(Job, 'after_insert')
+def after_job_insert(mapper, connection, job):
+    event = 'user_{}_patch'.format(job.user_id)
+    jsonpatch = [
+        {
+            'op': 'add',
+            'path': '/jobs/{}'.format(job.id),
+            'value': job.to_dict(include_relationships=False)
+        }
+    ]
+    room = 'user_{}'.format(job.user_id)
+    socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(Job, 'after_delete')
+def after_job_delete(mapper, connection, job):
+    event = 'user_{}_patch'.format(job.user_id)
+    jsonpatch = [{'op': 'remove', 'path': '/jobs/{}'.format(job.id)}]
+    room = 'user_{}'.format(job.user_id)
+    socketio.emit(event, jsonpatch, room=room)
+
+###############################################################################
+## JobInput events                                                            #
+###############################################################################
+@db.event.listens_for(JobInput, 'after_update')
+def after_job_input_update(mapper, connection, job_input):
+    jsonpatch = []
+    for attr in db.inspect(job_input).attrs:
+        history = attr.load_history()
+        if not history.has_changes():
+            continue
+        new_value = history.added[0]
+        jsonpatch.append(
+            {
+                'op': 'replace',
+                'path': '/jobs/{}/inputs/{}/{}'.format(job_input.job_id,
+                                                       job_input.id,
+                                                       attr.key),
+                'value': new_value
+            }
+        )
+    if jsonpatch:
+        event = 'user_{}_patch'.format(job_input.job.user_id)
+        room = 'user_{}'.format(job_input.job.user_id)
+        socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(JobInput, 'after_insert')
+def after_job_input_insert(mapper, connection, job_input):
+    event = 'user_{}_patch'.format(job_input.job.user_id)
+    jsonpatch = [
+        {
+            'op': 'add',
+            'path': '/jobs/{}/inputs/{}'.format(job_input.job_id,
+                                                job_input.id),
+            'value': job_input.to_dict(include_relationships=False)
+        }
+    ]
+    room = 'user_{}'.format(job_input.job.user_id)
+    socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(JobInput, 'after_delete')
+def after_job_input_delete(mapper, connection, job_input):
+    event = 'user_{}_patch'.format(job_input.job.user_id)
+    jsonpatch = [
+        {
+            'op': 'remove',
+            'path': '/jobs/{}/inputs/{}'.format(job_input.job_id,
+                                                job_input.id)
+        }
+    ]
+    room = 'user_{}'.format(job_input.job.user_id)
+    socketio.emit(event, jsonpatch, room=room)
+
+###############################################################################
+## JobResult events                                                           #
+###############################################################################
+@db.event.listens_for(JobResult, 'after_update')
+def after_job_result_update(mapper, connection, job_result):
+    jsonpatch = []
+    for attr in db.inspect(job_result).attrs:
+        history = attr.load_history()
+        if not history.has_changes():
+            continue
+        new_value = history.added[0]
+        jsonpatch.append(
+            {
+                'op': 'replace',
+                'path': '/jobs/{}/results/{}/{}'.format(job_result.job_id,
+                                                        job_result.id,
+                                                        attr.key),
+                'value': new_value
+            }
+        )
+    if jsonpatch:
+        event = 'user_{}_patch'.format(job_result.job.user_id)
+        room = 'user_{}'.format(job_result.job.user_id)
+        socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(JobResult, 'after_insert')
+def after_job_result_insert(mapper, connection, job_result):
+    event = 'user_{}_patch'.format(job_result.job.user_id)
+    jsonpatch = [
+        {
+            'op': 'add',
+            'path': '/jobs/{}/results/{}'.format(job_result.job_id,
+                                                 job_result.id),
+            'value': job_result.to_dict(include_relationships=False)
+        }
+    ]
+    room = 'user_{}'.format(job_result.job.user_id)
+    socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(JobResult, 'after_delete')
+def after_job_result_delete(mapper, connection, job_result):
+    event = 'user_{}_patch'.format(job_result.job.user_id)
+    jsonpatch = [
+        {
+            'op': 'remove',
+            'path': '/jobs/{}/results/{}'.format(job_result.job_id,
+                                                 job_result.id)
+        }
+    ]
+    room = 'user_{}'.format(job_result.job.user_id)
+    socketio.emit(event, jsonpatch, room=room)
diff --git a/app/tasks/job_utils.py b/app/tasks/job_utils.py
index ee804bbc..f1a2b6a8 100644
--- a/app/tasks/job_utils.py
+++ b/app/tasks/job_utils.py
@@ -105,12 +105,6 @@ class CheckJobsMixin:
             return
         else:
             job.status = 'queued'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/jobs/{}/status'.format(job.id),
-                'value': job.status
-            }
-            self.buffer_user_patch_operation(job, patch_operation)
         finally:
             self.send_job_notification(job)
 
@@ -125,12 +119,6 @@ class CheckJobsMixin:
                 + '(job.status: {} -> failed)'.format(job.status)
             )
             job.status = 'failed'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/jobs/{}/status'.format(job.id),
-                'value': job.status
-            }
-            self.buffer_user_patch_operation(job, patch_operation)
         except docker.errors.APIError as e:
             logging.error(
                 'Get "{}" service raised '.format(service_name)
@@ -152,12 +140,6 @@ class CheckJobsMixin:
             task_state = service_tasks[0].get('Status').get('State')
             if job.status == 'queued' and task_state != 'pending':
                 job.status = 'running'
-                patch_operation = {
-                    'op': 'replace',
-                    'path': '/jobs/{}/status'.format(job.id),
-                    'value': job.status
-                }
-                self.buffer_user_patch_operation(job, patch_operation)
             elif job.status == 'running' and task_state in ['complete', 'failed']:  # noqa
                 try:
                     service.remove()
@@ -178,26 +160,8 @@ class CheckJobsMixin:
                             db.session.add(job_result)
                             db.session.flush()
                             db.session.refresh(job_result)
-                            patch_operation = {
-                                'op': 'add',
-                                'path': '/jobs/{}/results/{}'.format(job.id, job_result.id),  # noqa
-                                'value': job_result.to_dict()
-                            }
-                            self.buffer_user_patch_operation(job, patch_operation)  # noqa
                     job.end_date = datetime.utcnow()
-                    patch_operation = {
-                        'op': 'replace',
-                        'path': '/jobs/{}/end_date'.format(job.id),
-                        'value': job.end_date.timestamp()
-                    }
-                    self.buffer_user_patch_operation(job, patch_operation)
                     job.status = task_state
-                    patch_operation = {
-                        'op': 'replace',
-                        'path': '/jobs/{}/status'.format(job.id),
-                        'value': job.status
-                    }
-                    self.buffer_user_patch_operation(job, patch_operation)
         finally:
             self.send_job_notification(job)
 
@@ -207,12 +171,6 @@ class CheckJobsMixin:
             service = self.docker.services.get(service_name)
         except docker.errors.NotFound:
             job.status = 'canceled'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/jobs/{}/status'.format(job.id),
-                'value': job.status
-            }
-            self.buffer_user_patch_operation(job, patch_operation)
         except docker.errors.APIError as e:
             logging.error(
                 'Get "{}" service raised '.format(service_name)
-- 
GitLab