Skip to content
Snippets Groups Projects
Commit 99c43cc1 authored by Patrick Jentsch's avatar Patrick Jentsch
Browse files

some style updates

parent 575dc452
No related branches found
No related tags found
No related merge requests found
from datetime import datetime
from werkzeug.utils import secure_filename
from .. import db, mail
from ..email import create_message
from ..models import Job, JobResult
import docker
import logging
import json
import os
# Registry host and port; used as the prefix of every job service image name
# built in CheckJobsMixin.create_job_service.
DOCKER_REGISTRY = 'gitlab.ub.uni-bielefeld.de:4567'
class CheckJobsMixin:
    """Mixin that maps the life cycle of ``Job`` rows onto Docker services.

    The host class must provide ``self.docker`` (used here through
    ``self.docker.services``) and
    ``self.buffer_user_patch_operation(job, patch_operation)``; neither is
    defined in this mixin.
    """

    # Resource reservation per supported service: (memory in MB, CPU cores).
    # The service name doubles as the executable name and the image name.
    _SERVICE_RESOURCES = {
        'file-setup': (2048, 2),
        'ocr': (4096, 4),
        'nlp': (2048, 2),
    }

    def check_jobs(self):
        """Advance every job one step according to its current status.

        Submitted jobs get a service created, queued and running jobs are
        polled, and canceling jobs get their service removed.
        """
        # Bucket the jobs up front: a job whose status changes while it is
        # being handled must not be picked up again during the same pass.
        buckets = {
            'submitted': [],
            'queued': [],
            'running': [],
            'canceling': [],
        }
        for job in Job.query.all():
            if job.status in buckets:
                buckets[job.status].append(job)
        for job in buckets['submitted']:
            self.create_job_service(job)
        for job in buckets['queued'] + buckets['running']:
            self.checkout_job_service(job)
        for job in buckets['canceling']:
            self.remove_job_service(job)

    def create_job_service(self, job):
        """Create the Docker service that executes ``job``.

        On success the job status becomes ``'queued'``. If the Docker API
        rejects the request, the error is logged and the status is left
        untouched. A job whose ``service`` is not a known service is logged
        and marked ``'failed'``. A notification is attempted in every case.

        :param job: the job to start; ``service``, ``service_version``,
            ``service_args`` (a JSON encoded list of strings), ``title``,
            ``path`` and ``id`` are read from it.
        """
        try:
            mem_mb, n_cores = self._SERVICE_RESOURCES[job.service]
        except KeyError:
            logging.error(
                'Create "job_{}" service failed: unknown service "{}". '
                '(job.status: {} -> failed)'.format(
                    job.id, job.service, job.status
                )
            )
            self._set_job_status(job, 'failed')
            self.send_job_notification(job)
            return
        executable = job.service
        image = '{}/sfb1288inf/{}:{}'.format(
            DOCKER_REGISTRY, job.service, job.service_version
        )
        # Command
        command = (
            '{} -i /input -o /output'
            ' --log-dir /input'
            ' --mem-mb {}'
            ' --n-cores {}'
            ' --zip [{}]_{}'
        ).format(
            executable,
            mem_mb,
            n_cores,
            job.service,
            secure_filename(job.title)
        )
        command += ' ' + ' '.join(json.loads(job.service_args))
        # Constraints
        constraints = ['node.role==worker']
        # Labels
        labels = {'origin': 'nopaque', 'type': 'job', 'job_id': str(job.id)}
        # Mounts
        # Input mount
        input_mount_source = job.path
        input_mount_target = os.path.abspath('/input')
        if job.service == 'file-setup':
            input_mount_target = os.path.join(
                input_mount_target, secure_filename(job.title)
            )
        input_mount = '{}:{}:rw'.format(input_mount_source, input_mount_target)
        # Output mount
        output_mount_source = os.path.join(job.path, 'output')
        output_mount_target = os.path.abspath('/output')
        output_mount = '{}:{}:rw'.format(
            output_mount_source, output_mount_target
        )
        # exist_ok: a job stays "submitted" when the API call below fails, so
        # this method may run again for a job whose directory already exists.
        os.makedirs(output_mount_source, exist_ok=True)
        mounts = [input_mount, output_mount]
        # Name
        name = 'job_{}'.format(job.id)
        # Resources (nano CPUs and bytes)
        resources = docker.types.Resources(
            cpu_reservation=n_cores * (10 ** 9),
            mem_reservation=mem_mb * (10 ** 6)
        )
        # Restart policy
        restart_policy = docker.types.RestartPolicy()
        try:
            self.docker.services.create(
                image,
                command=command,
                constraints=constraints,
                labels=labels,
                mounts=mounts,
                name=name,
                resources=resources,
                restart_policy=restart_policy
            )
        except docker.errors.APIError as e:
            self._log_api_error('Create', name, e)
            return
        else:
            self._set_job_status(job, 'queued')
        finally:
            # Runs on the error path as well, before the return above.
            self.send_job_notification(job)

    def checkout_job_service(self, job):
        """Synchronise ``job`` with the state of its Docker service.

        A missing service marks the job ``'failed'``. Otherwise the state of
        the first service task decides: a queued job whose task left
        ``'pending'`` becomes ``'running'``; a running job whose task is
        ``'complete'`` or ``'failed'`` gets its service removed, its ``.zip``
        results registered (only when complete), its end date set and the
        task state adopted as job status. A notification is attempted on
        every path.

        :param job: the queued or running job to poll.
        """
        service_name = 'job_{}'.format(job.id)
        try:
            service = self.docker.services.get(service_name)
        # NotFound has to be handled before the more general APIError.
        except docker.errors.NotFound:
            logging.error(
                (
                    'Get "{}" service raised '
                    '"docker.errors.NotFound" The service does not exist. '
                    '(job.status: {} -> failed)'
                ).format(service_name, job.status)
            )
            self._set_job_status(job, 'failed')
        except docker.errors.APIError as e:
            self._log_api_error('Get', service_name, e)
            return
        except docker.errors.InvalidVersion:
            self._log_invalid_version(service_name)
            return
        else:
            service_tasks = service.tasks()
            if not service_tasks:
                return
            task_state = service_tasks[0].get('Status').get('State')
            if job.status == 'queued' and task_state != 'pending':
                self._set_job_status(job, 'running')
            elif (job.status == 'running'
                  and task_state in ('complete', 'failed')):
                try:
                    service.remove()
                except docker.errors.APIError as e:
                    self._log_api_error('Remove', service_name, e)
                    return
                if task_state == 'complete':
                    results_dir = os.path.join(job.path, 'output')
                    result_files = [
                        filename for filename in os.listdir(results_dir)
                        if filename.endswith('.zip')
                    ]
                    for result_file in result_files:
                        job_result = JobResult(filename=result_file, job=job)
                        db.session.add(job_result)
                        # Flush and refresh so job_result.id is populated
                        # before it is used in the patch path.
                        db.session.flush()
                        db.session.refresh(job_result)
                        patch_operation = {
                            'op': 'add',
                            'path': '/jobs/{}/results/{}'.format(
                                job.id, job_result.id
                            ),
                            'value': job_result.to_dict()
                        }
                        self.buffer_user_patch_operation(job, patch_operation)
                # NOTE(review): timestamp() on a naive datetime interprets it
                # as local time - confirm the server clock runs in UTC.
                job.end_date = datetime.utcnow()
                patch_operation = {
                    'op': 'replace',
                    'path': '/jobs/{}/end_date'.format(job.id),
                    'value': job.end_date.timestamp()
                }
                self.buffer_user_patch_operation(job, patch_operation)
                self._set_job_status(job, task_state)
        finally:
            self.send_job_notification(job)

    def remove_job_service(self, job):
        """Remove the Docker service of a job that is being canceled.

        When the service no longer exists the job is marked ``'canceled'``.
        Otherwise the service's mounts are dropped and the service is
        removed; the status is not changed on that path. Docker API errors
        are logged and end the attempt.

        :param job: the job in status ``'canceling'``.
        """
        service_name = 'job_{}'.format(job.id)
        try:
            service = self.docker.services.get(service_name)
        # NotFound has to be handled before the more general APIError.
        except docker.errors.NotFound:
            self._set_job_status(job, 'canceled')
        except docker.errors.APIError as e:
            self._log_api_error('Get', service_name, e)
            return
        except docker.errors.InvalidVersion:
            self._log_invalid_version(service_name)
            return
        else:
            try:
                service.update(mounts=None)
            except docker.errors.APIError as e:
                self._log_api_error('Update', service_name, e)
                return
            try:
                service.remove()
            except docker.errors.APIError as e:
                self._log_api_error('Remove', service_name, e)

    def send_job_notification(self, job):
        """Mail the creator of ``job`` about its status, if they want it.

        Nothing is sent when the creator's notification setting is
        ``'none'``, or when it is ``'end'`` and the job is neither
        ``'complete'`` nor ``'failed'``.

        :param job: the job whose creator should be notified.
        """
        setting = job.creator.setting_job_status_mail_notifications
        if setting == 'none':
            return
        if setting == 'end' and job.status not in ('complete', 'failed'):
            return
        msg = create_message(
            job.creator.email,
            'Status update for your Job "{}"'.format(job.title),
            'tasks/email/notification',
            job=job
        )
        mail.send(msg)

    def _set_job_status(self, job, status):
        """Set ``job.status`` and buffer the matching "replace" patch."""
        job.status = status
        patch_operation = {
            'op': 'replace',
            'path': '/jobs/{}/status'.format(job.id),
            'value': job.status
        }
        self.buffer_user_patch_operation(job, patch_operation)

    @staticmethod
    def _log_api_error(action, service_name, error):
        """Log a ``docker.errors.APIError`` raised by a service operation."""
        logging.error(
            (
                '{} "{}" service raised '
                '"docker.errors.APIError" The server returned an error. '
                'Details: {}'
            ).format(action, service_name, error)
        )

    @staticmethod
    def _log_invalid_version(service_name):
        """Log a ``docker.errors.InvalidVersion`` raised by a service lookup."""
        logging.error(
            (
                'Get "{}" service raised '
                '"docker.errors.InvalidVersion" One of the arguments is '
                'not supported with the current API version.'
            ).format(service_name)
        )
...@@ -254,7 +254,7 @@ class CheckJobsMixin: ...@@ -254,7 +254,7 @@ class CheckJobsMixin:
return return
msg = create_message( msg = create_message(
job.creator.email, job.creator.email,
'Status update for your Job "{}"'.format(job.title), # noqa 'Status update for your Job "{}"'.format(job.title),
'tasks/email/notification', 'tasks/email/notification',
job=job job=job
) )
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment