diff --git a/daemon/nopaqued.py b/daemon/nopaqued.py index 9ec1bf5ad0a313658cc4896286ac276453a4de2b..01eedb900b84affaa45e3e8c24441225ecbda62c 100644 --- a/daemon/nopaqued.py +++ b/daemon/nopaqued.py @@ -1,25 +1,33 @@ -from concurrent.futures import ThreadPoolExecutor +from logger.logger import init_logger from tasks.check_corpora import check_corpora from tasks.check_jobs import check_jobs from tasks.notify import notify +from time import sleep import os -# TODO: Check if thread is still alive and execute next thread after that -# TODO: Check line length - - def nopaqued(): - execute_notifications = bool(os.environ.get('NOPAQUE_EXECUTE_NOTIFICATIONS', True)) # noqa - # executing background functions + logger = init_logger() + NOPAQUE_EXECUTE_NOTIFICATIONS = os.environ.get('NOPAQUE_EXECUTE_NOTIFICATIONS', 'True').lower() == 'true' # noqa + threads = {'check_corpora': None, 'check_jobs': None, 'notify': None} + + threads['check_corpora'] = check_corpora() + threads['check_jobs'] = check_jobs() + threads['notify'] = notify(NOPAQUE_EXECUTE_NOTIFICATIONS) while True: - with ThreadPoolExecutor(max_workers=3) as executor: - executor.submit(check_jobs) - executor.submit(check_corpora) - executor.submit(notify, execute_notifications) + logger.warning('check_corpora: {}'.format(threads['check_corpora'].is_alive())) + if not threads['check_corpora'].is_alive(): + threads['check_corpora'] = check_corpora() + logger.warning('check_jobs: {}'.format(threads['check_jobs'].is_alive())) + if not threads['check_jobs'].is_alive(): + threads['check_jobs'] = check_jobs() + logger.warning('notify: {}'.format(threads['notify'].is_alive())) + if not threads['notify'].is_alive(): + threads['notify'] = notify(NOPAQUE_EXECUTE_NOTIFICATIONS) # If execute_notifications True mails are sent. # If execute_notifications False no mails are sent. # But notification status will be set nonetheless. + sleep(3) if __name__ == '__main__': diff --git a/daemon/tasks/check_corpora.py b/daemon/tasks/check_corpora.py index fb8e13fca92795e9b7e4cc9a5adb04a2dd20eba6..588d801d5bfb3f69a90b2816fa0beb75640c4e3d 100644 --- a/daemon/tasks/check_corpora.py +++ b/daemon/tasks/check_corpora.py @@ -1,11 +1,13 @@ from logger.logger import init_logger from tasks import Session, docker_client, NOPAQUE_STORAGE +from tasks.decorators import background from tasks.Models import Corpus import docker import os import shutil +@background def check_corpora(): c_session = Session() corpora = c_session.query(Corpus).all() @@ -101,7 +103,7 @@ def __create_cqpserver_container(corpus): 'volumes': [corpus_data_dir + ':/corpora/data:rw', corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], 'name': 'cqpserver_{}'.format(corpus.id), - 'network': 'opaque_default'} + 'network': 'nopaque_default'} container_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest') try: container = docker_client.containers.get(container_args['name']) diff --git a/daemon/tasks/check_jobs.py b/daemon/tasks/check_jobs.py index 09dd24ac070e5ac5a72b832996a9588e298058ae..e779bbff6f5f2cc7756a7aa88ec22409e85dcc55 100644 --- a/daemon/tasks/check_jobs.py +++ b/daemon/tasks/check_jobs.py @@ -1,12 +1,14 @@ from datetime import datetime from logger.logger import init_logger from tasks import Session, docker_client, NOPAQUE_STORAGE +from tasks.decorators import background from tasks.Models import Job, NotificationData, NotificationEmailData, JobResult import docker import json import os +@background def check_jobs(): # logger = init_logger() cj_session = Session() diff --git a/daemon/tasks/decorators.py b/daemon/tasks/decorators.py new file mode 100644 index 0000000000000000000000000000000000000000..040250a87e17cc3a0a0bcf711a94f1ebc3d08b6d --- /dev/null +++ b/daemon/tasks/decorators.py @@ -0,0 +1,14 @@ +from functools import wraps +from threading import Thread + + +def background(f): + ''' + ' This decorator executes a function in a Thread. + ''' + @wraps(f) + def wrapped(*args, **kwargs): + thread = Thread(target=f, args=args, kwargs=kwargs) + thread.start() + return thread + return wrapped diff --git a/daemon/tasks/notify.py b/daemon/tasks/notify.py index 4d85be4f1d008cbd37689c9623bff46349b75f17..a0ff75d41b3d934fce25538c0cf6c7146db03cdf 100644 --- a/daemon/tasks/notify.py +++ b/daemon/tasks/notify.py @@ -2,10 +2,12 @@ from notify.notification import Notification from notify.service import NotificationService from sqlalchemy import asc from tasks import Session +from tasks.decorators import background from tasks.Models import NotificationEmailData import os +@background def notify(execute_flag): # If True mails are sent normaly # If False mails are not sent. Used to avoid sending mails for jobs that