Newer
Older
from datetime import datetime, timedelta
from enum import Enum, IntEnum
from flask_hashids import HashidMixin
from flask_login import UserMixin
from time import sleep

Patrick Jentsch
committed
from tqdm import tqdm
from werkzeug.security import generate_password_hash, check_password_hash

Patrick Jentsch
committed
import json

Patrick Jentsch
committed
import requests
import shutil

Patrick Jentsch
committed
import xml.etree.ElementTree as ET
import yaml
from app import db, hashids, login, mail, socketio
from app.converters.vrt import normalize_vrt_file
from app.email import create_message
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
##############################################################################
# enums #
##############################################################################
# region enums
class CorpusStatus(IntEnum):
    '''
    Integer-valued states of a corpus.
    '''
    # Build-related states
    UNPREPARED = 1
    SUBMITTED = 2
    QUEUED = 3
    BUILDING = 4
    BUILT = 5
    FAILED = 6
    # Analysis-session-related states
    STARTING_ANALYSIS_SESSION = 7
    RUNNING_ANALYSIS_SESSION = 8
    CANCELING_ANALYSIS_SESSION = 9
class JobStatus(IntEnum):
    '''
    Integer-valued states of a job.
    '''
    INITIALIZING = 1
    SUBMITTED = 2
    QUEUED = 3
    RUNNING = 4
    CANCELING = 5
    # Members from here on name states in which the job is no longer running.
    CANCELED = 6
    COMPLETED = 7
    FAILED = 8
class Permission(IntEnum):
    '''
    User permissions, each encoded as a distinct power of 2 so that several
    of them can be combined in one integer and tested with the bitwise
    operator &.
    '''
    ADMINISTRATE = 1 << 0
    CONTRIBUTE = 1 << 1
    USE_API = 1 << 2
class UserSettingJobStatusMailNotificationLevel(IntEnum):
    '''
    Values of the per-user job status mail notification level setting.
    '''
    NONE = 1
    END = 2
    ALL = 3
class ProfilePrivacySettings(IntEnum):
    '''
    Profile privacy switches, each a distinct power of 2 so that several of
    them can be stored together in one integer bit mask.
    '''
    SHOW_EMAIL = 1 << 0
    SHOW_LAST_SEEN = 1 << 1
    SHOW_MEMBER_SINCE = 1 << 2
# endregion enums
##############################################################################
# mixins #
##############################################################################
# region mixins
'''
Mixin for db.Model classes. All file related models should use this.
'''
creation_date = db.Column(db.DateTime, default=datetime.utcnow)

Patrick Jentsch
committed
filename = db.Column(db.String(255))
def file_mixin_to_json_serializeable(self, backrefs=False, relationships=False):
'creation_date': f'{self.creation_date.isoformat()}Z',
'filename': self.filename,
'mimetype': self.mimetype
}
@classmethod
def create(cls, file_storage, **kwargs):
    '''
    Create an object for an uploaded file and write the file to disk.

    The new object is added to the session, flushed and refreshed (not
    committed) before the upload is saved to ``obj.path``.

    :param file_storage: upload object providing ``filename``, ``mimetype``
        and ``save(path)``
    :param kwargs: further constructor arguments; ``filename`` and
        ``mimetype`` given here take precedence over the upload's values
    :returns: the newly created object
    :raises AttributeError, OSError: re-raised after being logged and after
        the session was rolled back
    '''
    requested_name = kwargs.pop('filename', file_storage.filename)
    content_type = kwargs.pop('mimetype', file_storage.mimetype)
    obj = cls(
        filename=secure_filename(requested_name),
        mimetype=content_type,
        **kwargs
    )
    session = db.session
    session.add(obj)
    session.flush(objects=[obj])
    session.refresh(obj)
    try:
        file_storage.save(obj.path)
    except (AttributeError, OSError) as exc:
        current_app.logger.error(exc)
        session.rollback()
        raise
    return obj
# endregion mixins
##############################################################################
# type_decorators #
##############################################################################
# region type_decorators
class IntEnumColumn(db.TypeDecorator):
    '''
    Column type that persists members of an integer-valued enum as integers
    and turns stored integers back into enum members when loading.
    '''
    impl = db.Integer

    def __init__(self, enum_type, *args, **kwargs):
        '''
        :param enum_type: enum class whose members are stored; remaining
            arguments are passed on to the underlying type
        '''
        super().__init__(*args, **kwargs)
        self.enum_type = enum_type

    def process_bind_param(self, value, dialect):
        '''
        Convert a Python-side value to the integer written to the database.

        Accepts an enum member, an integer value of the enum or a member
        name; ``None`` is passed through as NULL.

        :raises TypeError: if ``value`` has an unsupported type
        :raises ValueError, KeyError: if the integer or name does not
            identify a member of the enum
        '''
        if value is None:
            return None
        if isinstance(value, self.enum_type) and isinstance(value.value, int):
            return value.value
        if isinstance(value, int):
            return self.enum_type(value).value
        if isinstance(value, str):
            return self.enum_type[value].value
        # The exception must be raised; returning it would hand an exception
        # instance to the database driver as if it were the bind value.
        raise TypeError(
            f'Cannot convert {type(value).__name__!r} to '
            f'{self.enum_type.__name__}'
        )

    def process_result_value(self, value, dialect):
        '''
        Convert a stored integer to its enum member; NULL stays ``None``.
        '''
        if value is None:
            return None
        return self.enum_type(value)
