Skip to content
Snippets Groups Projects
Commit b56a8362 authored by Stephan Porada's avatar Stephan Porada :speech_balloon:
Browse files

Avoid that threads of the same type can be executed twice or thrice etc.

parent 9e883304
No related branches found
No related tags found
No related merge requests found
...@@ -28,7 +28,7 @@ WORKDIR /home/nopaqued ...@@ -28,7 +28,7 @@ WORKDIR /home/nopaqued
COPY ["logger", "logger"] COPY ["logger", "logger"]
COPY ["notify", "notify"] COPY ["notify", "notify"]
COPY ["tasks", "tasks"] COPY ["tasks", "tasks"]
COPY ["decorators.py", "nopaqued.py", "requirements.txt", "./"] COPY ["nopaqued.py", "requirements.txt", "./"]
RUN python -m venv venv \ RUN python -m venv venv \
&& venv/bin/pip install --requirement requirements.txt \ && venv/bin/pip install --requirement requirements.txt \
&& mkdir logs && mkdir logs
......
from functools import wraps
from threading import Thread
def background(f):
'''
' This decorator executes a function in a Thread.
'''
@wraps(f)
def wrapped(*args, **kwargs):
thread = Thread(target=f, args=args, kwargs=kwargs)
thread.start()
return thread
return wrapped
...@@ -23,4 +23,4 @@ def init_logger(): ...@@ -23,4 +23,4 @@ def init_logger():
if __name__ == '__main__': if __name__ == '__main__':
init_logger() init_logger()
\ No newline at end of file
from tasks.check_jobs import check_jobs from concurrent.futures import ThreadPoolExecutor
from tasks.check_corpora import check_corpora from tasks.check_corpora import check_corpora
from tasks.check_jobs import check_jobs
from tasks.notify import notify from tasks.notify import notify
from time import sleep from time import sleep
# TODO: Check if thread is still alive and execute next thread after that # TODO: Check if thread is still alive and execute next thread after that
# TODO: Remove unnecessary commits
# TODO: Check line length # TODO: Check line length
# check_jobs_thread = None
# check_corpora_thread = None
# notify_thread = None
#
#
# def nopaqued():
# # executing background functions
# while True:
# check_jobs_thread = check_jobs()
# check_corpora_thread = check_corpora()
# notify_thread = notify(True) # If True mails are sent. If False no mails are sent.
# # But notification status will be set nonetheless.
# sleep(3)
def nopaqued(): def nopaqued():
# executing background functions # executing background functions
while True: while True:
check_jobs() with ThreadPoolExecutor(max_workers=3) as executor:
check_corpora() executor.submit(check_jobs)
notify(True) # If True mails are sent. If False no mails are sent. executor.submit(check_corpora)
# But notification status will be set nonetheless. executor.submit(notify, True) # If True mails are sent.
# If False no mails are sent.
# But notification status will be set nonetheless.
sleep(3) sleep(3)
......
...@@ -24,4 +24,4 @@ class Notification(EmailMessage): ...@@ -24,4 +24,4 @@ class Notification(EmailMessage):
def set_addresses(self, sender, recipient): def set_addresses(self, sender, recipient):
self['From'] = sender self['From'] = sender
self['to'] = recipient self['to'] = recipient
\ No newline at end of file
...@@ -8,10 +8,10 @@ class NotificationService(object): ...@@ -8,10 +8,10 @@ class NotificationService(object):
def __init__(self, execute_flag): def __init__(self, execute_flag):
super(NotificationService, self).__init__() super(NotificationService, self).__init__()
self.execute_flag = execute_flag # If True mails are sent normaly self.execute_flag = execute_flag # If True mails are sent normaly
# If False mails are not sent. Used to avoid sending mails for jobs that # If False mails are not sent. Used to avoid sending mails for jobs
# have been completed a long time ago. Use this if you implement notify # that have been completed a long time ago. Use this if you implement
# into an already existing nopaque instance. Change it to True after the # notify into an already existing nopaque instance. Change it to True
# daemon has run one time with the flag set to False # after the daemon has run one time with the flag set to False
self.not_sent = {} # Holds due to an error unsent email notifications self.not_sent = {} # Holds due to an error unsent email notifications
self.mail_limit_exceeded = False # Bool to show if the mail server self.mail_limit_exceeded = False # Bool to show if the mail server
# stoped sending mails due to exceeding its sending limit # stoped sending mails due to exceeding its sending limit
...@@ -38,4 +38,4 @@ class NotificationService(object): ...@@ -38,4 +38,4 @@ class NotificationService(object):
return return
def quit(self): def quit(self):
self.smtp_server.quit() self.smtp_server.quit()
\ No newline at end of file
from decorators import background
from logger.logger import init_logger from logger.logger import init_logger
from tasks import Session, docker_client, NOPAQUE_STORAGE from tasks import Session, docker_client, NOPAQUE_STORAGE
from tasks.Models import Corpus from tasks.Models import Corpus
...@@ -7,7 +6,6 @@ import os ...@@ -7,7 +6,6 @@ import os
import shutil import shutil
@background
def check_corpora(): def check_corpora():
c_session = Session() c_session = Session()
corpora = c_session.query(Corpus).all() corpora = c_session.query(Corpus).all()
...@@ -131,4 +129,4 @@ def __remove_cqpserver_container(corpus): ...@@ -131,4 +129,4 @@ def __remove_cqpserver_container(corpus):
return return
else: else:
container.remove(force=True) container.remove(force=True)
corpus.status = 'prepared' corpus.status = 'prepared'
\ No newline at end of file
from datetime import datetime from datetime import datetime
from decorators import background
from logger.logger import init_logger from logger.logger import init_logger
from tasks import Session, docker_client, NOPAQUE_STORAGE from tasks import Session, docker_client, NOPAQUE_STORAGE
from tasks.Models import Job, NotificationData, NotificationEmailData, JobResult from tasks.Models import Job, NotificationData, NotificationEmailData, JobResult
...@@ -8,7 +7,6 @@ import json ...@@ -8,7 +7,6 @@ import json
import os import os
@background
def check_jobs(): def check_jobs():
# logger = init_logger() # logger = init_logger()
cj_session = Session() cj_session = Session()
...@@ -49,7 +47,8 @@ def __add_notification_data(job, notified_on_status, scoped_session): ...@@ -49,7 +47,8 @@ def __add_notification_data(job, notified_on_status, scoped_session):
if (notification_exists == 0): if (notification_exists == 0):
notification_data = NotificationData(job_id=job.id) notification_data = NotificationData(job_id=job.id)
scoped_session.add(notification_data) scoped_session.add(notification_data)
scoped_session.commit() # If no commit job will have no NotificationData scoped_session.commit()
# If no commit job will have no NotificationData
# logger.warning('Created NotificationData for current Job.')) # logger.warning('Created NotificationData for current Job.'))
else: else:
pass pass
...@@ -70,10 +69,14 @@ def __create_job_service(job): ...@@ -70,10 +69,14 @@ def __create_job_service(job):
job_dir = os.path.join(NOPAQUE_STORAGE, str(job.user_id), 'jobs', job_dir = os.path.join(NOPAQUE_STORAGE, str(job.user_id), 'jobs',
str(job.id)) str(job.id))
service_args = {'command': ('{} /files /files/output'.format(job.service) service_args = {'command': ('{} /files /files/output'.format(job.service)
+ ' {}'.format(job.secure_filename if job.service == 'file-setup' else '') + ' {}'.format(job.secure_filename
if job.service == 'file-setup'
else '')
+ ' --log-dir /files' + ' --log-dir /files'
+ ' --zip [{}]_{}'.format(job.service, job.secure_filename) + ' --zip [{}]_{}'.format(job.service,
+ ' ' + ' '.join(json.loads(job.service_args))), job.secure_filename)
+ ' ' + ' '.join(json.loads(job.service_args))
),
'constraints': ['node.role==worker'], 'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque', 'labels': {'origin': 'nopaque',
'type': 'service.{}'.format(job.service), 'type': 'service.{}'.format(job.service),
...@@ -147,4 +150,4 @@ def __remove_job_service(job): ...@@ -147,4 +150,4 @@ def __remove_job_service(job):
return return
else: else:
service.update(mounts=None) service.update(mounts=None)
service.remove() service.remove()
\ No newline at end of file
from decorators import background
from notify.notification import Notification from notify.notification import Notification
from notify.service import NotificationService from notify.service import NotificationService
from sqlalchemy import asc from sqlalchemy import asc
...@@ -7,6 +6,36 @@ from tasks.Models import NotificationEmailData ...@@ -7,6 +6,36 @@ from tasks.Models import NotificationEmailData
import os import os
def notify(execute_flag):
# If True mails are sent normaly
# If False mails are not sent. Used to avoid sending mails for jobs that
# have been completed a long time ago. Use this if you implement notify
# into an already existing nopaque instance. Change it to True after the
# daemon has run one time with the flag set to False.
# Initialize notification service
notification_service = NotificationService(execute_flag)
notification_service.get_smtp_configs()
notification_service.set_server()
# create notifications (content, recipient etc.)
notifications = __create_mail_notifications(notification_service)
# only login and send mails if there are any notifications
if (len(notifications) > 0):
try:
notification_service.login()
# combine new and unsent notifications
notifications.update(notification_service.not_sent)
# send all notifications
__send_mail_notifications(notifications, notification_service)
# remove unsent notifications because they have been sent now
# but only if mail limit has not been exceeded
if (notification_service.mail_limit_exceeded is not True):
notification_service.not_sent = {}
notification_service.quit()
except Exception as e:
notification_service.not_sent.update(notifications)
notification_service.quit()
# Email notification functions # Email notification functions
def __create_mail_notifications(notification_service): def __create_mail_notifications(notification_service):
mn_session = Session() mn_session = Session()
...@@ -28,17 +57,19 @@ def __create_mail_notifications(notification_service): ...@@ -28,17 +57,19 @@ def __create_mail_notifications(notification_service):
'status': data.notify_status, 'status': data.notify_status,
'time': data.creation_date, 'time': data.creation_date,
'url': url} 'url': url}
txt_tmplt = 'notify/templates/notification_messages/notification.txt'
html_tmplt = 'notify/templates/notification_messages/notification.html'
notification.set_notification_content(subject_template, notification.set_notification_content(subject_template,
subject_template_values_dict, subject_template_values_dict,
'notify/templates/notification_messages/notification.txt', txt_tmplt,
'notify/templates/notification_messages/notification.html', html_tmplt,
body_template_values_dict) body_template_values_dict)
notifications[data.job.id] = notification notifications[data.job.id] = notification
# Using a dictionary for notifications avoids sending multiple mails # Using a dictionary for notifications avoids sending multiple mails
# if the status of a job changes in a few seconds. The user will not get # if the status of a job changes in a few seconds. The user will not
# swamped with mails for queued, running and complete if those happen in # get swamped with mails for queued, running and complete if those
# in a few seconds. Only the last update will be sent. This depends on # happen in in a few seconds. Only the last update will be sent.
# the sleep time interval though. # This depends on the sleep time interval though.
mn_session.delete(data) mn_session.delete(data)
mn_session.commit() mn_session.commit()
Session.remove() Session.remove()
...@@ -55,33 +86,3 @@ def __send_mail_notifications(notifications, notification_service): ...@@ -55,33 +86,3 @@ def __send_mail_notifications(notifications, notification_service):
# consecutive mail sending # consecutive mail sending
notification_service.not_sent[key] = notification notification_service.not_sent[key] = notification
notification_service.mail_limit_exceeded = True notification_service.mail_limit_exceeded = True
@background
def notify(execute_flag):
# If True mails are sent normaly
# If False mails are not sent. Used to avoid sending mails for jobs that
# have been completed a long time ago. Use this if you implement notify
# into an already existing nopaque instance. Change it to True after the
# daemon has run one time with the flag set to False.
# Initialize notification service
notification_service = NotificationService(execute_flag)
notification_service.get_smtp_configs()
notification_service.set_server()
# create notifications (content, recipient etc.)
notifications = __create_mail_notifications(notification_service)
# only login and send mails if there are any notifications
if (len(notifications) > 0):
try:
notification_service.login()
# combine new and unsent notifications
notifications.update(notification_service.not_sent)
# send all notifications
__send_mail_notifications(notifications, notification_service)
# remove unsent notifications because they have been sent now
# but only if mail limit has not been exceeded
if (notification_service.mail_limit_exceeded is not True):
notification_service.not_sent = {}
notification_service.quit()
except Exception as e:
notification_service.not_sent.update(notifications)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment