From f3f6612a575e9ef2107c942653ba93c00b927397 Mon Sep 17 00:00:00 2001 From: Patrick Jentsch <p.jentsch@uni-bielefeld.de> Date: Fri, 13 Nov 2020 13:33:32 +0100 Subject: [PATCH] daemon is now tasks --- web/app/models.py | 11 +-- web/app/services/views.py | 4 +- web/app/tasks/__init__.py | 7 ++ web/app/tasks/job_utils.py | 84 ++++++++++++------- .../tasks/email/notification.html.j2 | 2 +- .../templates/tasks/email/notification.txt.j2 | 2 +- 6 files changed, 67 insertions(+), 43 deletions(-) diff --git a/web/app/models.py b/web/app/models.py index 6af02a2e..fc12610d 100644 --- a/web/app/models.py +++ b/web/app/models.py @@ -4,7 +4,6 @@ from flask_login import UserMixin, AnonymousUserMixin from itsdangerous import BadSignature, TimedJSONWebSignatureSerializer from time import sleep from werkzeug.security import generate_password_hash, check_password_hash -from werkzeug.utils import secure_filename import xml.etree.ElementTree as ET from . import db, login_manager import logging @@ -180,7 +179,7 @@ class User(UserMixin, db.Model): def __init__(self, **kwargs): super(User, self).__init__(**kwargs) if self.role is None: - if self.email == current_app.config['ADMIN_EMAIL_ADRESS']: + if self.email == current_app.config['NOPAQUE_ADMIN']: self.role = Role.query.filter_by(name='Administrator').first() if self.role is None: self.role = Role.query.filter_by(default=True).first() @@ -340,8 +339,6 @@ class Job(db.Model): end_date = db.Column(db.DateTime()) mem_mb = db.Column(db.Integer) n_cores = db.Column(db.Integer) - # This is used for zip creation - secure_filename = db.Column(db.String(32)) service = db.Column(db.String(64)) ''' ' Service specific arguments as string list. @@ -367,12 +364,6 @@ class Job(db.Model): ''' return '<Job {}>'.format(self.title) - def create_secure_filename(self): - ''' - Takes the job.title string nad cratesa a secure filename from this. - ''' - self.secure_filename = secure_filename(self.title) - def delete(self): ''' Delete the job and its inputs and results from the database. diff --git a/web/app/services/views.py b/web/app/services/views.py index a6567985..57aaef91 100644 --- a/web/app/services/views.py +++ b/web/app/services/views.py @@ -50,8 +50,6 @@ def service(service): service=service, service_args=json.dumps(service_args), service_version=form.version.data, status='preparing', title=form.title.data) - if job.service != 'corpus_analysis': - job.create_secure_filename() db.session.add(job) db.session.commit() try: @@ -66,7 +64,7 @@ def service(service): else: for file in form.files.data: filename = secure_filename(file.filename) - job_input = JobInput(dir=job.path, filename=filename, job=job) + job_input = JobInput(filename=filename, job=job) file.save(job_input.path) db.session.add(job_input) job.status = 'submitted' diff --git a/web/app/tasks/__init__.py b/web/app/tasks/__init__.py index 9bd21af6..b496975c 100644 --- a/web/app/tasks/__init__.py +++ b/web/app/tasks/__init__.py @@ -21,9 +21,16 @@ def check_corpora(): def check_jobs(): + print('check_jobs()') jobs = Job.query.all() + print([job.status for job in jobs]) for job in filter(lambda job: job.status == 'submitted', jobs): + print('pre create_job_service({})'.format(job)) job_utils.create_job_service(job) for job in filter(lambda job: job.status in ['queued', 'running'], jobs): + print('pre checkout_job_service({})'.format(job)) job_utils.checkout_job_service(job) + for job in filter(lambda job: job.status == 'canceling', jobs): + print('pre remove_job_service({})'.format(job)) + job_utils.remove_job_service(job) db.session.commit() diff --git a/web/app/tasks/job_utils.py b/web/app/tasks/job_utils.py index 68db9507..d5958137 100644 --- a/web/app/tasks/job_utils.py +++ b/web/app/tasks/job_utils.py @@ -1,7 +1,8 @@ from datetime import datetime +from werkzeug.utils import secure_filename from . import docker_client -from .. import db -from ..email import create_message, send +from .. import db, mail +from ..email import create_message from ..models import JobResult import docker import logging @@ -10,11 +11,12 @@ import os def create_job_service(job): + print('create_job_service({})'.format(job)) cmd = '{} -i /files -o /files/output'.format(job.service) if job.service == 'file-setup': - cmd += ' -f {}'.format(job.secure_filename) + cmd += ' -f {}'.format(secure_filename(job.title)) cmd += ' --log-dir /files' - cmd += ' --zip [{}]_{}'.format(job.service, job.secure_filename) + cmd += ' --zip [{}]_{}'.format(job.service, secure_filename(job.title)) cmd += ' ' + ' '.join(json.loads(job.service_args)) service_kwargs = {'command': cmd, 'constraints': ['node.role==worker'], @@ -36,15 +38,18 @@ def create_job_service(job): logging.error('Create "{}" service raised '.format(service_kwargs['name']) # noqa + '[docker-APIError] The server returned an error. ' + 'Details: {}'.format(e)) + return else: job.status = 'queued' + finally: + print('Sending email to {}'.format(job.creator.email)) msg = create_message( job.creator.email, 'Status update for your Job "{}"'.format(job.title), 'tasks/email/notification', job=job ) - send(msg) + mail.send(msg) def checkout_job_service(job): @@ -60,10 +65,12 @@ def checkout_job_service(job): logging.error('Get "{}" service raised '.format(service_name) + '[docker-APIError] The server returned an error. ' + 'Details: {}'.format(e)) + return except docker.errors.InvalidVersion: logging.error('Get "{}" service raised '.format(service_name) + '[docker-InvalidVersion] One of the arguments is ' + 'not supported with the current API version.') + return else: service_tasks = service.tasks() if not service_tasks: @@ -71,31 +78,33 @@ def checkout_job_service(job): task_state = service_tasks[0].get('Status').get('State') if job.status == 'queued' and task_state != 'pending': job.status = 'running' - elif job.status == 'running' and task_state == 'complete': - service.remove() - job.end_date = datetime.utcnow() - job.status = task_state - if task_state == 'complete': - job_results_dir = os.path.join(job.path, 'output') - job_results = filter(lambda x: x.endswith('.zip'), - os.listdir(job_results_dir)) - for job_result in job_results: - job_result = JobResult(filename=job_result, job=job) - db.session.add(job_result) - elif job.status == 'running' and task_state == 'failed': - service.remove() - job.end_date = datetime.utcnow() - job.status = task_state + elif job.status == 'running' and task_state in ['complete', 'failed']: # noqa + try: + service.remove() + except docker.errors.APIError as e: + logging.error('Remove "{}" service raised '.format(service_name) # noqa + + '[docker-APIError] The server returned an error. ' # noqa + + 'Details: {}'.format(e)) + return + else: + job.end_date = datetime.utcnow() + job.status = task_state + if task_state == 'complete': + job_results_dir = os.path.join(job.path, 'output') + job_results = filter(lambda x: x.endswith('.zip'), + os.listdir(job_results_dir)) + for job_result in job_results: + job_result = JobResult(filename=job_result, job=job) + db.session.add(job_result) finally: - # TODO: send email + print('Sending email to {}'.format(job.creator.email)) msg = create_message( job.creator.email, - '[nopaque] Status update for your Job "{}"'.format(job.title), + 'Status update for your Job "{}"'.format(job.title), 'tasks/email/notification', job=job ) - send(msg) - pass + mail.send(msg) def remove_job_service(job): @@ -103,9 +112,28 @@ def remove_job_service(job): try: service = docker_client.services.get(service_name) except docker.errors.NotFound: - # TODO: send email job.status = 'canceled' - # TODO: handle docker.errors.APIError and docker.errors.InvalidVersion + except docker.errors.APIError as e: + logging.error('Get "{}" service raised '.format(service_name) + + '[docker-APIError] The server returned an error. ' + + 'Details: {}'.format(e)) + return + except docker.errors.InvalidVersion: + logging.error('Get "{}" service raised '.format(service_name) + + '[docker-InvalidVersion] One of the arguments is ' + + 'not supported with the current API version.') + return else: - service.update(mounts=None) - service.remove() + try: + service.update(mounts=None) + except docker.errors.APIError as e: + logging.error('Update "{}" service raised '.format(service_name) # noqa + + '[docker-APIError] The server returned an error. ' # noqa + + 'Details: {}'.format(e)) + return + try: + service.remove() + except docker.errors.APIError as e: + logging.error('Remove "{}" service raised '.format(service_name) # noqa + + '[docker-APIError] The server returned an error. ' # noqa + + 'Details: {}'.format(e)) diff --git a/web/app/templates/tasks/email/notification.html.j2 b/web/app/templates/tasks/email/notification.html.j2 index 1aac0bf7..4e25ab4e 100644 --- a/web/app/templates/tasks/email/notification.html.j2 +++ b/web/app/templates/tasks/email/notification.html.j2 @@ -3,6 +3,6 @@ <p>The status of your Job "<b>{{ job.title }}</b>" has changed!</p> <p>It is now <b>{{ job.status }}</b>!</p> -<p>You can access your Job here: <a href="{{ url_for('jobs.job', job_id=job.id) }}">{{ url_for('jobs.job', job_id=job.id) }}</a></p> +<p>You can access your Job here: </p> <p>Kind regards!<br>Your nopaque team</p> diff --git a/web/app/templates/tasks/email/notification.txt.j2 b/web/app/templates/tasks/email/notification.txt.j2 index 03012b3e..7634ee3a 100644 --- a/web/app/templates/tasks/email/notification.txt.j2 +++ b/web/app/templates/tasks/email/notification.txt.j2 @@ -3,7 +3,7 @@ Dear {{ job.creator.username }}, The status of your Job "{{ job.title }}" has changed! It is now {{ job.status }}! -You can access your Job here: {{ url_for('jobs.job', job_id=job.id) }} +You can access your Job here: Kind regards! Your nopaque team -- GitLab