from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import docker
import time
import json
import os


class Swarm:
    def __init__(self, app=None):
        self.app = app
        if app is not None:
            self.init_app(app)
        self.docker = docker.from_env()

    def init_app(self, app):
        engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'])
        self.Session = sessionmaker(bind=engine)

    '''
    ' Swarm mode is intended to run containers which serve a non-terminating
    ' service, like a web server. To process a job it is necessary to use a
    ' one-shot container, which stops after the wrapped job process has
    ' completely finished. In order to run these one-shot containers in Swarm
    ' mode, the following run method is implemented analogously to the
    ' implementation presented in Alex Ellis' blog post "One-shot containers
    ' on Docker Swarm"¹.
    '
    ' ¹ https://blog.alexellis.io/containers-on-swarm/
    '''
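
    '''
    ' For reference, the CLI equivalent of this one-shot pattern looks
    ' roughly as follows; this is illustrative only and not used by this
    ' module:
    '
    '   docker service create --restart-condition none <image> <command>
    '   docker service ps <service>  # poll until the task state is complete
    '   docker service rm <service>
    '''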

    def run(self, job):
        '''
        Takes a job object, builds a one-shot service from it, runs the
        service on the swarm and tracks its state until completion.
        '''
        # Prepare argument values needed for the service creation.
        service_args = json.loads(job.service_args)
        resources = json.loads(job.ressources)
        _command = (job.service
                    + ' -i /files'
                    + ' -l {}'.format(service_args['lang'])
                    + ' -o /files/output'
                    + ' ' + ' '.join(service_args['args']))
        _constraints = ['node.role==worker']
        _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
            job.service,
            service_args['version']
        )
        _labels = {'service': job.service}
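        # Bind mount in the SDK's 'source:target:mode' string form (mode is
        # 'ro' or 'rw'); the host path below is specific to this deployment.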
        _mounts = [os.path.join('/home/compute/mnt/opaque',
                                str(job.user_id),
                                'jobs',
                                str(job.id))
                   + ':/files:rw']
        _name = job.id
        '''
        ' The Docker SDK for Python expects the cpu_reservation value to be
        ' scaled to nanos (10^9). Because the job object contains unscaled
        ' (10^0) values, they must be converted.
        '
        ' While the cpu_reservation value has to be given in nanos, the
        ' mem_reservation value must be given in bytes (intuitive, right?).
        ' Because the job object provides the memory value in megabytes, it
        ' is also necessary to convert that value.
        '''
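        # For example, resources = {'n_cores': 2, 'mem_mb': 512} yields
        # cpu_reservation = 2 * 10**9 = 2000000000 nanocores and
        # mem_reservation = 512 * 10**6 = 512000000 bytes.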
        _resources = docker.types.Resources(
            cpu_reservation=resources['n_cores'] * (10 ** 9),
            mem_reservation=resources['mem_mb'] * (10 ** 6)
        )
        _restart_policy = docker.types.RestartPolicy(condition='none')
        '''
        ' Create the service with the prepared values.
        '
        ' Note: A service reserves hardware resources. In case no worker node
        '       has the required resources available (i.e. not already
        '       reserved by other services), the service gets queued by the
        '       Docker engine until a node is able to meet the requirements.
        '
        ' TODO: The name argument should be used with the prepared value
        '       (name=_name). Because there is no id generator for now, it is
        '       not set, so the Docker engine assigns a random name.
        '''
        service = self.docker.services.create(
            _image,
            command=_command,
            constraints=_constraints,
            labels=_labels,
            mounts=_mounts,
            resources=_resources,
            restart_policy=_restart_policy
        )
        '''
        ' Because it takes some time until all data in the service object is
        ' initialized (especially the task list returned by the service.tasks
        ' method), poll until the task list is no longer empty.
        '''
        while not service.tasks():
            time.sleep(1)
            service.reload()
        '''
        ' Poll the service until the job has completely finished.
        '''
        session = self.Session()
        while True:
            current_state = service.tasks()[0].get('Status').get('State')
            if job.status != current_state:
                job.status = current_state
                session.add(job)
                session.commit()
                print(current_state)
            # 'complete' and 'failed' are both terminal task states; without
            # also checking for 'failed', a job whose container exits with a
            # non-zero status would keep this loop running forever.
            if current_state in ('complete', 'failed'):
                break
            time.sleep(1)
            service.reload()
        session.close()
        # Remove the service from the swarm.
        service.remove()

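
'''
' Usage sketch (hypothetical values; the Flask app configuration and the Job
' model are assumptions, not defined in this module):
'
'   from flask import Flask
'
'   app = Flask(__name__)
'   app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///opaque.db'
'   swarm = Swarm(app)
'
'   job = ...       # a persisted Job providing service, service_args,
'                   # ressources, user_id and id attributes
'   swarm.run(job)  # blocks until the one-shot service has finished
'''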