class ContainerColumn(db.TypeDecorator):
    '''
    Column type that persists a container (e.g. a list) as a JSON string and
    decodes the JSON again when loading.
    '''
    impl = db.String

    def __init__(self, container_type, *args, **kwargs):
        '''
        :param container_type: type the decoded value must be an instance
            of; remaining arguments are passed on to the underlying type
        '''
        super().__init__(*args, **kwargs)
        self.container_type = container_type

    def process_bind_param(self, value, dialect):
        '''
        Convert a Python-side value to the JSON string written to the
        database.

        Accepts an instance of the container type or a JSON string that
        decodes to one; ``None`` is passed through as NULL.

        :raises TypeError: if ``value`` is neither of the accepted forms
        :raises ValueError: if a string value is not valid JSON
        '''
        if value is None:
            return None
        if isinstance(value, self.container_type):
            return json.dumps(value)
        if isinstance(value, str) and isinstance(json.loads(value), self.container_type):
            return value
        # The exception must be raised; returning it would hand an exception
        # instance to the database driver as if it were the bind value.
        raise TypeError(
            f'Cannot convert {type(value).__name__!r} to a JSON encoded '
            f'{self.container_type.__name__}'
        )

    def process_result_value(self, value, dialect):
        '''
        Decode the stored JSON string; NULL stays ``None``.
        '''
        if value is None:
            return None
        return json.loads(value)
# endregion type_decorators
##############################################################################
# Models #
##############################################################################
# region models
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
default = db.Column(db.Boolean, default=False, index=True)
permissions = db.Column(db.Integer, default=0)
users = db.relationship('User', backref='role', lazy='dynamic')
def __repr__(self):
def add_permission(self, permission):
    '''
    Grant ``permission`` (a single flag or a combination of flags).

    The flags are set with a bitwise OR. Adding the numeric value instead
    would corrupt the mask whenever only some of the requested bits are
    already set.
    '''
    if not self.has_permission(permission):
        self.permissions |= permission
def has_permission(self, permission):
    '''
    Return True if every bit of ``permission`` is set in ``permissions``.
    '''
    granted_bits = self.permissions & permission
    return granted_bits == permission
def remove_permission(self, permission):
    '''
    Revoke ``permission``; nothing changes unless it is fully granted.
    '''
    if not self.has_permission(permission):
        return
    # All bits of ``permission`` are set here, so clearing them is the same
    # as subtracting the value.
    self.permissions &= ~permission
def reset_permissions(self):
    '''
    Revoke all permissions by clearing the whole bit mask.
    '''
    no_permissions = 0
    self.permissions = no_permissions
