Commit c7b2c413 authored by Patrick Jentsch's avatar Patrick Jentsch
Browse files

Merge branch 'development' of gitlab.ub.uni-bielefeld.de:sfb1288inf/nopaque into development

parents fd5fcf98 1a59b191
......@@ -32,7 +32,8 @@ def create_app(config_name):
app, message_queue=app.config['NOPAQUE_SOCKETIO_MESSAGE_QUEUE_URI'])
with app.app_context():
from . import events
from . import socketio_events
from . import sqlalchemy_events
from .admin import admin as admin_blueprint
from .auth import auth as auth_blueprint
from .corpora import corpora as corpora_blueprint
......
......@@ -4,7 +4,7 @@ from flask_login import current_user
from socket import gaierror
from .. import db, socketio
from ..decorators import socketio_login_required
from ..events import socketio_sessions
from ..socketio_events import socketio_sessions
from ..models import Corpus
import cqi
import math
......
from .. import db, socketio
from .. import db
from ..decorators import background
from ..models import Corpus, CorpusFile, QueryResult
......@@ -12,11 +12,6 @@ def build_corpus(corpus_id, *args, **kwargs):
raise Exception('Corpus {} not found'.format(corpus_id))
corpus.build()
db.session.commit()
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/last_edited_date'.format(corpus.id), 'value': corpus.last_edited_date.timestamp()}, # noqa
{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}] # noqa
room = 'user_{}'.format(corpus.user_id)
socketio.emit(event, jsonpatch, room=room)
@background
......@@ -25,12 +20,8 @@ def delete_corpus(corpus_id, *args, **kwargs):
corpus = Corpus.query.get(corpus_id)
if corpus is None:
raise Exception('Corpus {} not found'.format(corpus_id))
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'remove', 'path': '/corpora/{}'.format(corpus.id)}]
room = 'user_{}'.format(corpus.user_id)
corpus.delete()
db.session.commit()
socketio.emit(event, jsonpatch, room=room)
@background
......@@ -39,13 +30,8 @@ def delete_corpus_file(corpus_file_id, *args, **kwargs):
corpus_file = CorpusFile.query.get(corpus_file_id)
if corpus_file is None:
raise Exception('Corpus file {} not found'.format(corpus_file_id))
event = 'user_{}_patch'.format(corpus_file.corpus.user_id)
jsonpatch = [{'op': 'remove', 'path': '/corpora/{}/files/{}'.format(corpus_file.corpus_id, corpus_file.id)}, # noqa
{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus_file.corpus_id), 'value': corpus_file.corpus.status}] # noqa
room = 'user_{}'.format(corpus_file.corpus.user_id)
corpus_file.delete()
db.session.commit()
socketio.emit(event, jsonpatch, room=room)
@background
......@@ -54,9 +40,5 @@ def delete_query_result(query_result_id, *args, **kwargs):
query_result = QueryResult.query.get(query_result_id)
if query_result is None:
raise Exception('QueryResult {} not found'.format(query_result_id))
event = 'user_{}_patch'.format(query_result.user_id)
jsonpatch = [{'op': 'remove', 'path': '/query_results/{}'.format(query_result.id)}] # noqa
room = 'user_{}'.format(query_result.user_id)
query_result.delete()
db.session.commit()
socketio.emit(event, jsonpatch, room=room)
......@@ -8,7 +8,7 @@ from .forms import (AddCorpusFileForm, AddCorpusForm, AddQueryResultForm,
DisplayOptionsForm, InspectDisplayOptionsForm,
ImportCorpusForm)
from jsonschema import validate
from .. import db, socketio
from .. import db
from ..models import Corpus, CorpusFile, QueryResult
import json
import logging
......@@ -40,10 +40,6 @@ def add_corpus():
else:
db.session.commit()
flash('Corpus "{}" added!'.format(corpus.title), 'corpus')
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'add', 'path': '/corpora/{}'.format(corpus.id), 'value': corpus.to_dict()}] # noqa
room = 'user_{}'.format(corpus.user_id)
socketio.emit(event, jsonpatch, room=room)
return redirect(url_for('.corpus', corpus_id=corpus.id))
return render_template('corpora/add_corpus.html.j2', form=form,
title='Add corpus')
......@@ -106,10 +102,6 @@ def import_corpus():
db.session.commit()
os.remove(archive_file)
flash('Corpus "{}" imported!'.format(corpus.title), 'corpus')
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'add', 'path': '/corpora/{}'.format(corpus.id), 'value': corpus.to_dict()}] # noqa
room = 'user_{}'.format(corpus.user_id)
socketio.emit(event, jsonpatch, room=room)
return make_response(
{'redirect_url': url_for('.corpus', corpus_id=corpus.id)}, 201)
else:
......@@ -212,11 +204,6 @@ def add_corpus_file(corpus_id):
corpus.status = 'unprepared'
db.session.commit()
flash('Corpus file "{}" added!'.format(corpus_file.filename), 'corpus')
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}, # noqa
{'op': 'add', 'path': '/corpora/{}/files/{}'.format(corpus.id, corpus_file.id), 'value': corpus_file.to_dict()}] # noqa
room = 'user_{}'.format(corpus.user_id)
socketio.emit(event, jsonpatch, room=room)
return make_response({'redirect_url': url_for('.corpus', corpus_id=corpus.id)}, 201) # noqa
return render_template('corpora/add_corpus_file.html.j2', corpus=corpus,
form=form, title='Add corpus file')
......@@ -356,10 +343,6 @@ def add_query_result():
query_result_file_content.pop('cpos_lookup')
query_result.query_metadata = query_result_file_content
db.session.commit()
event = 'user_{}_patch'.format(query_result.user_id)
jsonpatch = [{'op': 'add', 'path': '/query_results/{}'.format(query_result.id), 'value': query_result.to_dict()}] # noqa
room = 'user_{}'.format(query_result.user_id)
socketio.emit(event, jsonpatch, room=room)
flash('Query result added!', 'result')
return make_response({'redirect_url': url_for('.query_result', query_result_id=query_result.id)}, 201) # noqa
return render_template('corpora/query_results/add_query_result.html.j2',
......
from .. import db, socketio
from .. import db
from ..decorators import background
from ..models import Job
......@@ -9,12 +9,8 @@ def delete_job(job_id, *args, **kwargs):
job = Job.query.get(job_id)
if job is None:
raise Exception('Job {} not found'.format(job_id))
event = 'user_{}_patch'.format(job.user_id)
jsonpatch = [{'op': 'remove', 'path': '/jobs/{}'.format(job.id)}]
room = 'user_{}'.format(job.user_id)
job.delete()
db.session.commit()
socketio.emit(event, jsonpatch, room=room)
@background
......@@ -29,8 +25,3 @@ def restart_job(job_id, *args, **kwargs):
pass
else:
db.session.commit()
event = 'user_{}_patch'.format(job.user_id)
jsonpatch = [{'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()}, # noqa
{'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}] # noqa
room = 'user_{}'.format(job.user_id)
socketio.emit(event, jsonpatch, room=room)
......@@ -151,25 +151,31 @@ class User(UserMixin, db.Model):
def password(self, password):
self.password_hash = generate_password_hash(password)
def to_dict(self):
return {'id': self.id,
'role_id': self.role_id,
'confirmed': self.confirmed,
'email': self.email,
'last_seen': self.last_seen.timestamp(),
'member_since': self.member_since.timestamp(),
'settings': {'dark_mode': self.setting_dark_mode,
'job_status_mail_notifications':
self.setting_job_status_mail_notifications,
'job_status_site_notifications':
self.setting_job_status_site_notifications},
'username': self.username,
'corpora': {corpus.id: corpus.to_dict()
for corpus in self.corpora},
'jobs': {job.id: job.to_dict() for job in self.jobs},
'query_results': {query_result.id: query_result.to_dict()
for query_result in self.query_results},
'role': self.role.to_dict()}
def to_dict(self, include_relationships=True):
dict_user = {
'id': self.id,
'role_id': self.role_id,
'confirmed': self.confirmed,
'email': self.email,
'last_seen': self.last_seen.timestamp(),
'member_since': self.member_since.timestamp(),
'settings': {'dark_mode': self.setting_dark_mode,
'job_status_mail_notifications':
self.setting_job_status_mail_notifications,
'job_status_site_notifications':
self.setting_job_status_site_notifications},
'username': self.username,
'role': self.role.to_dict()
}
if include_relationships:
dict_user['corpora'] = {corpus.id: corpus.to_dict()
for corpus in self.corpora}
dict_user['jobs'] = {job.id: job.to_dict() for job in self.jobs}
dict_user['query_results'] = {
query_result.id: query_result.to_dict()
for query_result in self.query_results
}
return dict_user
def __repr__(self):
'''
......@@ -301,7 +307,7 @@ class JobInput(db.Model):
'''
return '<JobInput {}>'.format(self.filename)
def to_dict(self):
def to_dict(self, include_relationships=True):
return {'download_url': self.download_url,
'url': self.url,
'id': self.id,
......@@ -341,7 +347,7 @@ class JobResult(db.Model):
'''
return '<JobResult {}>'.format(self.filename)
def to_dict(self):
def to_dict(self, include_relationships=True):
return {'download_url': self.download_url,
'url': self.url,
'id': self.id,
......@@ -420,22 +426,26 @@ class Job(db.Model):
self.end_date = None
self.status = 'submitted'
def to_dict(self):
return {'url': self.url,
'id': self.id,
'user_id': self.user_id,
'creation_date': self.creation_date.timestamp(),
'description': self.description,
'end_date': (self.end_date.timestamp() if self.end_date else
None),
'service': self.service,
'service_args': self.service_args,
'service_version': self.service_version,
'status': self.status,
'title': self.title,
'inputs': {input.id: input.to_dict() for input in self.inputs},
'results': {result.id: result.to_dict()
for result in self.results}}
def to_dict(self, include_relationships=True):
dict_job = {
'url': self.url,
'id': self.id,
'user_id': self.user_id,
'creation_date': self.creation_date.timestamp(),
'description': self.description,
'end_date': self.end_date.timestamp() if self.end_date else None,
'service': self.service,
'service_args': self.service_args,
'service_version': self.service_version,
'status': self.status,
'title': self.title,
}
if include_relationships:
dict_job['inputs'] = {input.id: input.to_dict()
for input in self.inputs}
dict_job['results'] = {result.id: result.to_dict()
for result in self.results}
return dict_job
class CorpusFile(db.Model):
......@@ -485,7 +495,7 @@ class CorpusFile(db.Model):
db.session.delete(self)
self.corpus.status = 'unprepared'
def to_dict(self):
def to_dict(self, include_relationships=True):
return {'download_url': self.download_url,
'url': self.url,
'id': self.id,
......@@ -539,19 +549,24 @@ class Corpus(db.Model):
def url(self):
return url_for('corpora.corpus', corpus_id=self.id)
def to_dict(self):
return {'analysis_url': self.analysis_url,
'url': self.url,
'id': self.id,
'user_id': self.user_id,
'creation_date': self.creation_date.timestamp(),
'current_nr_of_tokens': self.current_nr_of_tokens,
'description': self.description,
'status': self.status,
'last_edited_date': self.last_edited_date.timestamp(),
'max_nr_of_tokens': self.max_nr_of_tokens,
'title': self.title,
'files': {file.id: file.to_dict() for file in self.files}}
def to_dict(self, include_relationships=True):
dict_corpus = {
'analysis_url': self.analysis_url,
'url': self.url,
'id': self.id,
'user_id': self.user_id,
'creation_date': self.creation_date.timestamp(),
'current_nr_of_tokens': self.current_nr_of_tokens,
'description': self.description,
'status': self.status,
'last_edited_date': self.last_edited_date.timestamp(),
'max_nr_of_tokens': self.max_nr_of_tokens,
'title': self.title,
}
if include_relationships:
dict_corpus['files'] = {file.id: file.to_dict()
for file in self.files}
return dict_corpus
def build(self):
output_dir = os.path.join(self.path, 'merged')
......@@ -628,7 +643,7 @@ class QueryResult(db.Model):
shutil.rmtree(self.path, ignore_errors=True)
db.session.delete(self)
def to_dict(self):
def to_dict(self, include_relationships=True):
return {'download_url': self.download_url,
'url': self.url,
'id': self.id,
......
......@@ -4,7 +4,7 @@ from flask_login import current_user, login_required
from werkzeug.utils import secure_filename
from . import services
from . import SERVICES
from .. import db, socketio
from .. import db
from .forms import AddJobForms
from ..models import Job, JobInput
import json
......@@ -69,10 +69,6 @@ def service(service):
job.status = 'submitted'
db.session.commit()
flash('Job "{}" added'.format(job.title), 'job')
event = 'user_{}_patch'.format(job.user_id)
jsonpatch = [{'op': 'add', 'path': '/jobs/{}'.format(job.id), 'value': job.to_dict()}] # noqa
room = 'user_{}'.format(job.user_id)
socketio.emit(event, jsonpatch, room=room)
return make_response(
{'redirect_url': url_for('jobs.job', job_id=job.id)}, 201)
return render_template('services/{}.html.j2'.format(service.replace('-', '_')),
......
......@@ -6,9 +6,6 @@ from .decorators import socketio_login_required
from .models import User
###############################################################################
# Socket.IO event handlers #
###############################################################################
'''
' A list containing session ids of connected Socket.IO sessions, to keep track
' of all connected sessions, which can be used to determine the runtimes of
......@@ -17,6 +14,9 @@ from .models import User
socketio_sessions = []
###############################################################################
# Socket.IO event handlers #
###############################################################################
@socketio.on('connect')
@socketio_login_required
def socketio_connect():
......
from datetime import datetime
from . import db, socketio
from .models import Corpus, CorpusFile, Job, JobInput, JobResult
###############################################################################
# SQLAlchemy event handlers #
###############################################################################
@db.event.listens_for(Corpus, 'after_delete')
@db.event.listens_for(CorpusFile, 'after_delete')
@db.event.listens_for(Job, 'after_delete')
@db.event.listens_for(JobInput, 'after_delete')
@db.event.listens_for(JobResult, 'after_delete')
def ressource_after_delete(mapper, connection, ressource):
if isinstance(ressource, Corpus):
user_id = ressource.creator.id
path = '/corpora/{}'.format(ressource.id)
elif isinstance(ressource, CorpusFile):
user_id = ressource.corpus.creator.id
path = '/corpora/{}/files/{}'.format(ressource.corpus.id, ressource.id)
elif isinstance(ressource, Job):
user_id = ressource.creator.id
path = '/jobs/{}'.format(ressource.id)
elif isinstance(ressource, JobInput):
user_id = ressource.job.creator.id
path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id)
elif isinstance(ressource, JobResult):
user_id = ressource.job.creator.id
path = '/jobs/{}/results/{}'.format(ressource.job.id, ressource.id)
event = 'user_{}_patch'.format(user_id)
jsonpatch = [{'op': 'remove', 'path': path}]
room = 'user_{}'.format(user_id)
socketio.emit(event, jsonpatch, room=room)
@db.event.listens_for(Corpus, 'after_insert')
@db.event.listens_for(CorpusFile, 'after_insert')
@db.event.listens_for(Job, 'after_insert')
@db.event.listens_for(JobInput, 'after_insert')
@db.event.listens_for(JobResult, 'after_insert')
def ressource_after_insert_handler(mapper, connection, ressource):
if isinstance(ressource, Corpus):
user_id = ressource.creator.id
path = '/corpora/{}'.format(ressource.id)
elif isinstance(ressource, CorpusFile):
user_id = ressource.corpus.creator.id
path = '/corpora/{}/files/{}'.format(ressource.corpus.id, ressource.id)
elif isinstance(ressource, Job):
user_id = ressource.creator.id
path = '/jobs/{}'.format(ressource.id)
elif isinstance(ressource, JobInput):
user_id = ressource.job.creator.id
path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id)
elif isinstance(ressource, JobResult):
user_id = ressource.job.creator.id
path = '/jobs/{}/results/{}'.format(ressource.job.id, ressource.id)
event = 'user_{}_patch'.format(user_id)
jsonpatch = [
{
'op': 'add',
'path': path,
'value': ressource.to_dict(include_relationships=False)
}
]
room = 'user_{}'.format(user_id)
socketio.emit(event, jsonpatch, room=room)
@db.event.listens_for(Corpus, 'after_update')
@db.event.listens_for(CorpusFile, 'after_update')
@db.event.listens_for(Job, 'after_update')
@db.event.listens_for(JobInput, 'after_update')
@db.event.listens_for(JobResult, 'after_update')
def ressource_after_update_handler(mapper, connection, ressource):
if isinstance(ressource, Corpus):
user_id = ressource.creator.id
base_path = '/corpora/{}'.format(ressource.id)
elif isinstance(ressource, CorpusFile):
user_id = ressource.corpus.creator.id
base_path = '/corpora/{}/files/{}'.format(ressource.corpus.id,
ressource.id)
elif isinstance(ressource, Job):
user_id = ressource.creator.id
base_path = '/jobs/{}'.format(ressource.id)
elif isinstance(ressource, JobInput):
user_id = ressource.job.creator.id
base_path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id)
elif isinstance(ressource, JobResult):
user_id = ressource.job.creator.id
base_path = '/jobs/{}/results/{}'.format(ressource.job.id,
ressource.id)
jsonpatch = []
for attr in db.inspect(ressource).attrs:
# We don't want to emit changes about relationship fields
if attr.key in ['files', 'inputs', 'results']:
continue
history = attr.load_history()
if not history.has_changes():
continue
new_value = history.added[0]
# DateTime attributes must be converted to a timestamp
if isinstance(new_value, datetime):
new_value = new_value.timestamp()
jsonpatch.append(
{
'op': 'replace',
'path': '{}/{}'.format(base_path, attr.key),
'value': new_value
}
)
if jsonpatch:
event = 'user_{}_patch'.format(user_id)
room = 'user_{}'.format(user_id)
socketio.emit(event, jsonpatch, room=room)
......@@ -7,48 +7,8 @@ import docker
class TaskRunner(CheckCorporaMixin, CheckJobsMixin):
def __init__(self):
self.docker = docker.from_env()
self._socketio_message_buffer = {}
def run(self):
self.check_corpora()
self.check_jobs()
db.session.commit()
self.flush_socketio_messages()
def buffer_socketio_message(self, event, payload, room,
msg_id=None, override_policy='replace'):
if room not in self._socketio_message_buffer:
self._socketio_message_buffer[room] = {}
if event not in self._socketio_message_buffer[room]:
self._socketio_message_buffer[room][event] = {}
if msg_id is None:
msg_id = len(self._socketio_message_buffer[room][event].keys())
if override_policy == 'append':
if msg_id in self._socketio_message_buffer[room][event]:
# If the current message value isn't a list, convert it!
if not isinstance(self._socketio_message_buffer[room][event][msg_id], list): # noqa
self._socketio_message_buffer[room][event][msg_id] = [self._socketio_message_buffer[room][event][msg_id]] # noqa
else:
self._socketio_message_buffer[room][event][msg_id] = []
self._socketio_message_buffer[room][event][msg_id].append(payload)
elif override_policy == 'replace':
self._socketio_message_buffer[room][event][msg_id] = payload
else:
raise Exception('Unknown override policy: {}'.format(override_policy)) # noqa
return msg_id
def buffer_user_patch_operation(self, ressource, patch_operation):
self.buffer_socketio_message('user_{}_patch'.format(ressource.user_id),
patch_operation,
'user_{}'.format(ressource.user_id),
msg_id='_', override_policy='append')
def clear_socketio_message_buffer(self):
self._socketio_message_buffer = {}
def flush_socketio_messages(self):
for room in self._socketio_message_buffer:
for event in self._socketio_message_buffer[room]:
for message in self._socketio_message_buffer[room][event]:
socketio.emit(event, self._socketio_message_buffer[room][event][message], room=room) # noqa
self.clear_socketio_message_buffer()
......@@ -85,12 +85,6 @@ class CheckCorporaMixin:
)
else:
corpus.status = 'queued'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
def checkout_build_corpus_service(self, corpus):
service_name = 'build-corpus_{}'.format(corpus.id)
......@@ -103,12 +97,6 @@ class CheckCorporaMixin:
+ '(corpus.status: {} -> failed)'.format(corpus.status)
)
corpus.status = 'failed'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
except docker.errors.APIError as e:
logging.error(
'Get "{}" service raised '.format(service_name)
......@@ -128,12 +116,6 @@ class CheckCorporaMixin:
task_state = service_tasks[0].get('Status').get('State')
if corpus.status == 'queued' and task_state != 'pending':
corpus.status = 'running'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
elif (corpus.status == 'running'
and task_state in ['complete', 'failed']):
try:
......@@ -148,12 +130,6 @@ class CheckCorporaMixin:
else:
corpus.status = \
'prepared' if task_state == 'complete' else 'failed'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
def create_cqpserver_container(self, corpus):
''' # Docker container settings # '''
......@@ -214,12 +190,6 @@ class CheckCorporaMixin:
+ 'non-zero exit code and detach is False.'
)
corpus.status = 'failed'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
except docker.errors.ImageNotFound:
logging.error(
'Run "{}" container raised '.format(name)
......@@ -227,12 +197,6 @@ class CheckCorporaMixin:
+ 'exist.'
)
corpus.status = 'failed'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
except docker.errors.APIError as e:
logging.error(
'Run "{}" container raised '.format(name)
......@@ -241,12 +205,6 @@ class CheckCorporaMixin:
)
else:
corpus.status = 'analysing'