Skip to content
Snippets Groups Projects
Commit f3f6612a authored by Patrick Jentsch's avatar Patrick Jentsch
Browse files

daemon is now tasks

parent 5a06a6b2
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,6 @@ from flask_login import UserMixin, AnonymousUserMixin
from itsdangerous import BadSignature, TimedJSONWebSignatureSerializer
from time import sleep
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
import xml.etree.ElementTree as ET
from . import db, login_manager
import logging
......@@ -180,7 +179,7 @@ class User(UserMixin, db.Model):
def __init__(self, **kwargs):
super(User, self).__init__(**kwargs)
if self.role is None:
if self.email == current_app.config['ADMIN_EMAIL_ADRESS']:
if self.email == current_app.config['NOPAQUE_ADMIN']:
self.role = Role.query.filter_by(name='Administrator').first()
if self.role is None:
self.role = Role.query.filter_by(default=True).first()
......@@ -340,8 +339,6 @@ class Job(db.Model):
end_date = db.Column(db.DateTime())
mem_mb = db.Column(db.Integer)
n_cores = db.Column(db.Integer)
# This is used for zip creation
secure_filename = db.Column(db.String(32))
service = db.Column(db.String(64))
'''
' Service specific arguments as string list.
......@@ -367,12 +364,6 @@ class Job(db.Model):
'''
return '<Job {}>'.format(self.title)
def create_secure_filename(self):
'''
Takes the job.title string nad cratesa a secure filename from this.
'''
self.secure_filename = secure_filename(self.title)
def delete(self):
'''
Delete the job and its inputs and results from the database.
......
......@@ -50,8 +50,6 @@ def service(service):
service=service, service_args=json.dumps(service_args),
service_version=form.version.data,
status='preparing', title=form.title.data)
if job.service != 'corpus_analysis':
job.create_secure_filename()
db.session.add(job)
db.session.commit()
try:
......@@ -66,7 +64,7 @@ def service(service):
else:
for file in form.files.data:
filename = secure_filename(file.filename)
job_input = JobInput(dir=job.path, filename=filename, job=job)
job_input = JobInput(filename=filename, job=job)
file.save(job_input.path)
db.session.add(job_input)
job.status = 'submitted'
......
......@@ -21,9 +21,16 @@ def check_corpora():
def check_jobs():
print('check_jobs()')
jobs = Job.query.all()
print([job.status for job in jobs])
for job in filter(lambda job: job.status == 'submitted', jobs):
print('pre create_job_service({})'.format(job))
job_utils.create_job_service(job)
for job in filter(lambda job: job.status in ['queued', 'running'], jobs):
print('pre checkout_job_service({})'.format(job))
job_utils.checkout_job_service(job)
for job in filter(lambda job: job.status == 'canceling', jobs):
print('pre remove_job_service({})'.format(job))
job_utils.remove_job_service(job)
db.session.commit()
from datetime import datetime
from werkzeug.utils import secure_filename
from . import docker_client
from .. import db
from ..email import create_message, send
from .. import db, mail
from ..email import create_message
from ..models import JobResult
import docker
import logging
......@@ -10,11 +11,12 @@ import os
def create_job_service(job):
print('create_job_service({})'.format(job))
cmd = '{} -i /files -o /files/output'.format(job.service)
if job.service == 'file-setup':
cmd += ' -f {}'.format(job.secure_filename)
cmd += ' -f {}'.format(secure_filename(job.title))
cmd += ' --log-dir /files'
cmd += ' --zip [{}]_{}'.format(job.service, job.secure_filename)
cmd += ' --zip [{}]_{}'.format(job.service, secure_filename(job.title))
cmd += ' ' + ' '.join(json.loads(job.service_args))
service_kwargs = {'command': cmd,
'constraints': ['node.role==worker'],
......@@ -36,15 +38,18 @@ def create_job_service(job):
logging.error('Create "{}" service raised '.format(service_kwargs['name']) # noqa
+ '[docker-APIError] The server returned an error. '
+ 'Details: {}'.format(e))
return
else:
job.status = 'queued'
finally:
print('Sending email to {}'.format(job.creator.email))
msg = create_message(
job.creator.email,
'Status update for your Job "{}"'.format(job.title),
'tasks/email/notification',
job=job
)
send(msg)
mail.send(msg)
def checkout_job_service(job):
......@@ -60,10 +65,12 @@ def checkout_job_service(job):
logging.error('Get "{}" service raised '.format(service_name)
+ '[docker-APIError] The server returned an error. '
+ 'Details: {}'.format(e))
return
except docker.errors.InvalidVersion:
logging.error('Get "{}" service raised '.format(service_name)
+ '[docker-InvalidVersion] One of the arguments is '
+ 'not supported with the current API version.')
return
else:
service_tasks = service.tasks()
if not service_tasks:
......@@ -71,31 +78,33 @@ def checkout_job_service(job):
task_state = service_tasks[0].get('Status').get('State')
if job.status == 'queued' and task_state != 'pending':
job.status = 'running'
elif job.status == 'running' and task_state == 'complete':
service.remove()
job.end_date = datetime.utcnow()
job.status = task_state
if task_state == 'complete':
job_results_dir = os.path.join(job.path, 'output')
job_results = filter(lambda x: x.endswith('.zip'),
os.listdir(job_results_dir))
for job_result in job_results:
job_result = JobResult(filename=job_result, job=job)
db.session.add(job_result)
elif job.status == 'running' and task_state == 'failed':
service.remove()
job.end_date = datetime.utcnow()
job.status = task_state
elif job.status == 'running' and task_state in ['complete', 'failed']: # noqa
try:
service.remove()
except docker.errors.APIError as e:
logging.error('Remove "{}" service raised '.format(service_name) # noqa
+ '[docker-APIError] The server returned an error. ' # noqa
+ 'Details: {}'.format(e))
return
else:
job.end_date = datetime.utcnow()
job.status = task_state
if task_state == 'complete':
job_results_dir = os.path.join(job.path, 'output')
job_results = filter(lambda x: x.endswith('.zip'),
os.listdir(job_results_dir))
for job_result in job_results:
job_result = JobResult(filename=job_result, job=job)
db.session.add(job_result)
finally:
# TODO: send email
print('Sending email to {}'.format(job.creator.email))
msg = create_message(
job.creator.email,
'[nopaque] Status update for your Job "{}"'.format(job.title),
'Status update for your Job "{}"'.format(job.title),
'tasks/email/notification',
job=job
)
send(msg)
pass
mail.send(msg)
def remove_job_service(job):
......@@ -103,9 +112,28 @@ def remove_job_service(job):
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
# TODO: send email
job.status = 'canceled'
# TODO: handle docker.errors.APIError and docker.errors.InvalidVersion
except docker.errors.APIError as e:
logging.error('Get "{}" service raised '.format(service_name)
+ '[docker-APIError] The server returned an error. '
+ 'Details: {}'.format(e))
return
except docker.errors.InvalidVersion:
logging.error('Get "{}" service raised '.format(service_name)
+ '[docker-InvalidVersion] One of the arguments is '
+ 'not supported with the current API version.')
return
else:
service.update(mounts=None)
service.remove()
try:
service.update(mounts=None)
except docker.errors.APIError as e:
logging.error('Update "{}" service raised '.format(service_name) # noqa
+ '[docker-APIError] The server returned an error. ' # noqa
+ 'Details: {}'.format(e))
return
try:
service.remove()
except docker.errors.APIError as e:
logging.error('Remove "{}" service raised '.format(service_name) # noqa
+ '[docker-APIError] The server returned an error. ' # noqa
+ 'Details: {}'.format(e))
......@@ -3,6 +3,6 @@
<p>The status of your Job "<b>{{ job.title }}</b>" has changed!</p>
<p>It is now <b>{{ job.status }}</b>!</p>
<p>You can access your Job here: <a href="{{ url_for('jobs.job', job_id=job.id) }}">{{ url_for('jobs.job', job_id=job.id) }}</a></p>
<p>You can access your Job here: </p>
<p>Kind regards!<br>Your nopaque team</p>
......@@ -3,7 +3,7 @@ Dear {{ job.creator.username }},
The status of your Job "{{ job.title }}" has changed!
It is now {{ job.status }}!
You can access your Job here: {{ url_for('jobs.job', job_id=job.id) }}
You can access your Job here:
Kind regards!
Your nopaque team
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment