From bc5c8ef074d9f280792c33e02d8dd2a325b021c5 Mon Sep 17 00:00:00 2001 From: Patrick Jentsch <p.jentsch@uni-bielefeld.de> Date: Fri, 10 Sep 2021 16:25:32 +0200 Subject: [PATCH] handle email job status updates with the sqlalchemy event handlers --- app/events/sqlalchemy.py | 96 +++++++++++++++------------------------- app/models.py | 36 +++++++++++++++ app/tasks/job_utils.py | 21 +-------- 3 files changed, 73 insertions(+), 80 deletions(-) diff --git a/app/events/sqlalchemy.py b/app/events/sqlalchemy.py index 27c8e233..50749c8b 100644 --- a/app/events/sqlalchemy.py +++ b/app/events/sqlalchemy.py @@ -1,6 +1,7 @@ from datetime import datetime -from .. import db, socketio -from ..models import Corpus, CorpusFile, Job, JobInput, JobResult +from .. import db, mail, socketio +from ..email import create_message +from ..models import Corpus, CorpusFile, Job, JobInput, JobResult, QueryResult ############################################################################### @@ -11,102 +12,77 @@ from ..models import Corpus, CorpusFile, Job, JobInput, JobResult @db.event.listens_for(Job, 'after_delete') @db.event.listens_for(JobInput, 'after_delete') @db.event.listens_for(JobResult, 'after_delete') +@db.event.listens_for(QueryResult, 'after_delete') def ressource_after_delete(mapper, connection, ressource): - if isinstance(ressource, Corpus): - user_id = ressource.creator.id - path = '/corpora/{}'.format(ressource.id) - elif isinstance(ressource, CorpusFile): - user_id = ressource.corpus.creator.id - path = '/corpora/{}/files/{}'.format(ressource.corpus.id, ressource.id) - elif isinstance(ressource, Job): - user_id = ressource.creator.id - path = '/jobs/{}'.format(ressource.id) - elif isinstance(ressource, JobInput): - user_id = ressource.job.creator.id - path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id) - elif isinstance(ressource, JobResult): - user_id = ressource.job.creator.id - path = '/jobs/{}/results/{}'.format(ressource.job.id, ressource.id) - event = 'user_{}_patch'.format(user_id) - jsonpatch = [{'op': 'remove', 'path': path}] - room = 'user_{}'.format(user_id) + event = 'user_{}_patch'.format(ressource.user_id) + jsonpatch = [{'op': 'remove', 'path': ressource.jsonpatch_path}] + room = 'user_{}'.format(ressource.user_id) socketio.emit(event, jsonpatch, room=room) + @db.event.listens_for(Corpus, 'after_insert') @db.event.listens_for(CorpusFile, 'after_insert') @db.event.listens_for(Job, 'after_insert') @db.event.listens_for(JobInput, 'after_insert') @db.event.listens_for(JobResult, 'after_insert') +@db.event.listens_for(QueryResult, 'after_insert') def ressource_after_insert_handler(mapper, connection, ressource): - if isinstance(ressource, Corpus): - user_id = ressource.creator.id - path = '/corpora/{}'.format(ressource.id) - elif isinstance(ressource, CorpusFile): - user_id = ressource.corpus.creator.id - path = '/corpora/{}/files/{}'.format(ressource.corpus.id, ressource.id) - elif isinstance(ressource, Job): - user_id = ressource.creator.id - path = '/jobs/{}'.format(ressource.id) - elif isinstance(ressource, JobInput): - user_id = ressource.job.creator.id - path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id) - elif isinstance(ressource, JobResult): - user_id = ressource.job.creator.id - path = '/jobs/{}/results/{}'.format(ressource.job.id, ressource.id) - event = 'user_{}_patch'.format(user_id) + event = 'user_{}_patch'.format(ressource.user_id) jsonpatch = [ { 'op': 'add', - 'path': path, + 'path': ressource.jsonpatch_path, 'value': ressource.to_dict(include_relationships=False) } ] - room = 'user_{}'.format(user_id) + room = 'user_{}'.format(ressource.user_id) socketio.emit(event, jsonpatch, room=room) + @db.event.listens_for(Corpus, 'after_update') @db.event.listens_for(CorpusFile, 'after_update') @db.event.listens_for(Job, 'after_update') @db.event.listens_for(JobInput, 'after_update') @db.event.listens_for(JobResult, 'after_update') +@db.event.listens_for(QueryResult, 'after_update') def ressource_after_update_handler(mapper, connection, ressource): - if isinstance(ressource, Corpus): - user_id = ressource.creator.id - base_path = '/corpora/{}'.format(ressource.id) - elif isinstance(ressource, CorpusFile): - user_id = ressource.corpus.creator.id - base_path = '/corpora/{}/files/{}'.format(ressource.corpus.id, - ressource.id) - elif isinstance(ressource, Job): - user_id = ressource.creator.id - base_path = '/jobs/{}'.format(ressource.id) - elif isinstance(ressource, JobInput): - user_id = ressource.job.creator.id - base_path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id) - elif isinstance(ressource, JobResult): - user_id = ressource.job.creator.id - base_path = '/jobs/{}/results/{}'.format(ressource.job.id, - ressource.id) jsonpatch = [] for attr in db.inspect(ressource).attrs: - # We don't want to emit changes about relationship fields + # We don't want to handle changes in relationship fields + # TODO: Find a way to handle this without a hardcoded list if attr.key in ['files', 'inputs', 'results']: continue history = attr.load_history() if not history.has_changes(): continue new_value = history.added[0] - # DateTime attributes must be converted to a string + # In order to be JSON serializable, DateTime attributes must be + # converted to a string if isinstance(new_value, datetime): new_value = new_value.isoformat() jsonpatch.append( { 'op': 'replace', - 'path': '{}/{}'.format(base_path, attr.key), + 'path': '{}/{}'.format(ressource.jsonpatch_path, attr.key), 'value': new_value } ) + # Job status update notification if it changed and wanted by the user + if isinstance(ressource, Job) and attr.key == 'status': + if ressource.creator.setting_job_status_mail_notifications == 'none': # noqa + pass + elif (ressource.creator.setting_job_status_mail_notifications == 'end' # noqa + and ressource.status not in ['complete', 'failed']): + pass + else: + msg = create_message( + ressource.creator.email, + 'Status update for your Job "{}"'.format(ressource.title), + 'tasks/email/notification', + job=ressource + ) + mail.send(msg) if jsonpatch: - event = 'user_{}_patch'.format(user_id) - room = 'user_{}'.format(user_id) + event = 'user_{}_patch'.format(ressource.user_id) + room = 'user_{}'.format(ressource.user_id) socketio.emit(event, jsonpatch, room=room) diff --git a/app/models.py b/app/models.py index ba1f7955..1bae8060 100644 --- a/app/models.py +++ b/app/models.py @@ -292,6 +292,10 @@ class JobInput(db.Model): return url_for('jobs.download_job_input', job_id=self.job_id, job_input_id=self.id) + @property + def jsonpatch_path(self): + return '/jobs/{}/inputs/{}'.format(self.job_id, self.id) + @property def path(self): return os.path.join(self.job.path, self.filename) @@ -301,6 +305,10 @@ class JobInput(db.Model): return url_for('jobs.job', job_id=self.job_id, _anchor='job-{}-input-{}'.format(self.job_id, self.id)) + @property + def user_id(self): + return self.job.user_id + def __repr__(self): ''' String representation of the JobInput. For human readability. @@ -332,6 +340,10 @@ class JobResult(db.Model): return url_for('jobs.download_job_result', job_id=self.job_id, job_result_id=self.id) + @property + def jsonpatch_path(self): + return '/jobs/{}/results/{}'.format(self.job_id, self.id) + @property def path(self): return os.path.join(self.job.path, 'output', self.filename) @@ -341,6 +353,10 @@ class JobResult(db.Model): return url_for('jobs.job', job_id=self.job_id, _anchor='job-{}-result-{}'.format(self.job_id, self.id)) + @property + def user_id(self): + return self.job.user_id + def __repr__(self): ''' String representation of the JobResult. For human readability. @@ -383,6 +399,10 @@ class Job(db.Model): results = db.relationship('JobResult', backref='job', lazy='dynamic', cascade='save-update, merge, delete') + @property + def jsonpatch_path(self): + return '/jobs/{}'.format(self.id) + @property def path(self): return os.path.join(self.creator.path, 'jobs', str(self.id)) @@ -479,6 +499,10 @@ class CorpusFile(db.Model): return url_for('corpora.download_corpus_file', corpus_id=self.corpus_id, corpus_file_id=self.id) + @property + def jsonpatch_path(self): + return '/corpora/{}/files/{}'.format(self.corpus_id, self.id) + @property def path(self): return os.path.join(self.corpus.path, self.filename) @@ -488,6 +512,10 @@ class CorpusFile(db.Model): return url_for('corpora.corpus_file', corpus_id=self.corpus_id, corpus_file_id=self.id) + @property + def user_id(self): + return self.corpus.user_id + def delete(self): try: os.remove(self.path) @@ -543,6 +571,10 @@ class Corpus(db.Model): def analysis_url(self): return url_for('corpora.analyse_corpus', corpus_id=self.id) + @property + def jsonpatch_path(self): + return '/corpora/{}'.format(self.id) + @property def path(self): return os.path.join(self.creator.path, 'corpora', str(self.id)) @@ -632,6 +664,10 @@ class QueryResult(db.Model): return url_for('corpora.download_query_result', query_result_id=self.id) + @property + def jsonpatch_path(self): + return '/query_results/{}'.format(self.id) + @property def path(self): return os.path.join( diff --git a/app/tasks/job_utils.py b/app/tasks/job_utils.py index f1a2b6a8..2c374d39 100644 --- a/app/tasks/job_utils.py +++ b/app/tasks/job_utils.py @@ -1,8 +1,7 @@ from datetime import datetime from flask import current_app from werkzeug.utils import secure_filename -from .. import db, mail -from ..email import create_message +from .. import db from ..models import Job, JobResult import docker import json @@ -105,8 +104,6 @@ class CheckJobsMixin: return else: job.status = 'queued' - finally: - self.send_job_notification(job) def checkout_job_service(self, job): service_name = 'job_{}'.format(job.id) @@ -162,8 +159,6 @@ class CheckJobsMixin: db.session.refresh(job_result) job.end_date = datetime.utcnow() job.status = task_state - finally: - self.send_job_notification(job) def remove_job_service(self, job): service_name = 'job_{}'.format(job.id) @@ -203,17 +198,3 @@ class CheckJobsMixin: + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) - - def send_job_notification(self, job): - if job.creator.setting_job_status_mail_notifications == 'none': - return - if (job.creator.setting_job_status_mail_notifications == 'end' - and job.status not in ['complete', 'failed']): - return - msg = create_message( - job.creator.email, - 'Status update for your Job "{}"'.format(job.title), - 'tasks/email/notification', - job=job - ) - mail.send(msg) -- GitLab