From 6f5bf674c60a157387a75303278d91858bb1115a Mon Sep 17 00:00:00 2001
From: Stephan Porada <sporada@uni-bielefeld.de>
Date: Tue, 6 Aug 2019 14:27:41 +0200
Subject: [PATCH] Handle job creation with the new Job class.

---
 app/services/views.py | 72 ++++++++++++++++++++++---------------------
 app/swarm.py          | 28 ++++++++++-------
 2 files changed, 54 insertions(+), 46 deletions(-)

diff --git a/app/services/views.py b/app/services/views.py
index c580bb32..c3be8ac5 100644
--- a/app/services/views.py
+++ b/app/services/views.py
@@ -3,10 +3,13 @@ from flask import current_app, flash, redirect, render_template, url_for
 from . import services
 from flask_login import current_user, login_required
 from .forms import NewOCRJobForm, NewNLPJobForm
+from ..models import Job
 from ..import swarm
+from .. import db
 from threading import Thread
 import hashlib
 import os
+import json
 
 
 @services.route('/ocr', methods=['GET', 'POST'])
@@ -15,24 +18,23 @@ def ocr():
     new_ocr_job_form = NewOCRJobForm()
     if new_ocr_job_form.validate_on_submit():
         app = current_app._get_current_object()
-        id = hashlib.md5(
-            (current_user.username + '_' + datetime.now().isoformat()).encode()
-        ).hexdigest()
-        '''
-        ' TODO: Implement a Job class. For now a dictionary representation
-        '       is enough.
-        '''
-        job = {'creator': current_user.id,
-               'id': id,
-               'requested_cpus': 2,
-               'requested_memory': 2048,
-               'service': 'ocr',
-               'service_args': {'lang': new_ocr_job_form.language.data,
-                                'version': new_ocr_job_form.version.data
-                                },
-               'status': 'queued'
-               }
-        dir = os.path.join(app.config['OPAQUE_STORAGE'], 'jobs', id)
+        ocr_job = Job()
+        ocr_job.title = new_ocr_job_form.title.data
+        ocr_job.description = new_ocr_job_form.description.data
+        ocr_job.user_id = current_user.id
+        ocr_job.creation_date = datetime.utcnow()
+        ocr_job.service = "ocr"
+        ocr_job.ressources = json.dumps({"n_cores": 2,
+                                         "mem_mb": 4096})
+        ocr_job.service_args = json.dumps({"args": ["--keep-intermediates",
+                                                    "--skip-binarisation"],
+                                           "lang": new_ocr_job_form.language.data,
+                                           "version": new_ocr_job_form.version.data})
+        ocr_job.status = "queued"
+        db.session.add(ocr_job)
+        db.session.commit()
+
+        dir = os.path.join(app.config['OPAQUE_STORAGE'], 'jobs', str(ocr_job.id))
 
         try:
             os.makedirs(dir)
@@ -47,7 +49,7 @@ def ocr():
             ' NOTE: Using self created threads is just for testing purpose as
             '       there is no scheduler available.
             '''
-            thread = Thread(target=swarm.run, args=(job,))
+            thread = Thread(target=swarm.run, args=(ocr_job,))
             thread.start()
             flash('Job created!')
         return redirect(url_for('services.ocr'))
@@ -68,21 +70,21 @@ def nlp():
         id = hashlib.md5(
             (current_user.username + '_' + datetime.now().isoformat()).encode()
         ).hexdigest()
-        '''
-        ' TODO: Implement a Job class. For now a dictionary representation
-        '       is enough.
-        '''
-        job = {'creator': current_user.id,
-               'id': id,
-               'requested_cpus': 2,
-               'requested_memory': 2048,
-               'service': 'nlp',
-               'service_args': {'lang': new_nlp_job_form.language.data,
-                                'version': new_nlp_job_form.version.data
-                                },
-               'status': 'queued'
-               }
-        dir = os.path.join(app.config['OPAQUE_STORAGE'], 'jobs', id)
+        nlp_job = Job()
+        nlp_job.title = new_nlp_job_form.title.data
+        nlp_job.description = new_nlp_job_form.description.data
+        nlp_job.user_id = current_user.id
+        nlp_job.creation_date = datetime.utcnow()
+        nlp_job.service = "nlp"
+        nlp_job.ressources = json.dumps({"n_cores": 2,
+                                         "mem_mb": 4096})
+        nlp_job.service_args = json.dumps({"args": [],
+                                           "lang": new_nlp_job_form.language.data,
+                                           "version": new_nlp_job_form.version.data})
+        nlp_job.status = "queued"
+        db.session.add(nlp_job)
+        db.session.commit()
+        dir = os.path.join(app.config['OPAQUE_STORAGE'], 'jobs', str(nlp_job.id))
 
         try:
             os.makedirs(dir)
@@ -97,7 +99,7 @@ def nlp():
             ' NOTE: Using self created threads is just for testing purpose as
             '       there is no scheduler available.
             '''
-            thread = Thread(target=swarm.run, args=(job,))
+            thread = Thread(target=swarm.run, args=(nlp_job,))
             thread.start()
             flash('Job created!')
         return redirect(url_for('services.nlp'))
diff --git a/app/swarm.py b/app/swarm.py
index a5a9e540..859a9f91 100644
--- a/app/swarm.py
+++ b/app/swarm.py
@@ -1,5 +1,6 @@
 import docker
 import time
+import json
 
 
 class Swarm:
@@ -19,22 +20,27 @@ class Swarm:
     '''
 
     def run(self, job):
+        '''
+        Create and start a Docker service for the given Job object; the
+        service command is assembled from the job's attributes.
+        '''
         # Prepare argument values needed for the service creation.
-        _command = job['service'] \
-                  + ' -i /files/{}'.format(job['id']) \
-                  + ' -l {}'.format(job['service_args']['lang']) \
-                  + ' -o /files/{}/output'.format(job['id'])
-                  # + ' --keep-intermediates'
+        service_args = json.loads(job.service_args)
+        ressources = json.loads(job.ressources)
+        _command = (job.service
+                    + ' -i /files/{}'.format(job.id)
+                    + ' -l {}'.format(service_args['lang'])
+                    + ' -o /files/{}/output'.format(job.id)
+                    + ' ' + ' '.join(service_args['args']))
         _constraints = ['node.role==worker']
         _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
-            job['service'],
-            job['service_args']['version']
+            job.service,
+            service_args['version']
         )
-        _labels = {'service': job['service']}
+        _labels = {'service': job.service}
         _mounts = [
             '/home/compute/mnt/opaque/jobs:/files:rw',
         ]
-        _name = job['id']
+        _name = job.id
         '''
         ' The Docker SDK for Python expects the cpu_reservation value to be
         ' scaled to nanos (10^9). Because the job object contains unscaled
@@ -46,8 +52,8 @@ class Swarm:
         ' in megabytes, it is also necessary to convert the value.
         '''
         _resources = docker.types.Resources(
-            cpu_reservation=job['requested_cpus'] * (10 ** 9),
-            mem_reservation=job['requested_memory'] * (10 ** 6)
+            cpu_reservation=ressources['n_cores'] * (10 ** 9),
+            mem_reservation=ressources['mem_mb'] * (10 ** 6)
         )
         _restart_policy = docker.types.RestartPolicy(condition='none')
         '''
-- 
GitLab