Newer
Older
from app.converters.vrt import normalize_vrt_file
from app.sqlalchemy_type_decorators import ContainerColumn, IntEnumColumn
from datetime import datetime, timedelta

Patrick Jentsch
committed
from enum import IntEnum
from flask_hashids import HashidMixin
from flask_login import UserMixin
from itsdangerous import BadSignature, TimedJSONWebSignatureSerializer
from time import sleep

Patrick Jentsch
committed
from tqdm import tqdm
from werkzeug.security import generate_password_hash, check_password_hash
import base64

Patrick Jentsch
committed
import json

Patrick Jentsch
committed
import requests
import shutil

Patrick Jentsch
committed
import xml.etree.ElementTree as ET
import yaml
# Model metadata as returned by the Transkribus REST endpoint, fetched once
# when this module is imported. The timeout keeps a stalled connection from
# blocking the import forever; a timeout surfaces as a requests exception.
TRANSKRIBUS_HTR_MODELS = json.loads(
    requests.get(
        'https://transkribus.eu/TrpServer/rest/models/text',
        timeout=30
    ).content
)['trpModelMetadata']
'''
Mixin for db.Model classes. All file related models should use this.
'''
creation_date = db.Column(db.DateTime, default=datetime.utcnow)

Patrick Jentsch
committed
filename = db.Column(db.String(255))
last_edited_date = db.Column(db.DateTime, default=datetime.utcnow)
mimetype = db.Column(db.String(255))
def file_mixin_to_dict(self, backrefs=False, relationships=False):
    '''
    Serialize the file related columns of this object into a dict.

    Both date columns are rendered as ISO 8601 strings with a trailing 'Z'.
    backrefs and relationships are accepted but not used.
    '''
    def as_timestamp(date):
        return date.isoformat() + 'Z'

    dict_file_mixin = {}
    dict_file_mixin['creation_date'] = as_timestamp(self.creation_date)
    dict_file_mixin['filename'] = self.filename
    dict_file_mixin['last_edited_date'] = as_timestamp(self.last_edited_date)
    dict_file_mixin['mimetype'] = self.mimetype
    return dict_file_mixin
class Permission(IntEnum):
    '''
    User permission flags. Every member occupies a bit of its own, so a set
    of permissions fits into a single integer and can be tested with the
    bitwise operator &.
    '''
    ADMINISTRATE = 1 << 0
    CONTRIBUTE = 1 << 1
    USE_API = 1 << 2
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
default = db.Column(db.Boolean, default=False, index=True)
users = db.relationship('User', backref='role', lazy='dynamic')
def __init__(self, **kwargs):
    '''
    Initialize the role from keyword arguments and make sure that
    permissions starts out as an integer.
    '''
    # Hand the keyword arguments to the base class; without this call they
    # would be accepted and then silently dropped.
    super().__init__(**kwargs)
    if self.permissions is None:
        self.permissions = 0
def __repr__(self):
def add_permission(self, permission):
    '''
    Grant permission to this role.

    permission may combine several flags. Bits that are already set are
    left untouched, so granting a partly held combination cannot carry over
    into unrelated bits (which plain addition would do).
    '''
    if not self.has_permission(permission):
        self.permissions |= permission
def has_permission(self, permission):
    '''
    Return True if every bit of permission is set in self.permissions.
    '''
    granted_bits = self.permissions & permission
    return granted_bits == permission
def remove_permission(self, permission):
    '''
    Revoke permission from this role. Nothing happens unless the role
    currently has the permission.
    '''
    if not self.has_permission(permission):
        return
    self.permissions -= permission
def reset_permissions(self):
    '''
    Revoke every permission of this role.
    '''
    no_permissions = 0
    self.permissions = no_permissions
def to_dict(self, backrefs=False, relationships=False):
    '''
    Serialize this role into a dict.

    With relationships=True the result also has a 'users' entry that maps
    each user's hashid to that user's serialization. backrefs is accepted
    but not used.
    '''
    dict_role = dict(
        id=self.hashid,
        default=self.default,
        name=self.name,
        permissions=self.permissions
    )
    if not relationships:
        return dict_role
    users = {}
    for user in self.users:
        key = user.hashid
        users[key] = user.to_dict(backrefs=False, relationships=True)
    dict_role['users'] = users
    return dict_role
@staticmethod

Patrick Jentsch
committed
def insert_defaults():
'API user': [Permission.USE_API],
'Contributor': [Permission.CONTRIBUTE],
'Administrator': [
Permission.ADMINISTRATE,
Permission.CONTRIBUTE,
Permission.USE_API
]
}
default_role_name = 'User'
for role_name, permissions in roles.items():
role = Role.query.filter_by(name=role_name).first()
if role is None:
role.reset_permissions()
for permission in permissions:
role.add_permission(permission)
role.default = role.name == default_role_name
db.session.add(role)
db.session.commit()
class UserSettingJobStatusMailNotificationLevel(IntEnum):
    '''
    Levels a user can choose from for the job status mail notification
    setting.
    '''
    NONE = 1
    END = 2
    ALL = 3
class User(HashidMixin, UserMixin, db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
email = db.Column(db.String(254), unique=True, index=True)
last_seen = db.Column(db.DateTime(), default=datetime.utcnow)
member_since = db.Column(db.DateTime(), default=datetime.utcnow)
password_hash = db.Column(db.String(128))
token = db.Column(db.String(32), index=True, unique=True)
token_expiration = db.Column(db.DateTime)
username = db.Column(db.String(64), unique=True, index=True)
setting_dark_mode = db.Column(db.Boolean, default=False)
setting_job_status_mail_notification_level = db.Column(
IntEnumColumn(UserSettingJobStatusMailNotificationLevel),
default=UserSettingJobStatusMailNotificationLevel.END

Patrick Jentsch
committed
)

Patrick Jentsch
committed
tesseract_ocr_models = db.relationship(
'TesseractOCRModel',
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
transkribus_htr_models = db.relationship(
'TranskribusHTRModel',
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
corpora = db.relationship(
'Corpus',
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
jobs = db.relationship(
'Job',
backref='user',
cascade='all, delete-orphan',
lazy='dynamic'
)
def __init__(self, **kwargs):
    '''
    Initialize the user and assign a role if none was passed in.

    A user whose email equals the NOPAQUE_ADMIN config value gets the
    'Administrator' role, everybody else the role flagged as default.
    '''
    super().__init__(**kwargs)
    if self.role is None:
        is_admin = self.email == current_app.config['NOPAQUE_ADMIN']
        criteria = {'name': 'Administrator'} if is_admin else {'default': True}
        self.role = Role.query.filter_by(**criteria).first()
def __repr__(self):
    '''
    Return a short representation containing the username.
    '''
    return '<User {}>'.format(self.username)

Patrick Jentsch
committed
@property
def jsonpatch_path(self):
    '''
    Path of this user inside a JSON Patch document.
    '''
    return '/users/{}'.format(self.hashid)

Patrick Jentsch
committed
@property
def password(self):
    '''
    Reading the password is not supported and always raises AttributeError.
    '''
    message = 'password is not a readable attribute'
    raise AttributeError(message)
@password.setter
def password(self, password):
    '''
    Store the hash of password in password_hash.
    '''
    hashed = generate_password_hash(password)
    self.password_hash = hashed
@property
def path(self):
    '''
    Directory of this user: <NOPAQUE_DATA_DIR>/users/<id>.
    '''
    data_dir = current_app.config.get('NOPAQUE_DATA_DIR')
    return os.path.join(data_dir, 'users', str(self.id))
def can(self, permission):
    '''
    Return whether the role of this user has permission.
    '''
    role = self.role
    return role.has_permission(permission)
s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
return False
self.confirmed = True
db.session.add(self)
return True

Patrick Jentsch
committed
shutil.rmtree(self.path, ignore_errors=True)
def generate_confirmation_token(self, expiration=3600):
    '''
    Return a signed token string whose payload is {'confirm': <hashid>}.

    expiration is passed on to the serializer.
    '''
    secret_key = current_app.config['SECRET_KEY']
    serializer = TimedJSONWebSignatureSerializer(secret_key, expiration)
    payload = {'confirm': self.hashid}
    return serializer.dumps(payload).decode('utf-8')
def generate_reset_token(self, expiration=3600):
    '''
    Return a signed token string whose payload is {'reset': <hashid>}.

    expiration is passed on to the serializer.
    '''
    secret_key = current_app.config['SECRET_KEY']
    serializer = TimedJSONWebSignatureSerializer(secret_key, expiration)
    payload = {'reset': self.hashid}
    return serializer.dumps(payload).decode('utf-8')
def get_token(self, expires_in=3600):
    '''
    Return the token of this user, creating a new one when necessary.

    The current token is reused as long as it stays valid for more than 60
    seconds. Otherwise a new random token that expires in expires_in
    seconds is stored and the user is added to the database session.
    '''
    now = datetime.utcnow()
    reuse_threshold = now + timedelta(seconds=60)
    if self.token and self.token_expiration > reuse_threshold:
        return self.token
    random_bytes = os.urandom(24)
    self.token = base64.b64encode(random_bytes).decode('utf-8')
    self.token_expiration = now + timedelta(seconds=expires_in)
    db.session.add(self)
    return self.token
def is_administrator(self):
    '''
    Return whether this user has the ADMINISTRATE permission.
    '''
    required_permission = Permission.ADMINISTRATE
    return self.can(required_permission)

Patrick Jentsch
committed
def makedirs(self):
    '''
    Create the directory of this user together with its subdirectories.
    Raises OSError if a directory cannot be created.
    '''
    os.mkdir(self.path)
    for subdirectory in ('tesseract_ocr_models', 'corpora', 'jobs'):
        os.mkdir(os.path.join(self.path, subdirectory))
def revoke_token(self):
    '''
    Invalidate the current token by moving its expiration one second into
    the past.
    '''
    one_second = timedelta(seconds=1)
    self.token_expiration = datetime.utcnow() - one_second
def to_dict(self, backrefs=False, relationships=False):
    '''
    Serialize this user into a dict.

    backrefs=True adds the serialized role under 'role'.
    relationships=True adds 'corpora', 'jobs' and 'tesseract_ocr_models',
    each mapping hashids to the serialized related objects.
    '''
    notification_level = self.setting_job_status_mail_notification_level
    dict_user = {
        'id': self.hashid,
        'role_id': self.role.hashid,
        'confirmed': self.confirmed,
        'email': self.email,
        'last_seen': self.last_seen.isoformat() + 'Z',
        'member_since': self.member_since.isoformat() + 'Z',
        'username': self.username,
        'settings': {
            'dark_mode': self.setting_dark_mode,
            'job_status_mail_notification_level': notification_level.name
        }
    }
    if backrefs:
        dict_user['role'] = self.role.to_dict(
            backrefs=True, relationships=False)
    if relationships:
        for relationship in ('corpora', 'jobs', 'tesseract_ocr_models'):
            dict_user[relationship] = {
                x.hashid: x.to_dict(backrefs=False, relationships=True)
                for x in getattr(self, relationship)
            }
    return dict_user
def verify_password(self, password):
    '''
    Return whether password matches the stored password hash.
    '''
    stored_hash = self.password_hash
    return check_password_hash(stored_hash, password)
@staticmethod
def check_token(token):
    '''
    Return the user that owns token, or None if there is no such user or
    the token has expired.
    '''
    user = User.query.filter_by(token=token).first()
    if user is not None and not user.token_expiration < datetime.utcnow():
        return user
    return None

Patrick Jentsch
committed
@staticmethod
def insert_defaults():
    '''
    Create the 'nopaque' user and its directories unless the user already
    exists. A failing directory creation is logged and the session is
    rolled back.
    '''
    existing_user = User.query.filter_by(username='nopaque').first()
    if existing_user is None:
        user = User(username='nopaque')
        db.session.add(user)
        # Flush and refresh before makedirs, which builds the path from
        # the user's id
        db.session.flush(objects=[user])
        db.session.refresh(user)
        try:
            user.makedirs()
        except OSError as e:
            current_app.logger.error(e)
            db.session.rollback()
        db.session.commit()
@staticmethod
def reset_password(token, new_password):
    '''
    Set new_password for the user referenced by a reset token.

    Returns False if the token signature is rejected or no user is found,
    True otherwise. The user is only added to the session, not committed.
    '''
    secret_key = current_app.config['SECRET_KEY']
    serializer = TimedJSONWebSignatureSerializer(secret_key)
    try:
        payload = serializer.loads(token.encode('utf-8'))
    except BadSignature:
        return False
    # NOTE(review): generate_reset_token stores self.hashid under 'reset',
    # while query.get looks up by primary key - confirm these match.
    user = User.query.get(payload.get('reset'))
    if user is not None:
        user.password = new_password
        db.session.add(user)
        return True
    return False

Patrick Jentsch
committed
class TesseractOCRModel(FileMixin, HashidMixin, db.Model):
__tablename__ = 'tesseract_ocr_models'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
# Fields
compatible_service_versions = db.Column(ContainerColumn(list, 255))

Patrick Jentsch
committed
description = db.Column(db.String(255))
publisher = db.Column(db.String(128))
publisher_url = db.Column(db.String(512))
publishing_url = db.Column(db.String(512))

Patrick Jentsch
committed
publishing_year = db.Column(db.Integer)
shared = db.Column(db.Boolean, default=False)

Patrick Jentsch
committed
title = db.Column(db.String(64))
version = db.Column(db.String(16))
# Backrefs: user: User
@property
def path(self):
    '''
    Location of the model file: <user path>/tesseract_ocr_models/<id>.
    '''
    models_dir = os.path.join(self.user.path, 'tesseract_ocr_models')
    return os.path.join(models_dir, str(self.id))

Patrick Jentsch
committed
def to_dict(self, backrefs=False, relationships=False):
    '''
    Serialize this model into a dict, including the file mixin fields.

    backrefs=True adds the serialized owner under 'user'. relationships
    has no effect.
    '''
    dict_tesseract_ocr_model = {
        'id': self.hashid,
        'user_id': self.user.hashid
    }
    plain_fields = (
        'compatible_service_versions',
        'description',
        'publisher',
        'publisher_url',
        'publishing_url',
        'publishing_year',
        'title'
    )
    for field in plain_fields:
        dict_tesseract_ocr_model[field] = getattr(self, field)
    dict_tesseract_ocr_model.update(self.file_mixin_to_dict())
    if backrefs:
        dict_tesseract_ocr_model['user'] = self.user.to_dict(
            backrefs=True, relationships=False)
    return dict_tesseract_ocr_model

Patrick Jentsch
committed
@staticmethod
def insert_defaults():
    '''
    Create or update the default Tesseract OCR models of the 'nopaque'
    user.

    The model descriptions are read from TesseractOCRModel.defaults.yml,
    located next to this module. A model that already exists (same title
    and version) only gets its metadata refreshed. A new model is added to
    the session and its file is downloaded from the 'url' entry of its
    description. Everything is committed at the end.

    Raises requests.HTTPError if a download is answered with an error
    status, so that an error page is never stored as a model file.
    '''
    user = User.query.filter_by(username='nopaque').first()
    defaults_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        'TesseractOCRModel.defaults.yml'
    )
    with open(defaults_file, 'r') as f:
        defaults = yaml.safe_load(f)
    for m in defaults:
        model = TesseractOCRModel.query.filter_by(title=m['title'], version=m['version']).first()  # noqa
        if model is not None:
            model.compatible_service_versions = m['compatible_service_versions']
            model.description = m['description']
            model.publisher = m['publisher']
            model.publisher_url = m['publisher_url']
            model.publishing_url = m['publishing_url']
            model.publishing_year = m['publishing_year']
            model.title = m['title']
            model.version = m['version']
            continue
        model = TesseractOCRModel(
            compatible_service_versions=m['compatible_service_versions'],
            description=m['description'],
            publisher=m['publisher'],
            publisher_url=m['publisher_url'],
            publishing_url=m['publishing_url'],
            publishing_year=m['publishing_year'],
            shared=True,
            title=m['title'],
            user=user,
            version=m['version']
        )
        db.session.add(model)
        # The filename is derived from the id, which is only available
        # after flushing and refreshing the new model
        db.session.flush(objects=[model])
        db.session.refresh(model)
        model.filename = f'{model.id}.traineddata'
        # The context managers release the connection and the progress bar
        # even if the download fails half way through
        with requests.get(m['url'], stream=True) as r:
            r.raise_for_status()
            # Not every server announces the size; tqdm accepts total=None
            content_length = r.headers.get('Content-Length')
            with tqdm(
                desc=f'{model.title} ({model.filename})',
                unit="B",
                unit_scale=True,
                unit_divisor=1024,
                total=None if content_length is None else int(content_length)
            ) as pbar:
                pbar.clear()
                with open(model.path, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=1024):
                        if chunk:  # filter out keep-alive new chunks
                            pbar.update(len(chunk))
                            f.write(chunk)
    db.session.commit()
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
class TranskribusHTRModel(HashidMixin, db.Model):
__tablename__ = 'transkribus_htr_models'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
# Fields
shared = db.Column(db.Boolean, default=False)
transkribus_model_id = db.Column(db.Integer)
transkribus_name = db.Column(db.String(64))
# Backrefs: user: User
def to_dict(self, backrefs=False, relationships=False):
    '''
    Serialize this model into a dict.

    backrefs=True adds the serialized owner under 'user'. relationships
    has no effect.
    '''
    dict_transkribus_htr_model = {
        'id': self.hashid,
        'user_id': self.user.hashid
    }
    for field in ('shared', 'transkribus_model_id', 'transkribus_name'):
        dict_transkribus_htr_model[field] = getattr(self, field)
    if backrefs:
        serialized_user = self.user.to_dict(backrefs=True, relationships=False)
        dict_transkribus_htr_model['user'] = serialized_user
    return dict_transkribus_htr_model
@staticmethod
def insert_defaults():
    '''
    Create or update the default Transkribus HTR models of the 'nopaque'
    user.

    Only entries of TRANSKRIBUS_HTR_MODELS whose 'modelId' is in the fixed
    selection below are considered. A model that already exists (same
    transkribus_model_id) is refreshed, a new one is added as shared.
    Everything is committed at the end.
    '''
    user = User.query.filter_by(username='nopaque').first()
    # Hand-picked selection. A previous, disabled variant filtered on
    # creator == 'Transkribus Team' and docType == 'handwritten' instead.
    default_model_ids = {
        35909, 33744, 33597, 29820, 37789, 13685,
        37855, 26124, 37738, 30919, 34763
    }
    models = [
        m for m in TRANSKRIBUS_HTR_MODELS
        if m['modelId'] in default_model_ids
    ]
    for m in models:
        model = TranskribusHTRModel.query.filter_by(transkribus_model_id=m['modelId']).first()  # noqa
        if model is not None:
            model.shared = True
            model.transkribus_model_id = m['modelId']
            model.transkribus_name = m['name']
            continue
        model = TranskribusHTRModel(
            shared=True,
            transkribus_name=m['name'],
            transkribus_model_id=m['modelId'],
            user=user,
        )
        db.session.add(model)
    db.session.commit()
class JobInput(FileMixin, HashidMixin, db.Model):
__tablename__ = 'job_inputs'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
job_id = db.Column(db.Integer, db.ForeignKey('jobs.id'))
# Backrefs: job: Job
def __repr__(self):
    '''
    Return a short representation containing the filename.
    '''
    return '<JobInput {}>'.format(self.filename)
return url_for(
'jobs.download_job_input',
job_id=self.job.id,
job_input_id=self.id
)
@property
def jsonpatch_path(self):
    '''
    Path of this input inside a JSON Patch document, below its job.
    '''
    return '{}/inputs/{}'.format(self.job.jsonpatch_path, self.hashid)

Patrick Jentsch
committed
@property
def path(self):
    '''
    Location of the input file: <job path>/inputs/<id>.
    '''
    inputs_dir = os.path.join(self.job.path, 'inputs')
    return os.path.join(inputs_dir, str(self.id))

Patrick Jentsch
committed
def to_dict(self, backrefs=False, relationships=False):
    '''
    Serialize this job input into a dict, including the file mixin fields.

    backrefs=True adds the serialized job under 'job'. relationships is
    accepted but not used.
    '''
    dict_job_input = {}
    dict_job_input['id'] = self.hashid
    dict_job_input['job_id'] = self.job.hashid
    dict_job_input['download_url'] = self.download_url
    dict_job_input['url'] = self.url
    dict_job_input.update(self.file_mixin_to_dict())
    if backrefs:
        dict_job_input['job'] = self.job.to_dict(
            backrefs=True, relationships=False)
    return dict_job_input
return url_for(
'jobs.job',
job_id=self.job_id,
_anchor=f'job-{self.job.hashid}-input-{self.hashid}'
)
@property
def user_hashid(self):
    '''
    Hashid of the user that owns the job of this input.
    '''
    owner = self.job.user
    return owner.hashid
@property
def user_id(self):
    '''
    user_id of the job this input belongs to.
    '''
    job = self.job
    return job.user_id
class JobResult(FileMixin, HashidMixin, db.Model):
__tablename__ = 'job_results'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
job_id = db.Column(db.Integer, db.ForeignKey('jobs.id'))

Patrick Jentsch
committed
# Fields
description = db.Column(db.String(255))
# Backrefs: job: Job
def __repr__(self):
    '''
    Return a short representation containing the filename.
    '''
    return '<JobResult {}>'.format(self.filename)
return url_for(
'jobs.download_job_result',
job_id=self.job_id,
job_result_id=self.id
)
@property
def jsonpatch_path(self):
    '''
    Path of this result inside a JSON Patch document, below its job.
    '''
    return '{}/results/{}'.format(self.job.jsonpatch_path, self.hashid)

Patrick Jentsch
committed
@property
def path(self):
    '''
    Location of the result file: <job path>/results/<id>.
    '''
    results_dir = os.path.join(self.job.path, 'results')
    return os.path.join(results_dir, str(self.id))

Patrick Jentsch
committed
def to_dict(self, backrefs=False, relationships=False):
    '''
    Serialize this job result into a dict, including the file mixin
    fields.

    backrefs=True adds the serialized job under 'job'. Both flags are
    forwarded to file_mixin_to_dict.
    '''
    dict_job_result = {}
    dict_job_result['id'] = self.hashid
    dict_job_result['job_id'] = self.job.hashid
    dict_job_result['description'] = self.description
    dict_job_result['download_url'] = self.download_url
    dict_job_result['url'] = self.url
    file_fields = self.file_mixin_to_dict(
        backrefs=backrefs, relationships=relationships)
    dict_job_result.update(file_fields)
    if backrefs:
        dict_job_result['job'] = self.job.to_dict(
            backrefs=True, relationships=False)
    return dict_job_result
return url_for(
'jobs.job',
job_id=self.job_id,
_anchor=f'job-{self.job.hashid}-result-{self.hashid}'
)
@property
def user_hashid(self):
    '''
    Hashid of the user that owns the job of this result.
    '''
    owner = self.job.user
    return owner.hashid
@property
def user_id(self):
    '''
    user_id of the job this result belongs to.
    '''
    job = self.job
    return job.user_id
class JobStatus(IntEnum):
    '''
    Status values of a job.
    '''
    INITIALIZING = 1
    SUBMITTED = 2
    QUEUED = 3
    RUNNING = 4
    CANCELING = 5
    CANCELED = 6
    COMPLETED = 7
    FAILED = 8
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
end_date = db.Column(db.DateTime())
service_args = db.Column(ContainerColumn(dict, 255))
default=JobStatus.INITIALIZING
)
inputs = db.relationship(
'JobInput',
backref='job',
cascade='all, delete-orphan',
lazy='dynamic'
)
results = db.relationship(
'JobResult',
backref='job',
cascade='all, delete-orphan',
lazy='dynamic'
)
def __repr__(self):
    '''
    Return a short representation containing the title.
    '''
    return '<Job {}>'.format(self.title)

Patrick Jentsch
committed
@property
def jsonpatch_path(self):
    '''
    Path of this job inside a JSON Patch document, below its user.
    '''
    return '{}/jobs/{}'.format(self.user.jsonpatch_path, self.hashid)

Patrick Jentsch
committed
@property
def path(self):
    '''
    Directory of this job: <user path>/jobs/<id>.
    '''
    jobs_dir = os.path.join(self.user.path, 'jobs')
    return os.path.join(jobs_dir, str(self.id))
def url(self):
    '''
    URL of the 'jobs.job' endpoint for this job.
    '''
    route_values = {'job_id': self.id}
    return url_for('jobs.job', **route_values)
@property
def user_hashid(self):
    '''
    Hashid of the user that owns this job.
    '''
    owner = self.user
    return owner.hashid
Delete the job and its inputs and results from the database.

Patrick Jentsch
committed
if self.status not in [JobStatus.COMPLETED, JobStatus.FAILED]: # noqa
self.status = JobStatus.CANCELING
db.session.commit()

Patrick Jentsch
committed
while self.status != JobStatus.CANCELED:
# In case the daemon handled a job in any way

Patrick Jentsch
committed
if self.status != JobStatus.CANCELING:
self.status = JobStatus.CANCELING
db.session.commit()
sleep(1)
db.session.refresh(self)

Patrick Jentsch
committed
shutil.rmtree(self.path, ignore_errors=True)

Patrick Jentsch
committed
def makedirs(self):
    '''
    Create the directory of this job together with its subdirectories.
    Raises OSError if a directory cannot be created.
    '''
    os.mkdir(self.path)
    for subdirectory in ('inputs', 'pipeline_data', 'results'):
        os.mkdir(os.path.join(self.path, subdirectory))
Restart a job - only if the status is complete or failed

Patrick Jentsch
committed
if self.status not in [JobStatus.COMPLETED, JobStatus.FAILED]: # noqa
raise Exception('Could not restart job: status is not "completed/failed"') # noqa

Patrick Jentsch
committed
shutil.rmtree(os.path.join(self.path, 'results'), ignore_errors=True)

Patrick Jentsch
committed
shutil.rmtree(os.path.join(self.path, 'pyflow.data'), ignore_errors=True) # noqa
for result in self.results:
db.session.delete(result)

Patrick Jentsch
committed
self.status = JobStatus.SUBMITTED
def to_dict(self, backrefs=False, relationships=False):
    '''
    Serialize this job into a dict.

    'end_date' is None as long as the job has no end date.
    backrefs=True adds the serialized owner under 'user'.
    relationships=True adds 'inputs' and 'results', each mapping hashids
    to the serialized related objects.
    '''
    dict_job = {
        'id': self.hashid,
        'user_id': self.user.hashid,
        'creation_date': self.creation_date.isoformat() + 'Z',
        'description': self.description,
        'end_date': None if self.end_date is None else f'{self.end_date.isoformat()}Z',  # noqa
        'service': self.service,
        'service_args': self.service_args,
        'service_version': self.service_version,
        'status': self.status.name,
        'title': self.title,
    }
    if backrefs:
        dict_job['user'] = self.user.to_dict(
            backrefs=True, relationships=False)
    if relationships:
        dict_job['inputs'] = {
            x.hashid: x.to_dict(backrefs=False, relationships=True)
            for x in self.inputs
        }
        dict_job['results'] = {
            x.hashid: x.to_dict(backrefs=False, relationships=True)
            for x in self.results
        }
    return dict_job

Patrick Jentsch
committed
class CorpusFile(FileMixin, HashidMixin, db.Model):
__tablename__ = 'corpus_files'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
corpus_id = db.Column(db.Integer, db.ForeignKey('corpora.id'))
address = db.Column(db.String(255))
author = db.Column(db.String(255))
booktitle = db.Column(db.String(255))
chapter = db.Column(db.String(255))
editor = db.Column(db.String(255))
institution = db.Column(db.String(255))
journal = db.Column(db.String(255))
pages = db.Column(db.String(255))
publisher = db.Column(db.String(255))
publishing_year = db.Column(db.Integer)
school = db.Column(db.String(255))
title = db.Column(db.String(255))
return url_for(
'corpora.download_corpus_file',
corpus_id=self.corpus_id,
corpus_file_id=self.id
)
@property
def jsonpatch_path(self):

Patrick Jentsch
committed
@property
def path(self):

Patrick Jentsch
committed
return os.path.join(self.corpus.path, 'files', str(self.id))

Patrick Jentsch
committed
return url_for(
'corpora.corpus_file',
corpus_id=self.corpus_id,
corpus_file_id=self.id
)
@property
def user_hashid(self):
    '''
    Hashid of the user that owns the corpus of this file.
    '''
    owner = self.corpus.user
    return owner.hashid
@property
def user_id(self):
    '''
    user_id of the corpus this file belongs to.
    '''
    corpus = self.corpus
    return corpus.user_id

Patrick Jentsch
committed
os.remove(self.path)
f'Removing {self.path} led to an OSError!'

Patrick Jentsch
committed
self.corpus.status = CorpusStatus.UNPREPARED
def to_dict(self, backrefs=False, relationships=False):
    '''
    Serialize this corpus file into a dict, including the file mixin
    fields.

    backrefs=True adds the serialized corpus under 'corpus'. Both flags
    are forwarded to file_mixin_to_dict.
    '''
    dict_corpus_file = {
        'id': self.hashid,
        'corpus_id': self.corpus.hashid,
        'download_url': self.download_url,
        'url': self.url,
        'address': self.address,
        'author': self.author,
        'booktitle': self.booktitle,
        'chapter': self.chapter,
        'editor': self.editor,
        'institution': self.institution,
        'journal': self.journal,
        'pages': self.pages,
        'publisher': self.publisher,
        'publishing_year': self.publishing_year,
        'school': self.school,
        'title': self.title,
        **self.file_mixin_to_dict(
            backrefs=backrefs, relationships=relationships)
    }
    if backrefs:
        dict_corpus_file['corpus'] = self.corpus.to_dict(
            backrefs=True, relationships=False)
    # Hand the result back to the caller; without the return statement
    # every caller would receive None
    return dict_corpus_file
class CorpusStatus(IntEnum):
    '''
    Status values of a corpus.
    '''
    UNPREPARED = 1
    SUBMITTED = 2
    QUEUED = 3
    BUILDING = 4
    BUILT = 5
    FAILED = 6
    STARTING_ANALYSIS_SESSION = 7
    RUNNING_ANALYSIS_SESSION = 8
    CANCELING_ANALYSIS_SESSION = 9
__tablename__ = 'corpora'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
description = db.Column(db.String(255))

Patrick Jentsch
committed
last_edited_date = db.Column(db.DateTime(), default=datetime.utcnow)
default=CorpusStatus.UNPREPARED
)
num_analysis_sessions = db.Column(db.Integer, default=0)
num_tokens = db.Column(db.Integer, default=0)
files = db.relationship(
'CorpusFile',
backref='corpus',
lazy='dynamic',
cascade='all, delete-orphan'
)

Patrick Jentsch
committed
# "static" attributes
def __repr__(self):
    '''
    Return a short representation containing the title.
    '''
    return '<Corpus {}>'.format(self.title)
@property
def analysis_url(self):
    '''
    URL of the 'corpora.analyse_corpus' endpoint for this corpus.
    '''
    route_values = {'corpus_id': self.id}
    return url_for('corpora.analyse_corpus', **route_values)
@property
def jsonpatch_path(self):
    '''
    Path of this corpus inside a JSON Patch document, below its user.
    '''
    return '{}/corpora/{}'.format(self.user.jsonpatch_path, self.hashid)

Patrick Jentsch
committed
@property
def path(self):
    '''
    Directory of this corpus: <user path>/corpora/<id>.
    '''
    corpora_dir = os.path.join(self.user.path, 'corpora')
    return os.path.join(corpora_dir, str(self.id))

Patrick Jentsch
committed
@property
def url(self):
    '''
    URL of the 'corpora.corpus' endpoint for this corpus.
    '''
    route_values = {'corpus_id': self.id}
    return url_for('corpora.corpus', **route_values)
@property
def user_hashid(self):
    '''
    Hashid of the user that owns this corpus.
    '''
    owner = self.user
    return owner.hashid
corpus_element = ET.fromstring('<corpus>\n</corpus>')
normalized_vrt_path = os.path.join(self.path, 'cwb', f'{corpus_file.id}.norm.vrt')
try:
normalize_vrt_file(corpus_file.path, normalized_vrt_path)
except:
self.status = CorpusStatus.FAILED
return
element_tree = ET.parse(normalized_vrt_path)

Patrick Jentsch
committed
text_element = element_tree.getroot()
text_element.set('address', corpus_file.address or 'NULL')
text_element.set('author', corpus_file.author)
text_element.set('booktitle', corpus_file.booktitle or 'NULL')
text_element.set('chapter', corpus_file.chapter or 'NULL')
text_element.set('editor', corpus_file.editor or 'NULL')
text_element.set('institution', corpus_file.institution or 'NULL')
text_element.set('journal', corpus_file.journal or 'NULL')
text_element.set('pages', corpus_file.pages or 'NULL')
text_element.set('publisher', corpus_file.publisher or 'NULL')
text_element.set('publishing_year', str(corpus_file.publishing_year)) # noqa
text_element.set('school', corpus_file.school or 'NULL')
text_element.set('title', corpus_file.title)
text_element.tail = '\n'
# corpus_element.insert(1, text_element)
corpus_element.append(text_element)

Patrick Jentsch
committed
ET.ElementTree(corpus_element).write(
os.path.join(self.path, 'cwb', 'corpus.vrt'),
encoding='utf-8'
)
self.last_edited_date = datetime.utcnow()

Patrick Jentsch
committed
self.status = CorpusStatus.SUBMITTED

Patrick Jentsch
committed
shutil.rmtree(self.path, ignore_errors=True)

Stephan Porada
committed
db.session.delete(self)

Patrick Jentsch
committed
def makedirs(self):
    '''
    Create the directory of this corpus together with its subdirectories.
    Raises OSError if a directory cannot be created.
    '''
    os.mkdir(self.path)
    # Parents are listed before their children
    subdirectories = (
        ('files',),
        ('cwb',),
        ('cwb', 'data'),
        ('cwb', 'registry')
    )
    for parts in subdirectories:
        os.mkdir(os.path.join(self.path, *parts))
def to_dict(self, backrefs=False, relationships=False):
    '''
    Serialize this corpus into a dict.

    backrefs=True adds the serialized owner under 'user'.
    relationships=True adds 'files', mapping the hashid of every corpus
    file to its serialization.
    '''
    dict_corpus = {
        'id': self.hashid,
        'user_id': self.user.hashid,
        'analysis_url': self.analysis_url,
        'url': self.url,
        'creation_date': self.creation_date.isoformat() + 'Z',
        'description': self.description,
        'max_num_tokens': self.max_num_tokens,
        'num_analysis_sessions': self.num_analysis_sessions,
        'num_tokens': self.num_tokens,
        'status': self.status.name,
        'last_edited_date': self.last_edited_date.isoformat() + 'Z',
        'title': self.title
    }
    if backrefs:
        dict_corpus['user'] = self.user.to_dict(
            backrefs=True,
            relationships=False
        )
    if relationships:
        # Same key/value form as the other to_dict methods in this module
        dict_corpus['files'] = {
            x.hashid: x.to_dict(backrefs=False, relationships=True)
            for x in self.files
        }
    return dict_corpus
@login.user_loader
def load_user(user_id):
    '''
    Return the user with the given id, or None.

    user_id is converted to an integer for the lookup. A value that cannot
    be converted (for example from a tampered or outdated session) yields
    None instead of an exception.
    '''
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)