Newer
Older
from app.converters.vrt import normalize_vrt_file
from app.email import create_message
from datetime import datetime, timedelta
from enum import Enum, IntEnum
from flask_hashids import HashidMixin
from flask_login import UserMixin
from time import sleep

Patrick Jentsch
committed
from tqdm import tqdm
from werkzeug.security import generate_password_hash, check_password_hash
import base64

Patrick Jentsch
committed
import json

Patrick Jentsch
committed
import requests
import shutil

Patrick Jentsch
committed
import xml.etree.ElementTree as ET
import yaml
json.loads(requests.get('https://transkribus.eu/TrpServer/rest/models/text', params={'docType': 'handwritten'}).content)['trpModelMetadata'] # noqa
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
##############################################################################
# enums #
##############################################################################
# region enums
class CorpusStatus(IntEnum):
UNPREPARED = 1
SUBMITTED = 2
QUEUED = 3
BUILDING = 4
BUILT = 5
FAILED = 6
STARTING_ANALYSIS_SESSION = 7
RUNNING_ANALYSIS_SESSION = 8
CANCELING_ANALYSIS_SESSION = 9
class JobStatus(IntEnum):
INITIALIZING = 1
SUBMITTED = 2
QUEUED = 3
RUNNING = 4
CANCELING = 5
CANCELED = 6
COMPLETED = 7
FAILED = 8
class Permission(IntEnum):
'''
Defines User permissions as integers by the power of 2. User permission
can be evaluated using the bitwise operator &.
'''
ADMINISTRATE = 1
CONTRIBUTE = 2
USE_API = 4
class UserSettingJobStatusMailNotificationLevel(IntEnum):
NONE = 1
END = 2
ALL = 3
# endregion enums
##############################################################################
# mixins #
##############################################################################
# region mixins
'''
Mixin for db.Model classes. All file related models should use this.
'''
creation_date = db.Column(db.DateTime, default=datetime.utcnow)

Patrick Jentsch
committed
filename = db.Column(db.String(255))
last_edited_date = db.Column(db.DateTime, default=datetime.utcnow)
mimetype = db.Column(db.String(255))
def file_mixin_to_dict(self, backrefs=False, relationships=False):
return {
'creation_date': self.creation_date.isoformat() + 'Z',
'filename': self.filename,
'last_edited_date': self.last_edited_date.isoformat() + 'Z',
'mimetype': self.mimetype
}
# endregion mixins
##############################################################################
# type_decorators #
##############################################################################
# region type_decorators
class IntEnumColumn(db.TypeDecorator):
impl = db.Integer
def __init__(self, enum_type, *args, **kwargs):
super().__init__(*args, **kwargs)
self.enum_type = enum_type
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def process_bind_param(self, value, dialect):
if isinstance(value, self.enum_type) and isinstance(value.value, int):
return value.value
elif isinstance(value, int):
return self.enum_type(value).value
else:
return TypeError()
def process_result_value(self, value, dialect):
return self.enum_type(value)
class ContainerColumn(db.TypeDecorator):
impl = db.String
def __init__(self, container_type, *args, **kwargs):
super().__init__(*args, **kwargs)
self.container_type = container_type
def process_bind_param(self, value, dialect):
if isinstance(value, self.container_type):
return json.dumps(value)
elif (
isinstance(value, str)
and isinstance(json.loads(value), self.container_type)
):
return value
else:
return TypeError()
def process_result_value(self, value, dialect):
return json.loads(value)
# endregion type_decorators
##############################################################################
# Models #
##############################################################################
# region models
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
default = db.Column(db.Boolean, default=False, index=True)
users = db.relationship('User', backref='role', lazy='dynamic')
def __init__(self, **kwargs):
if self.permissions is None:
self.permissions = 0
def __repr__(self):
def add_permission(self, permission):
if not self.has_permission(permission):
self.permissions += permission
def has_permission(self, permission):
return self.permissions & permission == permission
def remove_permission(self, permission):
if self.has_permission(permission):
self.permissions -= permission
def reset_permissions(self):
self.permissions = 0
def to_dict(self, backrefs=False, relationships=False):
dict_role = {
'id': self.hashid,
'default': self.default,
'name': self.name,
'permissions': self.permissions
}
if relationships:
dict_role['users'] = {
x.hashid: x.to_dict(backrefs=False, relationships=True)
for x in self.users
}
return dict_role
@staticmethod

Patrick Jentsch
committed
def insert_defaults():
'API user': [Permission.USE_API],
'Contributor': [Permission.CONTRIBUTE],
'Administrator': [
Permission.ADMINISTRATE,
Permission.CONTRIBUTE,
Permission.USE_API
]
}
default_role_name = 'User'
for role_name, permissions in roles.items():
role = Role.query.filter_by(name=role_name).first()
if role is None:
role.reset_permissions()
for permission in permissions:
role.add_permission(permission)
role.default = role.name == default_role_name
db.session.add(role)
db.session.commit()
class Token(db.Model):
__tablename__ = 'tokens'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
# Fields
access_token = db.Column(db.String(64), nullable=False, index=True)
access_expiration = db.Column(db.DateTime, nullable=False)
refresh_token = db.Column(db.String(64), nullable=False, index=True)
refresh_expiration = db.Column(db.DateTime, nullable=False)
# def generate(self):
# header = {'alg': 'HS256', 'exp': int(time.time()) + expiration}
# payload = {'confirm': self.hashid}
# return jwt.encode(header, payload, current_app.config['SECRET_KEY'])
# self.access_token = secrets.token_urlsafe()
# self.access_expiration = datetime.utcnow() + \
# timedelta(minutes=current_app.config['ACCESS_TOKEN_MINUTES'])
# self.refresh_token = secrets.token_urlsafe()
# self.refresh_expiration = datetime.utcnow() + \
# timedelta(days=current_app.config['REFRESH_TOKEN_DAYS'])
class User(HashidMixin, UserMixin, db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
email = db.Column(db.String(254), unique=True, index=True)
last_seen = db.Column(db.DateTime(), default=datetime.utcnow)
member_since = db.Column(db.DateTime(), default=datetime.utcnow)
password_hash = db.Column(db.String(128))
token = db.Column(db.String(32), index=True, unique=True)
token_expiration = db.Column(db.DateTime)
username = db.Column(db.String(64), unique=True, index=True)
setting_dark_mode = db.Column(db.Boolean, default=False)
setting_job_status_mail_notification_level = db.Column(
IntEnumColumn(UserSettingJobStatusMailNotificationLevel),
default=UserSettingJobStatusMailNotificationLevel.END

Patrick Jentsch
committed
)

Patrick Jentsch
committed
tesseract_ocr_models = db.relationship(
'TesseractOCRModel',
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
transkribus_htr_models = db.relationship(
'TranskribusHTRModel',
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
corpora = db.relationship(
'Corpus',
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
jobs = db.relationship(
'Job',
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
def __init__(self, **kwargs):
super().__init__(**kwargs)
if self.role is not None:
return
if self.email == current_app.config['NOPAQUE_ADMIN']:
self.role = Role.query.filter_by(name='Administrator').first()
else:
self.role = Role.query.filter_by(default=True).first()
def __repr__(self):
return f'<User {self.username}>'

Patrick Jentsch
committed
@property
def jsonpatch_path(self):
return f'/users/{self.hashid}'

Patrick Jentsch
committed
@property
def password(self):
raise AttributeError('password is not a readable attribute')
@password.setter
def password(self, password):
self.password_hash = generate_password_hash(password)
@property
def path(self):
return os.path.join(

Patrick Jentsch
committed
current_app.config.get('NOPAQUE_DATA_DIR'), 'users', str(self.id))
def can(self, permission):
return self.role.has_permission(permission)
payload = jwt.decode(
token,
current_app.config['SECRET_KEY'],
algorithms=['HS256'],
issuer=current_app.config['SERVER_NAME'],
options={'require': ['exp', 'iat', 'iss', 'purpose', 'sub']}
)
except jwt.PyJWTError:
if payload.get('purpose') != 'confirm_user':
return False
if payload.get('sub') != self.id:
return False
self.confirmed = True
db.session.add(self)
return True

Patrick Jentsch
committed
shutil.rmtree(self.path, ignore_errors=True)
def generate_confirm_user_token(self, expiration=3600):
utc_now = datetime.utcnow()
payload = {
'exp': utc_now + timedelta(seconds=expiration),
'iat': utc_now,
'iss': current_app.config['SERVER_NAME'],
'purpose': 'confirm_user',
'sub': self.id
}
return jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')
def generate_password_reset_token(self, expiration=3600):
utc_now = datetime.utcnow()
payload = {
'exp': utc_now + timedelta(seconds=expiration),
'iat': utc_now,
'iss': current_app.config['SERVER_NAME'],
'purpose': 'reset_password',
'sub': self.id
}
return jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')
def get_token(self, expires_in=3600):
now = datetime.utcnow()
if self.token and self.token_expiration > now + timedelta(seconds=60):
return self.token
self.token = base64.b64encode(os.urandom(24)).decode('utf-8')
self.token_expiration = now + timedelta(seconds=expires_in)
db.session.add(self)
return self.token
def is_administrator(self):
return self.can(Permission.ADMINISTRATE)

Patrick Jentsch
committed
def makedirs(self):
os.mkdir(self.path)
os.mkdir(os.path.join(self.path, 'tesseract_ocr_models'))
os.mkdir(os.path.join(self.path, 'corpora'))
os.mkdir(os.path.join(self.path, 'jobs'))
def revoke_token(self):
self.token_expiration = datetime.utcnow() - timedelta(seconds=1)
def to_dict(self, backrefs=False, relationships=False):
dict_user = {
'id': self.hashid,
'role_id': self.role.hashid,
'confirmed': self.confirmed,
'email': self.email,
'last_seen': self.last_seen.isoformat() + 'Z',
'member_since': self.member_since.isoformat() + 'Z',
'username': self.username,
'settings': {
'dark_mode': self.setting_dark_mode,

Patrick Jentsch
committed
'job_status_mail_notification_level':
self.setting_job_status_mail_notification_level.name
}
}
if backrefs:
dict_user['role'] = self.role.to_dict(
backrefs=True, relationships=False)
if relationships:
dict_user['corpora'] = {
x.hashid: x.to_dict(backrefs=False, relationships=True)
for x in self.corpora
}
dict_user['jobs'] = {
x.hashid: x.to_dict(backrefs=False, relationships=True)
for x in self.jobs
}

Patrick Jentsch
committed
dict_user['tesseract_ocr_models'] = {
x.hashid: x.to_dict(backrefs=False, relationships=True)

Patrick Jentsch
committed
for x in self.tesseract_ocr_models
}
return dict_user
def verify_password(self, password):
return check_password_hash(self.password_hash, password)
@staticmethod
def check_token(token):
user = User.query.filter_by(token=token).first()
if user is None or user.token_expiration < datetime.utcnow():
return None
return user

Patrick Jentsch
committed
@staticmethod
def insert_defaults():
if User.query.filter_by(username='nopaque').first() is not None:
return
user = User(username='nopaque')
db.session.add(user)
db.session.flush(objects=[user])
db.session.refresh(user)
try:
user.makedirs()
except OSError as e:
current_app.logger.error(e)
db.session.rollback()
db.session.commit()
@staticmethod
def reset_password(token, new_password):
try:
payload = jwt.decode(
token,
current_app.config['SECRET_KEY'],
algorithms=['HS256'],
issuer=current_app.config['SERVER_NAME'],
options={'require': ['exp', 'iat', 'iss', 'purpose', 'sub']}
)
except jwt.PyJWTError:
return False
if payload.get('purpose') != 'reset_password':
user_id = payload.get('sub')
if user_id is None:
if user is None:
return False
user.password = new_password
db.session.add(user)
return True

Patrick Jentsch
committed
class TesseractOCRModel(FileMixin, HashidMixin, db.Model):
__tablename__ = 'tesseract_ocr_models'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
# Fields
compatible_service_versions = db.Column(ContainerColumn(list, 255))

Patrick Jentsch
committed
description = db.Column(db.String(255))
publisher = db.Column(db.String(128))
publisher_url = db.Column(db.String(512))
publishing_url = db.Column(db.String(512))

Patrick Jentsch
committed
publishing_year = db.Column(db.Integer)
shared = db.Column(db.Boolean, default=False)

Patrick Jentsch
committed
title = db.Column(db.String(64))
version = db.Column(db.String(16))
# Backrefs: user: User
@property
def path(self):
return os.path.join(
self.user.path,
'tesseract_ocr_models',
str(self.id)
)

Patrick Jentsch
committed
def to_dict(self, backrefs=False, relationships=False):
dict_tesseract_ocr_model = {
'id': self.hashid,
'user_id': self.user.hashid,
'compatible_service_versions': self.compatible_service_versions,

Patrick Jentsch
committed
'description': self.description,
'publisher': self.publisher,
'publisher_url': self.publisher_url,
'publishing_url': self.publishing_url,

Patrick Jentsch
committed
'publishing_year': self.publishing_year,

Patrick Jentsch
committed
'title': self.title,
**self.file_mixin_to_dict()
}
if backrefs:
dict_tesseract_ocr_model['user'] = self.user.to_dict(
backrefs=True, relationships=False)
if relationships:
pass
return dict_tesseract_ocr_model

Patrick Jentsch
committed
@staticmethod
def insert_defaults():
user = User.query.filter_by(username='nopaque').first()
defaults_file = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'TesseractOCRModel.defaults.yml'
)
with open(defaults_file, 'r') as f:
defaults = yaml.safe_load(f)
for m in defaults:
model = TesseractOCRModel.query.filter_by(title=m['title'], version=m['version']).first() # noqa
if model is not None:
model.compatible_service_versions = m['compatible_service_versions']
model.description = m['description']
model.publisher = m['publisher']
model.publisher_url = m['publisher_url']
model.publishing_url = m['publishing_url']
model.publishing_year = m['publishing_year']
model.title = m['title']
model.version = m['version']

Patrick Jentsch
committed
continue
model = TesseractOCRModel(
compatible_service_versions=m['compatible_service_versions'],

Patrick Jentsch
committed
description=m['description'],
publisher=m['publisher'],
publisher_url=m['publisher_url'],
publishing_url=m['publishing_url'],

Patrick Jentsch
committed
publishing_year=m['publishing_year'],
shared=True,

Patrick Jentsch
committed
title=m['title'],
user=user,
version=m['version']
)
db.session.add(model)
db.session.flush(objects=[model])
db.session.refresh(model)
model.filename = f'{model.id}.traineddata'

Patrick Jentsch
committed
r = requests.get(m['url'], stream=True)
pbar = tqdm(
desc=f'{model.title} ({model.filename})',

Patrick Jentsch
committed
unit="B",
unit_scale=True,
unit_divisor=1024,
total=int(r.headers['Content-Length'])
)
pbar.clear()
with open(model.path, 'wb') as f:

Patrick Jentsch
committed
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
pbar.update(len(chunk))
f.write(chunk)
pbar.close()
db.session.commit()
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
class TranskribusHTRModel(HashidMixin, db.Model):
__tablename__ = 'transkribus_htr_models'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
# Fields
shared = db.Column(db.Boolean, default=False)
transkribus_model_id = db.Column(db.Integer)
transkribus_name = db.Column(db.String(64))
# Backrefs: user: User
def to_dict(self, backrefs=False, relationships=False):
dict_tesseract_ocr_model = {
'id': self.hashid,
'user_id': self.user.hashid,
'shared': self.shared,
'transkribus_model_id': self.transkribus_model_id,
}
if backrefs:
dict_tesseract_ocr_model['user'] = \
self.user.to_dict(backrefs=True, relationships=False)
if relationships:
pass
return dict_tesseract_ocr_model
@staticmethod
def insert_defaults():
user = User.query.filter_by(username='nopaque').first()
# models = [
# m for m in TRANSKRIBUS_HTR_MODELS if True
# and 'creator' in m and m['creator'] == 'Transkribus Team'
# and 'docType' in m and m['docType'] == 'handwritten'
# ]
for m in TRANSKRIBUS_HTR_MODELS:
model = TranskribusHTRModel.query.filter_by(transkribus_model_id=m['modelId']).first() # noqa
if model is not None:
model.shared = True
model.transkribus_model_id = m['modelId']
continue
model = TranskribusHTRModel(
shared=True,
transkribus_model_id=m['modelId'],
user=user,
)
db.session.add(model)
db.session.commit()
class JobInput(FileMixin, HashidMixin, db.Model):
__tablename__ = 'job_inputs'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
job_id = db.Column(db.Integer, db.ForeignKey('jobs.id'))
# Backrefs: job: Job
def __repr__(self):
return f'<JobInput {self.filename}>'
return url_for(
'jobs.download_job_input',
job_id=self.job.id,
job_input_id=self.id
)
@property
def jsonpatch_path(self):
return f'{self.job.jsonpatch_path}/inputs/{self.hashid}'

Patrick Jentsch
committed
@property
def path(self):

Patrick Jentsch
committed
return os.path.join(self.job.path, 'inputs', str(self.id))

Patrick Jentsch
committed
def to_dict(self, backrefs=False, relationships=False):
dict_job_input = {
'id': self.hashid,
'job_id': self.job.hashid,
'download_url': self.download_url,
'url': self.url,
**self.file_mixin_to_dict()
}
if backrefs:
dict_job_input['job'] = self.job.to_dict(
backrefs=True, relationships=False)
return dict_job_input
return url_for(
'jobs.job',
job_id=self.job_id,
_anchor=f'job-{self.job.hashid}-input-{self.hashid}'
)
@property
def user_hashid(self):
return self.job.user.hashid
@property
def user_id(self):
return self.job.user_id
class JobResult(FileMixin, HashidMixin, db.Model):
__tablename__ = 'job_results'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
job_id = db.Column(db.Integer, db.ForeignKey('jobs.id'))

Patrick Jentsch
committed
# Fields
description = db.Column(db.String(255))
# Backrefs: job: Job
def __repr__(self):
return f'<JobResult {self.filename}>'
return url_for(
'jobs.download_job_result',
job_id=self.job_id,
job_result_id=self.id
)
@property
def jsonpatch_path(self):
return f'{self.job.jsonpatch_path}/results/{self.hashid}'

Patrick Jentsch
committed
@property
def path(self):

Patrick Jentsch
committed
return os.path.join(self.job.path, 'results', str(self.id))

Patrick Jentsch
committed
def to_dict(self, backrefs=False, relationships=False):
dict_job_result = {
'id': self.hashid,
'job_id': self.job.hashid,

Patrick Jentsch
committed
'description': self.description,
'download_url': self.download_url,
'url': self.url,
**self.file_mixin_to_dict(
backrefs=backrefs, relationships=relationships)
}
if backrefs:
dict_job_result['job'] = self.job.to_dict(
backrefs=True, relationships=False)
return dict_job_result
return url_for(
'jobs.job',
job_id=self.job_id,
_anchor=f'job-{self.job.hashid}-result-{self.hashid}'
)
@property
def user_hashid(self):
return self.job.user.hashid
@property
def user_id(self):
return self.job.user_id
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
end_date = db.Column(db.DateTime())
service_args = db.Column(ContainerColumn(dict, 255))
default=JobStatus.INITIALIZING
)
inputs = db.relationship(
'JobInput',
backref='job',
cascade='all, delete-orphan',
lazy='dynamic'
)
results = db.relationship(
'JobResult',
backref='job',
cascade='all, delete-orphan',
lazy='dynamic'
)
def __repr__(self):
return f'<Job {self.title}>'

Patrick Jentsch
committed
@property
def jsonpatch_path(self):
return f'{self.user.jsonpatch_path}/jobs/{self.hashid}'

Patrick Jentsch
committed
@property
def path(self):
return os.path.join(self.user.path, 'jobs', str(self.id))
def url(self):
return url_for('jobs.job', job_id=self.id)
@property
def user_hashid(self):
return self.user.hashid
Delete the job and its inputs and results from the database.

Patrick Jentsch
committed
if self.status not in [JobStatus.COMPLETED, JobStatus.FAILED]: # noqa
self.status = JobStatus.CANCELING
db.session.commit()

Patrick Jentsch
committed
while self.status != JobStatus.CANCELED:
# In case the daemon handled a job in any way

Patrick Jentsch
committed
if self.status != JobStatus.CANCELING:
self.status = JobStatus.CANCELING
db.session.commit()
sleep(1)
db.session.refresh(self)

Patrick Jentsch
committed
shutil.rmtree(self.path, ignore_errors=True)

Patrick Jentsch
committed
def makedirs(self):
os.mkdir(self.path)
os.mkdir(os.path.join(self.path, 'inputs'))
os.mkdir(os.path.join(self.path, 'pipeline_data'))
os.mkdir(os.path.join(self.path, 'results'))
Restart a job - only if the status is complete or failed

Patrick Jentsch
committed
if self.status not in [JobStatus.COMPLETED, JobStatus.FAILED]: # noqa
raise Exception('Could not restart job: status is not "completed/failed"') # noqa

Patrick Jentsch
committed
shutil.rmtree(os.path.join(self.path, 'results'), ignore_errors=True)

Patrick Jentsch
committed
shutil.rmtree(os.path.join(self.path, 'pyflow.data'), ignore_errors=True) # noqa
for result in self.results:
db.session.delete(result)

Patrick Jentsch
committed
self.status = JobStatus.SUBMITTED
def to_dict(self, backrefs=False, relationships=False):
dict_job = {
'id': self.hashid,
'user_id': self.user.hashid,
'creation_date': self.creation_date.isoformat() + 'Z',
'description': self.description,
'end_date': None if self.end_date is None else f'{self.end_date.isoformat()}Z', # noqa
'service': self.service,
'service_args': self.service_args,
'service_version': self.service_version,

Patrick Jentsch
committed
'status': self.status.name,
'title': self.title,
if backrefs:
dict_job['user'] = self.user.to_dict(
backrefs=True, relationships=False)
if relationships:
dict_job['inputs'] = {
x.hashid: x.to_dict(backrefs=False, relationships=True)
for x in self.inputs
}
dict_job['results'] = {
x.hashid: x.to_dict(backrefs=False, relationships=True)
for x in self.results
}
return dict_job

Patrick Jentsch
committed
class CorpusFile(FileMixin, HashidMixin, db.Model):
__tablename__ = 'corpus_files'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
corpus_id = db.Column(db.Integer, db.ForeignKey('corpora.id'))
address = db.Column(db.String(255))
author = db.Column(db.String(255))
booktitle = db.Column(db.String(255))
chapter = db.Column(db.String(255))
editor = db.Column(db.String(255))
institution = db.Column(db.String(255))
journal = db.Column(db.String(255))
pages = db.Column(db.String(255))
publisher = db.Column(db.String(255))
publishing_year = db.Column(db.Integer)
school = db.Column(db.String(255))
title = db.Column(db.String(255))
return url_for(
'corpora.download_corpus_file',
corpus_id=self.corpus_id,
corpus_file_id=self.id
)
@property
def jsonpatch_path(self):

Patrick Jentsch
committed
@property
def path(self):

Patrick Jentsch
committed
return os.path.join(self.corpus.path, 'files', str(self.id))

Patrick Jentsch
committed
return url_for(
'corpora.corpus_file',
corpus_id=self.corpus_id,
corpus_file_id=self.id
)
@property
def user_hashid(self):
return self.corpus.user.hashid
@property
def user_id(self):
return self.corpus.user_id

Patrick Jentsch
committed
os.remove(self.path)
f'Removing {self.path} led to an OSError!'

Patrick Jentsch
committed
self.corpus.status = CorpusStatus.UNPREPARED
def to_dict(self, backrefs=False, relationships=False):
dict_corpus_file = {
'id': self.hashid,
'corpus_id': self.corpus.hashid,
'download_url': self.download_url,
'url': self.url,
'address': self.address,
'author': self.author,
'booktitle': self.booktitle,
'chapter': self.chapter,
'editor': self.editor,
'institution': self.institution,
'journal': self.journal,
'pages': self.pages,
'publisher': self.publisher,
'publishing_year': self.publishing_year,
'school': self.school,
'title': self.title,
**self.file_mixin_to_dict(
backrefs=backrefs, relationships=relationships)
}
if backrefs:
dict_corpus_file['corpus'] = self.corpus.to_dict(
backrefs=True, relationships=False)
class Corpus(HashidMixin, db.Model):
__tablename__ = 'corpora'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
description = db.Column(db.String(255))

Patrick Jentsch
committed
last_edited_date = db.Column(db.DateTime(), default=datetime.utcnow)
default=CorpusStatus.UNPREPARED
)
num_analysis_sessions = db.Column(db.Integer, default=0)
num_tokens = db.Column(db.Integer, default=0)
files = db.relationship(
'CorpusFile',
backref='corpus',
lazy='dynamic',
cascade='all, delete-orphan'
)

Patrick Jentsch
committed
# "static" attributes
def __repr__(self):
return f'<Corpus {self.title}>'
@property
def analysis_url(self):
return url_for('corpora.analyse_corpus', corpus_id=self.id)
@property
def jsonpatch_path(self):
return f'{self.user.jsonpatch_path}/corpora/{self.hashid}'

Patrick Jentsch
committed
@property
def path(self):
return os.path.join(self.user.path, 'corpora', str(self.id))

Patrick Jentsch
committed
@property
def url(self):
return url_for('corpora.corpus', corpus_id=self.id)
@property
def user_hashid(self):
return self.user.hashid