From 44e9dc2f682e4f7e28f67236d007de8495e572bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20G=C3=B6bel?= <dgoebel@techfak.uni-bielefeld.de> Date: Wed, 5 Apr 2023 18:25:15 +0200 Subject: [PATCH] Store the params file as json in the params bucket #34 --- README.md | 2 +- app/api/endpoints/workflow_execution.py | 39 ++++++++++++++++++- app/api/utils.py | 41 ++------------------ app/core/config.py | 2 +- app/slurm/slurm_rest_client.py | 11 ++++-- app/tests/api/test_workflow_execution.py | 40 ++++++++++++++++--- mako_templates/workflow-params.yaml.template | 9 ----- 7 files changed, 85 insertions(+), 59 deletions(-) delete mode 100644 mako_templates/workflow-params.yaml.template diff --git a/README.md b/README.md index e0691f0..875dee6 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ This is the Workflow service of the CloWM service. | `API_PREFIX` | `/api/workflow-service` | URL path | Prefix before every URL path | | `BACKEND_CORS_ORIGINS` | `[]` | json formatted list of urls | List of valid CORS origins | | `SQLALCHEMY_VERBOSE_LOGGER` | `False` | `<"True"|"False">` | Enables verbose SQL output.<br>Should be `false` in production | -| `NXF_CONFIG_BUCKET` | `nxf-config` | Bucket Name | Bucket where the nextflow configurations for each execution should be saved | +| `PARAMS_BUCKET` | `nxf-params` | Bucket Name | Bucket where the parameters for each workflow execution should be saved | | `WORKFLOW_BUCKET` | `clowm-workflows` | Bucket Name | Bucket where to save important workflow files | | `ICON_BUCKET` | `clowm-icons` | Bucket name | Bucket where to save workflow icons. Should be publicly available. | | `SLURM_USER` | `slurm` | string | User on the slurm cluster who should run the job. 
Should be the user of the `SLURM_TOKEN` | diff --git a/app/api/endpoints/workflow_execution.py b/app/api/endpoints/workflow_execution.py index 522aa3b..677b08d 100644 --- a/app/api/endpoints/workflow_execution.py +++ b/app/api/endpoints/workflow_execution.py @@ -320,6 +320,43 @@ async def get_workflow_execution( ) +@router.get("/{eid}/params", status_code=status.HTTP_200_OK, summary="Get the parameters of a workflow execution") +async def get_workflow_execution_params( + workflow_execution: CurrentWorkflowExecution, + current_user: CurrentUser, + authorization: Authorization, + s3: S3Service, +) -> dict[str, Any]: + """ + Get the parameters of a specific workflow execution.\n + Permission "workflow_execution:read" required if the current user started the workflow execution, + otherwise "workflow_execution:read_any" required. + \f + Parameters + ---------- + workflow_execution : clowmdb.models.WorkflowExecution + Workflow execution with given ID. Dependency Injection. + current_user : clowmdb.models.User + Current user requesting the parameters of the workflow execution. Dependency Injection. + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + s3 : boto3_type_annotations.s3.ServiceResource + S3 Service to perform operations on buckets in Ceph. Dependency Injection. + + Returns + ------- + params : dict[str, Any] + Parameters of the workflow execution with the given id. 
+ """ + rbac_operation = "read" if workflow_execution.user_id == current_user.uid else "read_any" + await authorization(rbac_operation) + params_file_name = f"params-{workflow_execution.execution_id.hex}.json" + with SpooledTemporaryFile(max_size=512000) as f: + s3.Bucket(name=settings.PARAMS_BUCKET).Object(key=params_file_name).download_fileobj(f) + f.seek(0) + return json.load(f) + + @router.delete("/{eid}", status_code=status.HTTP_204_NO_CONTENT, summary="Delete a workflow execution") async def delete_workflow_execution( background_tasks: BackgroundTasks, @@ -360,7 +397,7 @@ async def delete_workflow_execution( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete workflow execution that is not finished." ) background_tasks.add_task( - s3.Bucket(name=settings.NXF_CONFIG_BUCKET).Object(key=f"{workflow_execution.execution_id.hex}.config").delete + s3.Bucket(name=settings.PARAMS_BUCKET).Object(key=f"params-{workflow_execution.execution_id.hex}.json").delete ) await CRUDWorkflowExecution.delete(db, workflow_execution.execution_id) diff --git a/app/api/utils.py b/app/api/utils.py index 40ecef5..25f0eb8 100644 --- a/app/api/utils.py +++ b/app/api/utils.py @@ -1,13 +1,12 @@ import json import re from tempfile import SpooledTemporaryFile -from typing import TYPE_CHECKING, Any, Callable +from typing import TYPE_CHECKING, Any from uuid import uuid4 from clowmdb.models import WorkflowExecution from fastapi import BackgroundTasks, HTTPException, UploadFile, status from httpx import AsyncClient, ConnectError, ConnectTimeout -from mako.runtime import Context from mako.template import Template from sqlalchemy.ext.asyncio import AsyncSession @@ -21,7 +20,6 @@ if TYPE_CHECKING: else: S3ServiceResource = object -parameter_file_template = Template(filename="mako_templates/workflow-params.yaml.template") nextflow_command_template = Template(filename="mako_templates/nextflow_command.template") # regex to find S3 files in parameters of workflow execution s3_file_regex = 
re.compile( @@ -73,33 +71,6 @@ async def check_repo(repo: GitRepository, client: AsyncClient) -> None: ) -class BinaryWriterContext(Context): - def __init__(self, buffer, **data): # type: ignore[no-untyped-def] - super().__init__(buffer, **data) - self._file = buffer - - def write(self, string: str) -> None: - """ - Write a string to the binary output stream. - - Parameters - ---------- - string : str - String to write - """ - self._file.write(string.encode("utf-8")) - - def writer(self) -> Callable[[str], None]: - """ - Get the write function. - - Returns - ------- - write : Callable[[str], None] - """ - return self.write - - async def start_workflow_execution( s3: S3ServiceResource, db: AsyncSession, @@ -129,15 +100,11 @@ async def start_workflow_execution( slurm_client : app.slurm.slurm_rest_client.SlurmClient Slurm Rest Client to communicate with Slurm cluster. """ - params_file_name = f"params-{execution.execution_id.hex}.yaml" + params_file_name = f"params-{execution.execution_id.hex}.json" with SpooledTemporaryFile(max_size=512000) as f: - ctx = BinaryWriterContext( - f, - parameters=parameters, - ) - parameter_file_template.render_context(ctx) + f.write(json.dumps(parameters).encode("utf-8")) f.seek(0) - s3.Bucket(name=settings.NXF_CONFIG_BUCKET).Object(key=params_file_name).upload_fileobj(f) + s3.Bucket(name=settings.PARAMS_BUCKET).Object(key=params_file_name).upload_fileobj(f) nextflow_command = nextflow_command_template.render( repo=git_repo, diff --git a/app/core/config.py b/app/core/config.py index fa8794f..4282d1f 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -75,7 +75,7 @@ class Settings(BaseSettings): CEPH_ACCESS_KEY: str = Field(..., description="Access key for the Ceph Object Gateway with admin privileges.") CEPH_SECRET_KEY: str = Field(..., description="Secret key for the Ceph Object Gateway with admin privileges.") AUTHZ_ENDPOINT: AnyHttpUrl = Field(..., description="Endpoint of the CloWM Auth service to authorize requests") - 
NXF_CONFIG_BUCKET: str = Field("nxf-config", description="Bucket where the nextflow configurations should be saved") + PARAMS_BUCKET: str = Field("nxf-params", description="Bucket where the parameters of workflow executions should be saved") WORKFLOW_BUCKET: str = Field("clowm-workflows", description="Bucket where to save workflow files") ICON_BUCKET: str = Field("clowm-icons", description="Bucket where to save workflow icons") SLURM_ENDPOINT: AnyHttpUrl = Field(..., description="URI of the Slurm Cluster") diff --git a/app/slurm/slurm_rest_client.py b/app/slurm/slurm_rest_client.py index a3dfaf0..be61d9d 100644 --- a/app/slurm/slurm_rest_client.py +++ b/app/slurm/slurm_rest_client.py @@ -37,16 +37,19 @@ class SlurmClient: slurm_job_id : int Slurm job ID of submitted job. """ + env = { + "NXF_HOME": f"/home/{settings.SLURM_USER}/.nextflow", + "NXF_WORK": settings.SLURM_SHARED_DIR, + "NXF_ASSETS": settings.SLURM_SHARED_DIR, + "TOWER_WORKSPACE_ID": execution_id.hex[:16], + } body = { "script": nextflow_script, "job": { "current_working_directory": settings.SLURM_SHARED_DIR, "name": str(execution_id), "requeue": False, - "environment": { - "NXF_HOME": f"/home/{settings.SLURM_USER}/.nextflow", - "TOWER_WORKSPACE_ID": execution_id.hex[:16], - }, + "environment": env, }, } response = await self._client.post( diff --git a/app/tests/api/test_workflow_execution.py b/app/tests/api/test_workflow_execution.py index c6d5163..a186dec 100644 --- a/app/tests/api/test_workflow_execution.py +++ b/app/tests/api/test_workflow_execution.py @@ -52,8 +52,8 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): assert execution_response["workflow_version_id"] == random_workflow_version.git_commit_hash assert ( - f"params-{UUID(hex=execution_response['execution_id']).hex }.yaml" - in mock_s3_service.Bucket(settings.NXF_CONFIG_BUCKET).objects.all_keys() + f"params-{UUID(hex=execution_response['execution_id']).hex }.json" + in 
mock_s3_service.Bucket(settings.PARAMS_BUCKET).objects.all_keys() ) @pytest.mark.asyncio @@ -97,8 +97,8 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): assert execution_response["workflow_version_id"] == random_workflow_version.git_commit_hash assert ( - f"params-{UUID(hex=execution_response['execution_id']).hex }.yaml" - in mock_s3_service.Bucket(settings.NXF_CONFIG_BUCKET).objects.all_keys() + f"params-{UUID(hex=execution_response['execution_id']).hex }.json" + in mock_s3_service.Bucket(settings.PARAMS_BUCKET).objects.all_keys() ) @pytest.mark.asyncio @@ -400,8 +400,8 @@ class TestDevWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): assert execution_response["status"] == WorkflowExecution.WorkflowExecutionStatus.PENDING assert ( - f"params-{UUID(hex=execution_response['execution_id']).hex }.yaml" - in mock_s3_service.Bucket(settings.NXF_CONFIG_BUCKET).objects.all_keys() + f"params-{UUID(hex=execution_response['execution_id']).hex }.json" + in mock_s3_service.Bucket(settings.PARAMS_BUCKET).objects.all_keys() ) @pytest.mark.asyncio @@ -460,6 +460,34 @@ class TestWorkflowExecutionRoutesGet(_TestWorkflowExecutionRoutes): execution = response.json() assert execution["execution_id"] == str(random_workflow_execution.execution_id) + @pytest.mark.asyncio + async def test_get_workflow_execution_params( + self, + client: AsyncClient, + random_user: UserWithAuthHeader, + random_workflow_execution: WorkflowExecution, + ) -> None: + """ + Test for getting the parameters of a workflow execution. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. pytest fixture. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. pytest fixture. + random_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing. pytest fixture. 
+ """ + response = await client.get( + "/".join([self.base_path, str(random_workflow_execution.execution_id), "params"]), + headers=random_user.auth_headers, + ) + assert response.status_code == status.HTTP_200_OK + + execution = response.json() + assert len(execution) == 0 + @pytest.mark.asyncio async def test_get_non_existing_workflow_execution( self, diff --git a/mako_templates/workflow-params.yaml.template b/mako_templates/workflow-params.yaml.template deleted file mode 100644 index a5e3805..0000000 --- a/mako_templates/workflow-params.yaml.template +++ /dev/null @@ -1,9 +0,0 @@ -% for param, val in parameters.items(): -% if isinstance(val, str): -${param}: "${val}" -% elif isinstance(val, bool): -${param}: ${str(val).lower()} -% else: -${param}: ${val} -%endif -% endfor -- GitLab