Skip to content
Snippets Groups Projects
job_utils.py 5.83 KiB
from datetime import datetime
from werkzeug.utils import secure_filename
from . import docker_client
from .. import db, mail
from ..email import create_message
from ..models import JobResult
import docker
import logging
import json
import os


def create_job_service(job):
    """Create the Docker service that executes ``job``.

    The container command is assembled from ``job.service``, the sanitized
    ``job.title`` and the JSON encoded argument list stored in
    ``job.service_args``. ``job.path`` is mounted read-write at ``/files``.

    On success ``job.status`` is set to ``'queued'``. If the Docker API
    returns an error, it is logged and ``job.status`` is left untouched.
    The job creator is only notified when the status actually changed, so a
    failed creation attempt does not trigger a "status update" mail for a
    status that did not move.

    :param job: Job object providing ``id``, ``service``,
        ``service_version``, ``service_args``, ``title``, ``path``,
        ``n_cores``, ``mem_mb``, ``status`` and ``creator``.
    """
    # Remember the status on entry to decide later whether a notification
    # is warranted.
    previous_status = job.status
    cmd = '{} -i /files -o /files/output'.format(job.service)
    if job.service == 'file-setup':
        cmd += ' -f {}'.format(secure_filename(job.title))
    cmd += ' --log-dir /files'
    cmd += ' --zip [{}]_{}'.format(job.service, secure_filename(job.title))
    cmd += ' ' + ' '.join(json.loads(job.service_args))
    service_kwargs = {'command': cmd,
                      'constraints': ['node.role==worker'],
                      'labels': {'origin': 'nopaque',
                                 'type': 'service.{}'.format(job.service),
                                 'job_id': str(job.id)},
                      'mounts': [job.path + ':/files:rw'],
                      'name': 'job_{}'.format(job.id),
                      # NOTE(review): the docker SDK documents CPU
                      # reservations in units of 10^9 CPU shares and memory
                      # in bytes - confirm against the SDK version in use.
                      'resources': docker.types.Resources(
                          cpu_reservation=job.n_cores * (10 ** 9),
                          mem_reservation=job.mem_mb * (10 ** 6)
                      ),
                      'restart_policy': docker.types.RestartPolicy()}
    service_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
        job.service, job.service_version)
    try:
        docker_client.services.create(service_image, **service_kwargs)
    except docker.errors.APIError as e:
        logging.error(
            'Create "{}" service raised '.format(service_kwargs['name'])
            + '"docker.errors.APIError" The server returned an error. '
            + 'Details: {}'.format(e)
        )
    else:
        job.status = 'queued'
    finally:
        # Only notify on a real status transition; previously a mail was
        # sent even when the service could not be created.
        if job.status != previous_status:
            send_notification(job)