def to_json_serializeable(self, backrefs=False, relationships=False):
json_serializeable = {
'id': self.hashid,
'default': self.default,
'name': self.name,
'permissions': self.permissions
}
if relationships:
json_serializeable['users'] = {
x.hashid: x.to_json_serializeable(relationships=True)
@staticmethod

Patrick Jentsch
committed
def insert_defaults():
'API user': [Permission.USE_API],
'Contributor': [Permission.CONTRIBUTE],
'Administrator': [
Permission.ADMINISTRATE,
Permission.CONTRIBUTE,
Permission.USE_API
}
default_role_name = 'User'
for role_name, permissions in roles.items():
role = Role.query.filter_by(name=role_name).first()
if role is None:
role.reset_permissions()
for permission in permissions:
role.add_permission(permission)
role.default = role.name == default_role_name
db.session.add(role)
db.session.commit()
class Token(db.Model):
    '''
    Pair of access and refresh token, each with its own expiration time,
    belonging to a user.
    '''
    __tablename__ = 'tokens'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # Fields
    access_token = db.Column(db.String(64), index=True)
    access_expiration = db.Column(db.DateTime)
    refresh_token = db.Column(db.String(64), index=True)
    refresh_expiration = db.Column(db.DateTime)
    # Backrefs: user: User

    def expire(self):
        '''
        Set the expiration of both tokens to the current UTC time.
        '''
        self.access_expiration = datetime.utcnow()
        self.refresh_expiration = datetime.utcnow()

    @staticmethod
    def clean():
        """Remove any tokens that have been expired for more than a day."""
        one_day = timedelta(days=1)
        cutoff = datetime.utcnow() - one_day
        long_expired = Token.query.filter(Token.refresh_expiration < cutoff)
        long_expired.delete()
class Avatar(HashidMixin, FileMixin, db.Model):
__tablename__ = 'avatars'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
@property
def path(self):
    '''
    Path of the avatar file: the entry ``avatar`` in the user's directory.
    '''
    user_directory = self.user.path
    return os.path.join(user_directory, 'avatar')
def delete(self):
    '''
    Remove the avatar file and mark this object for deletion.

    A failure to remove the file is only logged; the object is deleted from
    the session either way.
    '''
    file_path = self.path
    try:
        os.remove(file_path)
    except OSError as exc:
        current_app.logger.error(exc)
    db.session.delete(self)
def to_json_serializeable(self, backrefs=False, relationships=False):
json_serializeable = {
# Association table between users and the corpora they follow; used as the
# `secondary` table of the User.followed_corpora relationship.
corpus_followers = db.Table(
    'corpus_followers',
    db.Model.metadata,
    # Both columns together form the primary key of a row.
    db.Column('user_id', db.ForeignKey('users.id'), primary_key=True),
    db.Column('corpus_id', db.ForeignKey('corpora.id'), primary_key=True)
)
# Self-referential association table between users; used as the `secondary`
# table of the User.followed_users relationship.
user_followers = db.Table(
    'user_followers',
    db.Model.metadata,
    # follower_user_id: the user who follows
    db.Column('follower_user_id', db.ForeignKey('users.id'), primary_key=True),
    # followed_user_id: the user who is being followed
    db.Column('followed_user_id', db.ForeignKey('users.id'), primary_key=True)
)
class User(HashidMixin, UserMixin, db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
email = db.Column(db.String(254), index=True, unique=True)
username = db.Column(db.String(64), index=True, unique=True)
password_hash = db.Column(db.String(128))
member_since = db.Column(db.DateTime(), default=datetime.utcnow)
setting_job_status_mail_notification_level = db.Column(
IntEnumColumn(UserSettingJobStatusMailNotificationLevel),
default=UserSettingJobStatusMailNotificationLevel.END

Patrick Jentsch
committed
)
full_name = db.Column(db.String(64))
about_me = db.Column(db.String(256))
location = db.Column(db.String(64))
website = db.Column(db.String(128))
organization = db.Column(db.String(128))
is_public = db.Column(db.Boolean, default=False)
profile_privacy_settings = db.Column(db.Integer(), default=0)
avatar = db.relationship(
'Avatar',
backref='user',
cascade='all, delete-orphan',
uselist=False
)
tesseract_ocr_pipeline_models = db.relationship(
'TesseractOCRPipelineModel',

Patrick Jentsch
committed
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
spacy_nlp_pipeline_models = db.relationship(
'SpaCyNLPPipelineModel',
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
corpora = db.relationship(
'Corpus',
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
followed_corpora = db.relationship(
'Corpus',
secondary=corpus_followers,
primaryjoin=(corpus_followers.c.user_id == id),
backref=db.backref('followers', lazy='dynamic'),
lazy='dynamic'
)
followed_users = db.relationship(
'User',
secondary=user_followers,
primaryjoin=(user_followers.c.follower_user_id == id),
secondaryjoin=(user_followers.c.followed_user_id == id),
backref=db.backref('followers', lazy='dynamic'),
lazy='dynamic'
)
jobs = db.relationship(
'Job',
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
tokens = db.relationship(
'Token',
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
def __init__(self, **kwargs):
    '''
    Initialize the user and assign a role if none was passed.

    The 'Administrator' role is used when the email equals the configured
    ``NOPAQUE_ADMIN`` value, otherwise the role flagged as default.
    '''
    super().__init__(**kwargs)
    if self.role is None:
        if self.email == current_app.config['NOPAQUE_ADMIN']:
            role_query = Role.query.filter_by(name='Administrator')
        else:
            role_query = Role.query.filter_by(default=True)
        self.role = role_query.first()
def __repr__(self):
    '''
    Return a short representation containing the username.
    '''
    return '<User {}>'.format(self.username)

Patrick Jentsch
committed
@property
def jsonpatch_path(self):
    '''
    JSON Patch path of this user, built from its hashid.
    '''
    return '/users/{}'.format(self.hashid)

Patrick Jentsch
committed
@property
def password(self):
    '''
    Write-only attribute; reading it always raises ``AttributeError``.
    '''
    raise AttributeError('password is not a readable attribute')

@password.setter
def password(self, password):
    '''
    Store the hash generated from ``password`` in ``password_hash``.
    '''
    hashed_password = generate_password_hash(password)
    self.password_hash = hashed_password
@property
def path(self):
    '''
    Directory of this user: ``<NOPAQUE_DATA_DIR>/users/<id>``.
    '''
    data_directory = current_app.config.get('NOPAQUE_DATA_DIR')
    return os.path.join(data_directory, 'users', str(self.id))
@staticmethod
def create(**kwargs):
    '''
    Create a user together with its directory structure on disk.

    The user is added to the session, flushed and refreshed (not committed)
    before the directories are created below ``user.path``.

    :param kwargs: arguments passed on to the ``User`` constructor
    :returns: the newly created user
    :raises OSError: re-raised after being logged and after the session was
        rolled back, if a directory cannot be created
    '''
    user = User(**kwargs)
    session = db.session
    session.add(user)
    session.flush(objects=[user])
    session.refresh(user)
    subdirectory_names = (
        'spacy_nlp_pipeline_models',
        'tesseract_ocr_pipeline_models',
        'corpora',
        'jobs'
    )
    try:
        os.mkdir(user.path)
        for subdirectory_name in subdirectory_names:
            os.mkdir(os.path.join(user.path, subdirectory_name))
    except OSError as exc:
        current_app.logger.error(exc)
        session.rollback()
        raise
    return user
@staticmethod
def insert_defaults():
    '''
    Ensure that the user 'nopaque' exists and has the role 'System user',
    then commit the session.
    '''
    nopaque_user = User.query.filter_by(username='nopaque').first()
    system_user_role = Role.query.filter_by(name='System user').first()
    if nopaque_user is not None:
        # The user already exists, only its role may need correction.
        if nopaque_user.role != system_user_role:
            nopaque_user.role = system_user_role
    else:
        nopaque_user = User.create(
            username='nopaque',
            role=system_user_role
        )
        db.session.add(nopaque_user)
    db.session.commit()
@staticmethod
def reset_password(token, new_password):
    '''
    Set a new password for the user identified by a reset token.

    :param token: JWT that must be signed with the app's ``SECRET_KEY``
        (HS256), be issued by ``SERVER_NAME``, carry the claims ``exp``,
        ``iat``, ``iss``, ``purpose`` and ``sub``, and have the purpose
        'User.reset_password'
    :param new_password: password to set
    :returns: True if the password was set, False if the token is invalid
        or no matching user exists
    '''
    config = current_app.config
    required_claims = ['exp', 'iat', 'iss', 'purpose', 'sub']
    try:
        payload = jwt.decode(
            token,
            config['SECRET_KEY'],
            algorithms=['HS256'],
            issuer=config['SERVER_NAME'],
            options={'require': required_claims}
        )
    except jwt.PyJWTError:
        return False
    if payload.get('purpose') != 'User.reset_password':
        return False
    # The subject claim holds the hashid of the user.
    user = User.query.get(hashids.decode(payload.get('sub')))
    if user is None:
        return False
    user.password = new_password
    db.session.add(user)
    return True
@staticmethod
def verify_access_token(access_token, refresh_token=None):
    '''
    Return the user an unexpired access token belongs to.

    When the token is valid, the user's ``ping()`` is called and the session
    is committed. ``refresh_token`` is accepted but not used.

    :returns: the user, or None if the token is unknown or expired or the
        user has the role 'System user'
    '''
    token = Token.query.filter(Token.access_token == access_token).first()
    if token is None:
        return None
    if token.access_expiration <= datetime.utcnow():
        return None
    token.user.ping()
    db.session.commit()
    if token.user.role.name == 'System user':
        return None
    return token.user
@staticmethod
def verify_refresh_token(refresh_token, access_token):
    '''
    Return the token matching both given values if it can still be
    refreshed.

    :returns: the token, or None if no such token exists or its refresh
        expiration has passed; in the latter case all tokens of the user
        are revoked and the session is committed
    '''
    token = Token.query.filter(
        Token.refresh_token == refresh_token,
        Token.access_token == access_token
    ).first()
    if token is None:
        return None
    if token.refresh_expiration > datetime.utcnow():
        return token
    # someone tried to refresh with an expired token
    # revoke all tokens from this user as a precaution
    token.user.revoke_auth_tokens()
    db.session.commit()
    return None
def can(self, permission):
    '''
    Return whether the user's role has ``permission``.
    '''
    role = self.role
    return role.has_permission(permission)
current_app.config['SECRET_KEY'],
algorithms=['HS256'],
issuer=current_app.config['SERVER_NAME'],
options={'require': ['exp', 'iat', 'iss', 'purpose', 'sub']}
)
return False
self.confirmed = True
db.session.add(self)
return True

Patrick Jentsch
committed
shutil.rmtree(self.path, ignore_errors=True)
def generate_auth_token(self):
    '''
    Build a new ``Token`` for this user.

    The access token expires 15 minutes and the refresh token 7 days after
    creation. The token is only constructed here; this method does not add
    it to the session itself.
    '''
    access_token = secrets.token_urlsafe()
    access_expiration = datetime.utcnow() + timedelta(minutes=15)
    refresh_token = secrets.token_urlsafe()
    refresh_expiration = datetime.utcnow() + timedelta(days=7)
    return Token(
        access_token=access_token,
        access_expiration=access_expiration,
        refresh_token=refresh_token,
        refresh_expiration=refresh_expiration,
        user=self
    )
def generate_confirm_token(self, expiration=3600):
now = datetime.utcnow()
'exp': now + timedelta(seconds=expiration),
'iat': now,
'iss': current_app.config['SERVER_NAME'],
return jwt.encode(
payload,
current_app.config['SECRET_KEY'],
algorithm='HS256'
)
def generate_reset_password_token(self, expiration=3600):
now = datetime.utcnow()
'exp': now + timedelta(seconds=expiration),
'iat': now,
'iss': current_app.config['SERVER_NAME'],
return jwt.encode(
payload,
current_app.config['SECRET_KEY'],
algorithm='HS256'
)
def is_administrator(self):
    '''
    Return whether the user has the ``ADMINISTRATE`` permission.
    '''
    required_permission = Permission.ADMINISTRATE
    return self.can(required_permission)
def ping(self):
    '''
    Set ``last_seen`` to the current UTC time.
    '''
    current_time = datetime.utcnow()
    self.last_seen = current_time

Patrick Jentsch
committed
def revoke_auth_tokens(self):
    '''
    Mark every token of this user for deletion in the session.
    '''
    session = db.session
    for auth_token in self.tokens:
        session.delete(auth_token)
def verify_password(self, password):
    '''
    Check ``password`` against the stored password hash.

    Always False for users with the role 'System user'.
    '''
    is_system_user = self.role.name == 'System user'
    if is_system_user:
        return False
    return check_password_hash(self.password_hash, password)
#region Profile Privacy settings
def has_profile_privacy_setting(self, setting):
    '''
    Return True if every bit of ``setting`` is set in
    ``profile_privacy_settings``.
    '''
    enabled_bits = self.profile_privacy_settings & setting
    return enabled_bits == setting
def add_profile_privacy_setting(self, setting):
    '''
    Enable ``setting`` (a single flag or a combination of flags).

    The flags are set with a bitwise OR. Adding the numeric value instead
    would corrupt the mask whenever only some of the requested bits are
    already set.
    '''
    if not self.has_profile_privacy_setting(setting):
        self.profile_privacy_settings |= setting
def remove_profile_privacy_setting(self, setting):
    '''
    Disable ``setting``; nothing changes unless it is fully enabled.
    '''
    if not self.has_profile_privacy_setting(setting):
        return
    # All bits of ``setting`` are set here, so clearing them is the same as
    # subtracting the value.
    self.profile_privacy_settings &= ~setting
def reset_profile_privacy_settings(self):
    '''
    Disable all profile privacy settings by clearing the whole bit mask.
    '''
    nothing_enabled = 0
    self.profile_privacy_settings = nothing_enabled
#endregion Profile Privacy settings
def follow_corpus(self, corpus):
    '''
    Make this user follow ``corpus``.

    Does nothing if the corpus is already followed, mirroring
    ``follow_user``; this avoids appending the same corpus twice.
    '''
    if not self.is_following_corpus(corpus):
        self.followed_corpora.append(corpus)
def unfollow_corpus(self, corpus):
    '''
    Make this user stop following ``corpus``.

    Does nothing if the corpus is not followed, mirroring ``unfollow_user``;
    this avoids removing an entry that is not in the collection.
    '''
    if self.is_following_corpus(corpus):
        self.followed_corpora.remove(corpus)
def is_following_corpus(self, corpus):
    '''
    Return whether ``corpus`` is among the corpora followed by this user.
    '''
    matching_corpora = self.followed_corpora.filter(
        corpus_followers.c.corpus_id == corpus.id
    )
    return matching_corpora.count() > 0
def follow_user(self, user):
    '''
    Make this user follow ``user``; does nothing if already following.
    '''
    if self.is_following_user(user):
        return
    self.followed_users.append(user)
def unfollow_user(self, user):
    '''
    Make this user stop following ``user``; does nothing if not following.
    '''
    if not self.is_following_user(user):
        return
    self.followed_users.remove(user)
def is_following_user(self, user):
    '''
    Return whether ``user`` is among the users followed by this user.
    '''
    matching_users = self.followed_users.filter(
        user_followers.c.followed_user_id == user.id
    )
    return matching_users.count() > 0
def to_json_serializeable(self, backrefs=False, relationships=False, filter_by_privacy_settings=False):
'id': self.hashid,
'confirmed': self.confirmed,
'email': self.email,
else self.last_seen.strftime('%Y-%m-%d %H:%M')
'member_since': self.member_since.strftime('%Y-%m-%d'),
'full_name': self.full_name,
'about_me': self.about_me,
'website': self.website,
'location': self.location,
'organization': self.organization,
'job_status_mail_notification_level': \
self.setting_job_status_mail_notification_level.name,
'is_public': self.is_public,
'show_email': self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_EMAIL),
'show_last_seen': self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_LAST_SEEN),
'show_member_since': self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_MEMBER_SINCE)
json_serializeable['avatar'] = (
None if self.avatar is None
else self.avatar.to_json_serializeable(relationships=True)
)
json_serializeable['role'] = \
self.role.to_json_serializeable(backrefs=True)
json_serializeable['corpora'] = {
x.hashid: x.to_json_serializeable(relationships=True)
json_serializeable['jobs'] = {
x.hashid: x.to_json_serializeable(relationships=True)
json_serializeable['tesseract_ocr_pipeline_models'] = {
x.hashid: x.to_json_serializeable(relationships=True)
for x in self.tesseract_ocr_pipeline_models
json_serializeable['spacy_nlp_pipeline_models'] = {
x.hashid: x.to_json_serializeable(relationships=True)
for x in self.spacy_nlp_pipeline_models
}
if filter_by_privacy_settings:
if not self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_EMAIL):
json_serializeable.pop('email')
if not self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_LAST_SEEN):
json_serializeable.pop('last_seen')
if not self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_MEMBER_SINCE):
json_serializeable.pop('member_since')
class TesseractOCRPipelineModel(FileMixin, HashidMixin, db.Model):
__tablename__ = 'tesseract_ocr_pipeline_models'

Patrick Jentsch
committed
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
# Fields

Patrick Jentsch
committed
description = db.Column(db.String(255))
version = db.Column(db.String(16))
compatible_service_versions = db.Column(ContainerColumn(list, 255))

Patrick Jentsch
committed
publisher = db.Column(db.String(128))
publisher_url = db.Column(db.String(512))
publishing_url = db.Column(db.String(512))

Patrick Jentsch
committed
publishing_year = db.Column(db.Integer)
is_public = db.Column(db.Boolean, default=False)

Patrick Jentsch
committed
# Backrefs: user: User
@property
def path(self):
    '''
    Path of the model file:
    ``<user directory>/tesseract_ocr_pipeline_models/<id>``.

    The ``tesseract_ocr_pipeline_models`` component matches the directory
    created for every user in ``User.create`` and the layout used by
    ``SpaCyNLPPipelineModel.path``.
    '''
    return os.path.join(
        self.user.path,
        'tesseract_ocr_pipeline_models',
        str(self.id)
    )
@property
def jsonpatch_path(self):
    '''
    JSON Patch path of this model below the JSON Patch path of its user.
    '''
    return '{}/tesseract_ocr_pipeline_models/{}'.format(
        self.user.jsonpatch_path,
        self.hashid
    )
@property
def url(self):
    '''
    URL of the 'contributions.tesseract_ocr_pipeline_model' endpoint for
    this model.
    '''
    endpoint = 'contributions.tesseract_ocr_pipeline_model'
    return url_for(endpoint, tesseract_ocr_pipeline_model_id=self.id)
@property
def user_hashid(self):
    '''
    Hashid of the user this model belongs to.
    '''
    owner = self.user
    return owner.hashid

Patrick Jentsch
committed
@staticmethod
def insert_defaults():
nopaque_user = User.query.filter_by(username='nopaque').first()

Patrick Jentsch
committed
defaults_file = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'TesseractOCRPipelineModel.defaults.yml'

Patrick Jentsch
committed
)
with open(defaults_file, 'r') as f:
defaults = yaml.safe_load(f)
for m in defaults:
model = TesseractOCRPipelineModel.query.filter_by(title=m['title'], version=m['version']).first() # noqa
if model is not None:
model.compatible_service_versions = m['compatible_service_versions']
model.description = m['description']
model.publisher = m['publisher']
model.publisher_url = m['publisher_url']
model.publishing_url = m['publishing_url']
model.publishing_year = m['publishing_year']
model.is_public = True
model.title = m['title']
model.version = m['version']

Patrick Jentsch
committed
continue
compatible_service_versions=m['compatible_service_versions'],

Patrick Jentsch
committed
description=m['description'],
publisher=m['publisher'],
publisher_url=m['publisher_url'],
publishing_url=m['publishing_url'],

Patrick Jentsch
committed
publishing_year=m['publishing_year'],
is_public=True,

Patrick Jentsch
committed
title=m['title'],

Patrick Jentsch
committed
version=m['version']
)
db.session.add(model)
db.session.flush(objects=[model])
db.session.refresh(model)
model.filename = f'{model.id}.traineddata'

Patrick Jentsch
committed
r = requests.get(m['url'], stream=True)
pbar = tqdm(
desc=f'{model.title} ({model.filename})',

Patrick Jentsch
committed
unit="B",
unit_scale=True,
unit_divisor=1024,
total=int(r.headers['Content-Length'])
)
pbar.clear()
with open(model.path, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
pbar.update(len(chunk))
f.write(chunk)
pbar.close()
db.session.commit()
def delete(self):
    '''
    Remove the model file and mark this object for deletion.

    A failure to remove the file is only logged; the object is deleted from
    the session either way.
    '''
    file_path = self.path
    try:
        os.remove(file_path)
    except OSError as exc:
        current_app.logger.error(exc)
    db.session.delete(self)
def to_json_serializeable(self, backrefs=False, relationships=False):
json_serializeable = {
'id': self.hashid,
'compatible_service_versions': self.compatible_service_versions,
'description': self.description,
'publisher': self.publisher,
'publisher_url': self.publisher_url,
'publishing_url': self.publishing_url,
'publishing_year': self.publishing_year,
'is_public': self.is_public,
**self.file_mixin_to_json_serializeable()
json_serializeable['user'] = \
self.user.to_json_serializeable(backrefs=True)
return json_serializeable
class SpaCyNLPPipelineModel(FileMixin, HashidMixin, db.Model):
__tablename__ = 'spacy_nlp_pipeline_models'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
# Fields
title = db.Column(db.String(64))
description = db.Column(db.String(255))
version = db.Column(db.String(16))
compatible_service_versions = db.Column(ContainerColumn(list, 255))
publisher = db.Column(db.String(128))
publisher_url = db.Column(db.String(512))
publishing_url = db.Column(db.String(512))
publishing_year = db.Column(db.Integer)
pipeline_name = db.Column(db.String(64))
is_public = db.Column(db.Boolean, default=False)
# Backrefs: user: User
@property
def path(self):
    '''
    Path of the model file:
    ``<user directory>/spacy_nlp_pipeline_models/<id>``.
    '''
    user_directory = self.user.path
    file_name = str(self.id)
    return os.path.join(user_directory, 'spacy_nlp_pipeline_models', file_name)
@property
def jsonpatch_path(self):
    '''
    JSON Patch path of this model below the JSON Patch path of its user.
    '''
    return '{}/spacy_nlp_pipeline_models/{}'.format(
        self.user.jsonpatch_path,
        self.hashid
    )
@property
def url(self):
    '''
    URL of the 'contributions.spacy_nlp_pipeline_model' endpoint for this
    model.
    '''
    endpoint = 'contributions.spacy_nlp_pipeline_model'
    return url_for(endpoint, spacy_nlp_pipeline_model_id=self.id)
@property
def user_hashid(self):
    '''
    Hashid of the user this model belongs to.
    '''
    owner = self.user
    return owner.hashid
@staticmethod
def insert_defaults():
    '''
    Create or refresh the default spaCy NLP pipeline models.

    The defaults are read from ``SpaCyNLPPipelineModel.defaults.yml`` in the
    directory of this module. For every entry, an existing model with the
    same title and version gets its metadata overwritten; otherwise a new
    public model owned by the user 'nopaque' is created and its file is
    downloaded from the entry's ``url`` to ``model.path``. The session is
    committed once all entries are processed.

    :raises requests.HTTPError: if a download is answered with an error
        status, so that an error page is never stored as a model file
    '''
    metadata_fields = (
        'compatible_service_versions',
        'description',
        'publisher',
        'publisher_url',
        'publishing_url',
        'publishing_year',
        'title',
        'version',
        'pipeline_name'
    )
    nopaque_user = User.query.filter_by(username='nopaque').first()
    defaults_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        'SpaCyNLPPipelineModel.defaults.yml'
    )
    with open(defaults_file, 'r') as f:
        defaults = yaml.safe_load(f)
    for m in defaults:
        model = SpaCyNLPPipelineModel.query.filter_by(title=m['title'], version=m['version']).first()  # noqa
        if model is not None:
            # Known model: only refresh its metadata, no download.
            for field in metadata_fields:
                setattr(model, field, m[field])
            model.is_public = True
            continue
        model = SpaCyNLPPipelineModel(
            is_public=True,
            user=nopaque_user,
            **{field: m[field] for field in metadata_fields}
        )
        db.session.add(model)
        # Flush and refresh so that the id used by model.path is available.
        db.session.flush(objects=[model])
        db.session.refresh(model)
        # NOTE(review): model.filename is not assigned in this method before
        # it is used in the progress bar description - confirm whether it
        # should be set here.
        # Context managers release the connection and the progress bar even
        # if the download fails midway.
        with requests.get(m['url'], stream=True) as r:
            r.raise_for_status()
            # Not every server sends Content-Length; tqdm accepts None.
            content_length = r.headers.get('Content-Length')
            with tqdm(
                desc=f'{model.title} ({model.filename})',
                unit="B",
                unit_scale=True,
                unit_divisor=1024,
                total=None if content_length is None else int(content_length)
            ) as pbar:
                pbar.clear()
                with open(model.path, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=1024):
                        if chunk:  # filter out keep-alive new chunks
                            pbar.update(len(chunk))
                            f.write(chunk)
    db.session.commit()
def delete(self):
    '''
    Remove the model file and mark this object for deletion.

    A failure to remove the file is only logged; the object is deleted from
    the session either way.
    '''
    file_path = self.path
    try:
        os.remove(file_path)
    except OSError as exc:
        current_app.logger.error(exc)
    db.session.delete(self)

Patrick Jentsch
committed
def to_json_serializeable(self, backrefs=False, relationships=False):
json_serializeable = {
'id': self.hashid,
'compatible_service_versions': self.compatible_service_versions,
'description': self.description,
'publisher': self.publisher,
'publisher_url': self.publisher_url,
'publishing_url': self.publishing_url,
'publishing_year': self.publishing_year,
'is_public': self.is_public,
**self.file_mixin_to_json_serializeable()
json_serializeable['user'] = self.user.to_json_serializeable(backrefs=True)
return json_serializeable

Patrick Jentsch
committed
class JobInput(FileMixin, HashidMixin, db.Model):
__tablename__ = 'job_inputs'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
job_id = db.Column(db.Integer, db.ForeignKey('jobs.id'))
# Backrefs: job: Job
def __repr__(self):
    '''
    Return a short representation containing the filename.
    '''
    return '<JobInput {}>'.format(self.filename)
return url_for(
'jobs.download_job_input',
job_id=self.job.id,
job_input_id=self.id
)
@property
def jsonpatch_path(self):
    '''
    JSON Patch path of this input below the JSON Patch path of its job.
    '''
    return '{}/inputs/{}'.format(self.job.jsonpatch_path, self.hashid)

Patrick Jentsch
committed
@property
def path(self):
    '''
    Path of the input file: ``<job directory>/inputs/<id>``.
    '''
    job_directory = self.job.path
    return os.path.join(job_directory, 'inputs', str(self.id))

Patrick Jentsch
committed
return url_for(
'jobs.job',
job_id=self.job_id,
_anchor=f'job-{self.job.hashid}-input-{self.hashid}'
)
@property
def user_hashid(self):
    '''
    Hashid of the user owning the job this input belongs to.
    '''
    owner = self.job.user
    return owner.hashid
@property
def user_id(self):
def to_json_serializeable(self, backrefs=False, relationships=False):
json_serializeable = {
**self.file_mixin_to_json_serializeable()
json_serializeable['job'] = \
self.job.to_json_serializeable(backrefs=True)
return json_serializeable
class JobResult(FileMixin, HashidMixin, db.Model):
__tablename__ = 'job_results'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
job_id = db.Column(db.Integer, db.ForeignKey('jobs.id'))

Patrick Jentsch
committed
# Fields
description = db.Column(db.String(255))
# Backrefs: job: Job
def __repr__(self):
    '''
    Return a short representation containing the filename.
    '''
    return '<JobResult {}>'.format(self.filename)
return url_for(
'jobs.download_job_result',
job_id=self.job_id,
job_result_id=self.id
)
@property
def jsonpatch_path(self):
    '''
    JSON Patch path of this result below the JSON Patch path of its job.
    '''
    return '{}/results/{}'.format(self.job.jsonpatch_path, self.hashid)

Patrick Jentsch
committed
@property
def path(self):
    '''
    Path of the result file: ``<job directory>/results/<id>``.
    '''
    job_directory = self.job.path
    return os.path.join(job_directory, 'results', str(self.id))

Patrick Jentsch
committed
return url_for(
'jobs.job',
job_id=self.job_id,
_anchor=f'job-{self.job.hashid}-result-{self.hashid}'
)
@property
def user_hashid(self):
    '''
    Hashid of the user owning the job this result belongs to.
    '''
    owner = self.job.user
    return owner.hashid