def checkout_job_service(job):
    """Synchronize ``job.status`` with the state of its Docker service.

    The service named ``job_<id>`` is looked up and the state of the first
    task returned by ``service.tasks()`` is inspected:

    * service not found: the job is marked ``'failed'``.
    * ``'queued'`` job whose task is no longer ``'pending'``: the job is
      marked ``'running'``.
    * ``'running'`` job whose task is ``'complete'`` or ``'failed'``: the
      service is removed, ``.zip`` files in ``<job.path>/output`` are
      registered as ``JobResult`` rows (only for ``'complete'``),
      ``job.end_date`` is set and the task state becomes the job status.

    Docker API errors are logged and leave the job untouched. The job
    creator is only notified when the status actually changed during this
    call, so repeated checks of an unchanged job do not send repeated
    mails.

    :param job: Job object providing ``id``, ``path``, ``status``,
        ``end_date`` and ``creator``.
    """
    service_name = 'job_{}'.format(job.id)
    # Remember the status on entry to decide later whether a notification
    # is warranted.
    previous_status = job.status
    try:
        service = docker_client.services.get(service_name)
    except docker.errors.NotFound:
        logging.error('Get "{}" service raised '.format(service_name)
                      + '"docker.errors.NotFound" The service does not exist. '
                      + '(job.status: {} -> failed)'.format(job.status))
        job.status = 'failed'
    except docker.errors.APIError as e:
        logging.error(
            'Get "{}" service raised '.format(service_name)
            + '"docker.errors.APIError" The server returned an error. '
            + 'Details: {}'.format(e)
        )
        return
    except docker.errors.InvalidVersion:
        logging.error(
            'Get "{}" service raised '.format(service_name)
            + '"docker.errors.InvalidVersion" One of the arguments is '
            + 'not supported with the current API version.'
        )
        return
    else:
        service_tasks = service.tasks()
        if not service_tasks:
            # No task has been reported for the service yet.
            return
        task_state = service_tasks[0].get('Status').get('State')
        if job.status == 'queued' and task_state != 'pending':
            job.status = 'running'
        elif job.status == 'running' and task_state in ['complete', 'failed']:
            try:
                service.remove()
            except docker.errors.APIError as e:
                logging.error(
                    'Remove "{}" service raised '.format(service_name)
                    + '"docker.errors.APIError" The server returned an error. '
                    + 'Details: {}'.format(e)
                )
                return
            else:
                if task_state == 'complete':
                    job_results_dir = os.path.join(job.path, 'output')
                    for filename in os.listdir(job_results_dir):
                        if filename.endswith('.zip'):
                            db.session.add(
                                JobResult(filename=filename, job=job)
                            )
                job.end_date = datetime.utcnow()
                job.status = task_state
    finally:
        # Only notify on a real status transition; previously every call
        # (including early returns and error paths) sent a mail.
        if job.status != previous_status:
            send_notification(job)


def remove_job_service(job):
    """Tear down the Docker service that belongs to ``job``.

    If the service ``job_<id>`` no longer exists, ``job.status`` is set to
    ``'canceled'``. Otherwise the service is updated with ``mounts=None``
    and then removed; the job status is not modified in that case. Every
    Docker error is logged and ends the call early.

    :param job: Job object providing ``id`` and ``status``.
    """
    name = 'job_{}'.format(job.id)

    # Step 1: look the service up.
    try:
        swarm_service = docker_client.services.get(name)
    except docker.errors.NotFound:
        # Nothing left to remove, the job counts as canceled.
        job.status = 'canceled'
        return
    except docker.errors.APIError as err:
        message = ('Get "{}" service raised '.format(name)
                   + '"docker.errors.APIError" The server returned an error. '
                   + 'Details: {}'.format(err))
        logging.error(message)
        return
    except docker.errors.InvalidVersion:
        message = ('Get "{}" service raised '.format(name)
                   + '"docker.errors.InvalidVersion" One of the arguments is '
                   + 'not supported with the current API version.')
        logging.error(message)
        return

    # Step 2: update the service with mounts=None before removing it.
    try:
        swarm_service.update(mounts=None)
    except docker.errors.APIError as err:
        message = ('Update "{}" service raised '.format(name)
                   + '"docker.errors.APIError" The server returned an error. '
                   + 'Details: {}'.format(err))
        logging.error(message)
        return

    # Step 3: remove the service itself.
    try:
        swarm_service.remove()
    except docker.errors.APIError as err:
        message = ('Remove "{}" service raised '.format(name)
                   + '"docker.errors.APIError" The server returned an error. '
                   + 'Details: {}'.format(err))
        logging.error(message)


def send_notification(job):
    """Mail the creator of ``job`` about the job's current status.

    The creator's ``setting_job_status_mail_notifications`` decides whether
    a mail is sent: ``'none'`` suppresses all mails, ``'end'`` only lets
    mails through when the job status is ``'complete'`` or ``'failed'``.
    Any other value always results in a mail.

    :param job: Job object providing ``title``, ``status`` and ``creator``.
    """
    preference = job.creator.setting_job_status_mail_notifications
    if preference == 'none':
        return
    if preference == 'end' and job.status not in ('complete', 'failed'):
        return
    recipient = job.creator.email
    subject = 'Status update for your Job "{}"'.format(job.title)
    message = create_message(recipient, subject,
                             'tasks/email/notification', job=job)
    mail.send(message)