From 94e8b32fbd9184b4629d57f4bf007287e42bce1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20G=C3=B6bel?= <dgoebel@techfak.uni-bielefeld.de> Date: Fri, 24 Feb 2023 16:33:30 +0100 Subject: [PATCH] Add endpoint to start workflow execution form arbitrary git repository #13 --- .gitlab-ci.yml | 3 +- .pre-commit-config.yaml | 4 +- README.md | 25 ++-- app/api/endpoints/workflow_execution.py | 158 ++++++++++++++++------ app/api/utils.py | 127 ++++++++++++----- app/core/config.py | 1 + app/crud/crud_workflow_execution.py | 27 +++- app/git_repository/abstract_repository.py | 27 +++- app/main.py | 5 - app/schemas/workflow_execution.py | 36 ++++- app/tests/api/test_workflow_execution.py | 66 ++++++++- app/tests/crud/test_workflow_execution.py | 35 ++++- requirements-dev.txt | 2 +- scripts/test.sh | 9 -- 14 files changed, 406 insertions(+), 119 deletions(-) delete mode 100755 scripts/test.sh diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 7ec4979..7c5d929 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -13,6 +13,7 @@ variables: SLURM_TOKEN: "empty" SLURM_ENDPOINT: "http://127.0.0.1:8002" ACTIVE_WORKFLOW_EXECUTION_LIMIT: 3 + DEV_SYSTEM: "True" cache: paths: @@ -27,7 +28,7 @@ default: - pip install virtualenv - virtualenv venv - source venv/bin/activate - - python -m pip install --upgrade -r requirements.txt --extra-index-url https://$CI_PACKAGE_REGISTRY_USER:$CI_PACKAGE_REGISTRY_PASSWORD@gitlab.ub.uni-bielefeld.de/api/v4/projects/5493/packages/pypi/simple + - python -m pip install --upgrade -r requirements.txt - python -m pip install --upgrade -r requirements-dev.txt stages: # List of stages for jobs, and their order of execution diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 994593b..91c4f94 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -21,11 +21,11 @@ repos: files: app args: [--check] - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: 'v0.0.247' + rev: 'v0.0.252' hooks: - id: ruff - repo: 
https://github.com/pre-commit/mirrors-mypy - rev: v1.0.0 + rev: v1.0.1 hooks: - id: mypy files: app diff --git a/README.md b/README.md index 5d53d8a..e0691f0 100644 --- a/README.md +++ b/README.md @@ -25,15 +25,16 @@ This is the Workflow service of the CloWM service. ### Optional Variables -| Variable | Default | Value | Description | -|------------------------------------|------------------------------|-----------------------------|-------------------------------------------------------------------------------------------| -| `API_PREFIX` | `/api/workflow-service` | URL path | Prefix before every URL path | -| `BACKEND_CORS_ORIGINS` | `[]` | json formatted list of urls | List of valid CORS origins | -| `SQLALCHEMY_VERBOSE_LOGGER` | `false` | `<"true"|"false">` | Enables verbose SQL output.<br>Should be `false` in production | -| `NXF_CONFIG_BUCKET` | `nxf-config` | Bucket Name | Bucket where the nextflow configurations for each execution should be saved | -| `WORKFLOW_BUCKET` | `clowm-workflows` | Bucket Name | Bucket where to save important workflow files | -| `ICON_BUCKET` | `clowm-icons` | Bucket name | Bucket where to save workflow icons. Should be publicly available. | -| `SLURM_USER` | `slurm` | string | User on the slurm cluster who should run the job. Should be the user of the `SLURM_TOKEN` | -| `SLURM_SHARED_DIR` | `/vol/spool/clowm-workflows` | Path on slurm cluster | Shared directory on the cluster where the nextflow jobs will be executed. | -| `CONFIG_BUCKET_MOUNT_PATH` | `/mnt/config-bucket` | Path on slurm cluster | Folder where the S3 bucket `NFX_CONFIG_BUCKET` will be mounted on the slurm cluster | -| `ACTIVE_WORKFLOW_EXECUTION_LIMIT` | `3` | Integer | Limit of active workflow execution a user is allowed to have. `-1` means infinite. 
| +| Variable | Default | Value | Description | +|-----------------------------------|------------------------------|-----------------------------|----------------------------------------------------------------------------------------------------------------------------------| +| `API_PREFIX` | `/api/workflow-service` | URL path | Prefix before every URL path | +| `BACKEND_CORS_ORIGINS` | `[]` | json formatted list of urls | List of valid CORS origins | +| `SQLALCHEMY_VERBOSE_LOGGER` | `False` | `<"True"|"False">` | Enables verbose SQL output.<br>Should be `false` in production | +| `NXF_CONFIG_BUCKET` | `nxf-config` | Bucket Name | Bucket where the nextflow configurations for each execution should be saved | +| `WORKFLOW_BUCKET` | `clowm-workflows` | Bucket Name | Bucket where to save important workflow files | +| `ICON_BUCKET` | `clowm-icons` | Bucket name | Bucket where to save workflow icons. Should be publicly available. | +| `SLURM_USER` | `slurm` | string | User on the slurm cluster who should run the job. Should be the user of the `SLURM_TOKEN` | +| `SLURM_SHARED_DIR` | `/vol/spool/clowm-workflows` | Path on slurm cluster | Shared directory on the cluster where the nextflow jobs will be executed. | +| `CONFIG_BUCKET_MOUNT_PATH` | `/mnt/config-bucket` | Path on slurm cluster | Folder where the S3 bucket `NFX_CONFIG_BUCKET` will be mounted on the slurm cluster | +| `ACTIVE_WORKFLOW_EXECUTION_LIMIT` | `3` | Integer | Limit of active workflow execution a user is allowed to have. `-1` means infinite. | +| `DEV_SYSTEM` | `False` | `<"True"|"False">` | Activates an endpoint that allows execution of an workflow from an arbitrary Git Repository.<br>HAS TO BE `False` in PRODUCTION! 
| diff --git a/app/api/endpoints/workflow_execution.py b/app/api/endpoints/workflow_execution.py index 0b122ee..379abe4 100644 --- a/app/api/endpoints/workflow_execution.py +++ b/app/api/endpoints/workflow_execution.py @@ -1,11 +1,11 @@ import json -import re from tempfile import SpooledTemporaryFile from typing import TYPE_CHECKING, Any, Awaitable, Callable import jsonschema from clowmdb.models import User, WorkflowExecution, WorkflowVersion from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status +from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from app.api.dependencies import ( @@ -13,14 +13,20 @@ from app.api.dependencies import ( get_current_user, get_current_workflow_execution, get_db, + get_httpx_client, get_s3_resource, get_slurm_client, ) -from app.api.utils import check_bucket_access, start_workflow_execution +from app.api.utils import ( + check_active_workflow_execution_limit, + check_buckets_access, + check_repo, + start_workflow_execution, +) from app.core.config import settings from app.crud import CRUDWorkflowExecution, CRUDWorkflowVersion from app.git_repository import build_repository -from app.schemas.workflow_execution import WorkflowExecutionIn, WorkflowExecutionOut +from app.schemas.workflow_execution import DevWorkflowExecutionIn, WorkflowExecutionIn, WorkflowExecutionOut from app.slurm.slurm_rest_client import SlurmClient if TYPE_CHECKING: @@ -29,12 +35,7 @@ else: S3ServiceResource = object router = APIRouter(prefix="/workflow_executions", tags=["WorkflowExecution"]) -workflow_authorization = AuthorizationDependency(resource="workflow_execution_in") - -# regex to find S3 files in parameters of workflow execution -s3_file_regex = re.compile( - r"s3://(?!(((2(5[0-5]|[0-4]\d)|[01]?\d{1,2})\.){3}(2(5[0-5]|[0-4]\d)|[01]?\d{1,2})$))[a-z\d][a-z\d.-]{1,61}[a-z\d][^\"]*" -) +workflow_authorization = AuthorizationDependency(resource="workflow_execution") @router.post("", 
status_code=status.HTTP_201_CREATED, summary="Start a new workflow execution") @@ -91,21 +92,8 @@ async def start_workflow( ) rbac_operation = "start" if workflow_version.status == WorkflowVersion.Status.PUBLISHED else "start_unpublished" await authorization(rbac_operation) - # Check number of active workflow executions of the user - active_executions = await CRUDWorkflowExecution.list( - db, - uid=current_user.uid, - status_list=[ - WorkflowExecution.WorkflowExecutionStatus.PENDING, - WorkflowExecution.WorkflowExecutionStatus.SCHEDULED, - WorkflowExecution.WorkflowExecutionStatus.RUNNING, - ], - ) - if settings != -1 and len(active_executions) + 1 > settings.ACTIVE_WORKFLOW_EXECUTION_LIMIT: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail=f"The active workflow execution limit per user is {settings.ACTIVE_WORKFLOW_EXECUTION_LIMIT}", - ) + await check_active_workflow_execution_limit(db, current_user.uid) + # Validate schema with saved schema in bucket with SpooledTemporaryFile(max_size=512000) as f: s3.Bucket(settings.WORKFLOW_BUCKET).Object( @@ -120,23 +108,13 @@ async def start_workflow( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Nextflow Parameter validation error: {e.message}" ) - # Check if the user has access to the provided S3 buckets - errors = [] - parameters_string = json.dumps(workflow_execution_in.parameters) - for match in s3_file_regex.finditer(parameters_string): - path = match.group()[5:] - error = await check_bucket_access(db, current_user.uid, path) - if error is not None: - errors.append(error) - # Check if the user has access to the bucket where the report should be written - if workflow_execution_in.report_output_bucket is not None: - error = await check_bucket_access(db, current_user.uid, workflow_execution_in.report_output_bucket) - if error is not None: - errors.append(error) - if len(errors) > 0: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, detail=f"Errors with bucket access: 
{','.join(errors)}" - ) + # Check if the user has access to the provided S3 buckets + await check_buckets_access( + db=db, + parameters=workflow_execution_in.parameters, + uid=current_user.uid, + report_bucket=workflow_execution_in.report_output_bucket, + ) # Create execution in database execution = await CRUDWorkflowExecution.create(db, execution=workflow_execution_in, owner_id=current_user.uid) @@ -156,6 +134,104 @@ async def start_workflow( return execution +@router.post( + "/arbitrary", + status_code=status.HTTP_201_CREATED, + summary="Start a workflow execution with arbitrary git repository", + include_in_schema=settings.DEV_SYSTEM, +) +async def start_arbitrary_workflow( + background_tasks: BackgroundTasks, + workflow_execution_in: DevWorkflowExecutionIn, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), + authorization: Callable[[str], Awaitable[Any]] = Depends(AuthorizationDependency(resource="workflow")), + s3: S3ServiceResource = Depends(get_s3_resource), + slurm_client: SlurmClient = Depends(get_slurm_client), + client: AsyncClient = Depends(get_httpx_client), +) -> WorkflowExecutionOut: + """ + Start a new workflow execution from an arbitrary git repository.\n + Permission "workflow:create" required. + \f + Parameters + ---------- + background_tasks : fastapi.BackgroundTasks + Entrypoint for new BackgroundTasks. Provided by FastAPI. + workflow_execution_in : app.schemas.workflow_execution.DevWorkflowExecutionIn + Meta-data and parameters for the workflow to start. HTTP Body. + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. Dependency Injection. + current_user : clowmdb.models.User + Current user who will be the owner of the newly created workflow execution. Dependency Injection. + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + s3 : boto3_type_annotations.s3.ServiceResource + S3 Service to perform operations on buckets in Ceph.
Dependency Injection. + slurm_client : app.slurm.slurm_rest_client.SlurmClient + Slurm Rest Client to communicate with Slurm cluster. Dependency Injection. + client : httpx.AsyncClient + Http client with an open connection. Dependency Injection. + + Returns + ------- + execution : clowmdb.models.WorkflowExecution + Created workflow execution from the database + """ + if not settings.DEV_SYSTEM: # pragma: no cover + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Not available") + await authorization("create") + await check_active_workflow_execution_limit(db, current_user.uid) + + try: + # Build a git repository object based on the repository url + repo = build_repository(workflow_execution_in.repository_url, workflow_execution_in.git_commit_hash) + except NotImplementedError: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Supplied Git Repository is not supported") + + await check_repo(repo=repo, client=client) + + # Validate schema with saved schema in bucket + with SpooledTemporaryFile(max_size=512000) as f: + await repo.download_file("nextflow.schema", client=client, file_handle=f) + f.seek(0) + nextflow_schema = json.load(f) + + try: + jsonschema.validate(workflow_execution_in.parameters, nextflow_schema) + except jsonschema.exceptions.ValidationError as e: # pragma: no cover + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail=f"Nextflow Parameter validation error: {e.message}" + ) + + # Check if the user has access to the provided S3 buckets + await check_buckets_access( + db=db, + parameters=workflow_execution_in.parameters, + uid=current_user.uid, + report_bucket=workflow_execution_in.report_output_bucket, + ) + + execution = await CRUDWorkflowExecution.create( + db, + execution=workflow_execution_in, + owner_id=current_user.uid, + notes=f"Repository URL: {workflow_execution_in.repository_url}\nGit Commit Hash: {workflow_execution_in.git_commit_hash}", # noqa: E501 + ) + background_tasks.add_task( + start_workflow_execution,
+ s3=s3, + db=db, + execution=execution, + report_output_bucket=workflow_execution_in.report_output_bucket, + parameters=workflow_execution_in.parameters, + git_repo=repo, + slurm_client=slurm_client, + ) + + return execution + + @router.get("", status_code=status.HTTP_200_OK, summary="Get all workflow executions") async def list_workflow_executions( user_id: str diff --git a/app/api/utils.py b/app/api/utils.py index 62a0f0b..9f4ef51 100644 --- a/app/api/utils.py +++ b/app/api/utils.py @@ -1,10 +1,11 @@ import json +import re from tempfile import SpooledTemporaryFile from typing import TYPE_CHECKING, Any, Callable from uuid import uuid4 from clowmdb.models import WorkflowExecution -from fastapi import BackgroundTasks, UploadFile +from fastapi import BackgroundTasks, HTTPException, UploadFile, status from httpx import AsyncClient from mako.runtime import Context from mako.template import Template @@ -22,6 +23,10 @@ else: config_template = Template(filename="mako_templates/config_template.config.template") nextflow_command_template = Template(filename="mako_templates/nextflow_command.template") +# regex to find S3 files in parameters of workflow execution +s3_file_regex = re.compile( + r"s3://(?!(((2(5[0-5]|[0-4]\d)|[01]?\d{1,2})\.){3}(2(5[0-5]|[0-4]\d)|[01]?\d{1,2})$))[a-z\d][a-z\d.-]{1,61}[a-z\d][^\"]*" +) def upload_icon(s3: S3ServiceResource, background_tasks: BackgroundTasks, icon: UploadFile) -> str: @@ -93,36 +98,6 @@ class BinaryWriterContext(Context): return self.write -async def check_bucket_access(db: AsyncSession, uid: str, bucket_path: str) -> str | None: - """ - Check if the bucket exists and the user has READWRITE access to it. - - Parameters - ---------- - db : sqlalchemy.ext.asyncio.AsyncSession. - Async database session to perform query on. - uid : str - UID of a user. - bucket_path : str - Bucket plus optional file path, e.g. 
'testbucket' or 'testbucket/folder/test.txt' - - Returns - ------- - error : str | None - If the check was successful, nothing will be returned, otherwise the error will be returned. - """ - bucket = bucket_path.split("/")[0] - file = "/".join(bucket_path.split("/")[1:]) - if not await CRUDBucket.check_bucket_exist(db, bucket): - return f"Bucket {bucket} does not exist." - if not await CRUDBucket.check_access(db, bucket, uid=uid, key=file if len(file) > 0 else None): - error = f"The current user doesn't have READWRITE access to the bucket {bucket}" - if len("file") > 0: - error += f" and file/directory {file}" - return error - return None - - async def start_workflow_execution( s3: S3ServiceResource, db: AsyncSession, @@ -169,3 +144,93 @@ async def start_workflow_execution( ) slurm_job_id = await slurm_client.submit_job(nextflow_script=nextflow_command, execution_id=execution.execution_id) await CRUDWorkflowExecution.update_slurm_job_id(db, slurm_job_id=slurm_job_id, execution_id=execution.execution_id) + + +async def check_active_workflow_execution_limit(db: AsyncSession, uid: str) -> None: + """ + Check the number of active workflow executions of a user and raise an HTTP exception if a new one would violate the + limit of active workflow executions. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + uid : str + ID of a user.
+ """ + active_executions = await CRUDWorkflowExecution.list( + db, + uid=uid, + status_list=WorkflowExecution.WorkflowExecutionStatus.active_workflows(), + ) + if settings.ACTIVE_WORKFLOW_EXECUTION_LIMIT != -1 and len(active_executions) + 1 > settings.ACTIVE_WORKFLOW_EXECUTION_LIMIT: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"The active workflow execution limit per user is {settings.ACTIVE_WORKFLOW_EXECUTION_LIMIT}", + ) + + +async def check_buckets_access( + db: AsyncSession, parameters: dict[str, Any], uid: str, report_bucket: str | None = None +) -> None: + """ + Check if the user has access to the buckets referenced in the workflow execution parameters. + Raises an HTTP Exception if there is an issue. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + parameters : dict[str, Any] + Parameters of the workflow. + uid : str + UID of a user. + report_bucket : str | None, default None + Bucket or Path in bucket where the report should be written. + """ + errors = [] + parameters_string = json.dumps(parameters) + for match in s3_file_regex.finditer(parameters_string): + path = match.group()[5:] + error = await _check_bucket_access(db, uid, path) + if error is not None: + errors.append(error) + # Check if the user has access to the bucket where the report should be written + if report_bucket is not None: + error = await _check_bucket_access(db, uid, report_bucket) + if error is not None: + errors.append(error) + if len(errors) > 0: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail=f"Errors with bucket access: {','.join(errors)}" + ) + + +async def _check_bucket_access(db: AsyncSession, uid: str, bucket_path: str) -> str | None: + """ + Check if the bucket exists and the user has READWRITE access to it. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + uid : str + UID of a user.
+ bucket_path : str + Bucket plus optional file path, e.g. 'testbucket' or 'testbucket/folder/test.txt' + + Returns + ------- + error : str | None + If the check was successful, nothing will be returned, otherwise the error will be returned. + """ + bucket = bucket_path.split("/")[0] + file = "/".join(bucket_path.split("/")[1:]) + if not await CRUDBucket.check_bucket_exist(db, bucket): + return f"Bucket {bucket} does not exist." + if not await CRUDBucket.check_access(db, bucket, uid=uid, key=file if len(file) > 0 else None): + error = f"The current user doesn't have READWRITE access to the bucket {bucket}" + if len(file) > 0: + error += f" and file/directory {file}" + return error + return None diff --git a/app/core/config.py index 3d842bb..fa8794f 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -89,6 +89,7 @@ class Settings(BaseSettings): "/mnt/config-bucket", description="Path on the slurm cluster where the config bucket is mounted." ) ACTIVE_WORKFLOW_EXECUTION_LIMIT: int = Field(3, description="The limit of active workflow executions per user.") + DEV_SYSTEM: bool = Field(False, description="Open an endpoint where to execute arbitrary workflows.") class Config: case_sensitive = True diff --git a/app/crud/crud_workflow_execution.py index 873eba6..56b5cb9 100644 --- a/app/crud/crud_workflow_execution.py +++ b/app/crud/crud_workflow_execution.py @@ -4,12 +4,17 @@ from clowmdb.models import WorkflowExecution from sqlalchemy import delete, or_, select, update from sqlalchemy.ext.asyncio import AsyncSession -from app.schemas.workflow_execution import WorkflowExecutionIn +from app.schemas.workflow_execution import DevWorkflowExecutionIn, WorkflowExecutionIn class CRUDWorkflowExecution: @staticmethod - async def create(db: AsyncSession, execution: WorkflowExecutionIn, owner_id: str) -> WorkflowExecution: + async def create( + db: AsyncSession, + execution: WorkflowExecutionIn |
DevWorkflowExecutionIn, + owner_id: str, + notes: str | None = None, + ) -> WorkflowExecution: """ Create a workflow execution in the database. @@ -17,19 +22,29 @@ class CRUDWorkflowExecution: ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - execution : app.schemas.workflow_execution.WorkflowExecutionIn + execution : app.schemas.workflow_execution.WorkflowExecutionIn | DevWorkflowExecutionIn Workflow execution input parameters. owner_id : str User who started the workflow execution. + notes : str | None, default None + Notes to add to the workflow execution. Only used if 'execution' has type 'DevWorkflowExecutionIn'. Returns ------- workflow_execution : clowmdb.models.WorkflowExecution The newly created workflow execution """ - workflow_execution = WorkflowExecution( - user_id=owner_id, workflow_version_id=execution.workflow_version_id, notes=execution.notes, slurm_job_id=-1 - ) + if isinstance(execution, WorkflowExecutionIn): + workflow_execution = WorkflowExecution( + user_id=owner_id, + workflow_version_id=execution.workflow_version_id, + notes=execution.notes, + slurm_job_id=-1, + ) + else: + workflow_execution = WorkflowExecution( + user_id=owner_id, workflow_version_id=None, notes=notes, slurm_job_id=-1 + ) db.add(workflow_execution) await db.commit() await db.refresh(workflow_execution) diff --git a/app/git_repository/abstract_repository.py index f6f22e7..b853f49 100644 --- a/app/git_repository/abstract_repository.py +++ b/app/git_repository/abstract_repository.py @@ -123,15 +123,30 @@ class GitRepository(ABC): Parameters ---------- filepath : str - Path of the file to copy + Path of the file to copy. obj : mypy_boto3_s3.service_resource import Object - S3 object to upload file to + S3 object to upload file to. client : httpx.AsyncClient - Async HTTP Client with an open connection + Async HTTP Client with an open connection.
""" with SpooledTemporaryFile(max_size=512000) as f: # temporary file with 500kB data spooled in memory - async with client.stream("GET", self.downloadFileURL(filepath)) as response: - async for chunk in response.aiter_bytes(): - f.write(chunk) + await self.download_file(filepath, client=client, file_handle=f) f.seek(0) obj.upload_fileobj(f) + + async def download_file(self, filepath: str, client: AsyncClient, file_handle: SpooledTemporaryFile) -> None: + """ + Download a file from the git repository. + + Parameters + ---------- + filepath : str + Path of the file to copy. + client : httpx.AsyncClient + Async HTTP Client with an open connection. + file_handle : BytesIO + Write the file into this stream in binary mode. + """ + async with client.stream("GET", self.downloadFileURL(filepath)) as response: + async for chunk in response.aiter_bytes(): + file_handle.write(chunk) diff --git a/app/main.py b/app/main.py index adee895..513e550 100644 --- a/app/main.py +++ b/app/main.py @@ -1,4 +1,3 @@ -import uvicorn from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware @@ -46,7 +45,3 @@ app.add_middleware(GZipMiddleware, minimum_size=500) # Include all routes app.include_router(api_router) app.include_router(miscellaneous_router) - - -if __name__ == "__main__": - uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/app/schemas/workflow_execution.py b/app/schemas/workflow_execution.py index a003c6a..92c380e 100644 --- a/app/schemas/workflow_execution.py +++ b/app/schemas/workflow_execution.py @@ -3,7 +3,7 @@ from typing import Any from uuid import UUID from clowmdb.models import WorkflowExecution -from pydantic import BaseModel, Field +from pydantic import AnyHttpUrl, BaseModel, Field class _BaseWorkflowExecution(BaseModel): @@ -15,7 +15,12 @@ class _BaseWorkflowExecution(BaseModel): min_length=40, max_length=40, ) - notes: str | None = Field(None, description="Optional notes for this workflow 
execution", max_length=2**16) + notes: str | None = Field( + None, + description="Optional notes for this workflow execution", + max_length=2**16, + example="Some workflow execution specific notes", + ) class WorkflowExecutionIn(_BaseWorkflowExecution): @@ -25,6 +30,7 @@ class WorkflowExecutionIn(_BaseWorkflowExecution): description="Bucket where to save the Nextflow report. If None, no report will be generated", min_length=3, max_length=63, + example="some-bucket", ) @@ -47,8 +53,32 @@ class WorkflowExecutionOut(_BaseWorkflowExecution): example=WorkflowExecution.WorkflowExecutionStatus.RUNNING, ) workflow_version_id: str | None = Field( # type: ignore[assignment] - ..., description="Workflow version git commit hash" + ..., description="Workflow version git commit hash", example="ba8bcd9294c2c96aedefa1763a84a18077c50c0f" ) class Config: orm_mode = True + + +class DevWorkflowExecutionIn(BaseModel): + parameters: dict[str, Any] = Field(..., description="Parameters for this workflow") + report_output_bucket: str | None = Field( + None, + description="Bucket where to save the Nextflow report. 
If None, no report will be generated", + min_length=3, + max_length=63, + example="some-bucket", + ) + git_commit_hash: str = Field( + ..., + description="Hash of the git commit", + example="ba8bcd9294c2c96aedefa1763a84a18077c50c0f", + regex=r"^[0-9a-f]{40}$", + min_length=40, + max_length=40, + ) + repository_url: AnyHttpUrl = Field( + ..., + description="URL to the Git repository belonging to this workflow", + example="https://github.com/example-user/example", + ) diff --git a/app/tests/api/test_workflow_execution.py b/app/tests/api/test_workflow_execution.py index e95e38e..8439fed 100644 --- a/app/tests/api/test_workflow_execution.py +++ b/app/tests/api/test_workflow_execution.py @@ -8,7 +8,7 @@ from sqlalchemy import update from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings -from app.schemas.workflow_execution import WorkflowExecutionIn +from app.schemas.workflow_execution import DevWorkflowExecutionIn, WorkflowExecutionIn from app.tests.mocks.mock_s3_resource import MockS3ServiceResource from app.tests.utils.bucket import add_permission_for_bucket from app.tests.utils.user import UserWithAuthHeader @@ -368,6 +368,70 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): assert response.status_code == status.HTTP_400_BAD_REQUEST +class TestDevWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): + @pytest.mark.asyncio + async def test_start_dev_workflow_execution( + self, + client: AsyncClient, + random_user: UserWithAuthHeader, + mock_s3_service: MockS3ServiceResource, + ) -> None: + """ + Test for starting a workflow execution with an arbitrary git repository. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. pytest fixture. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. pytest fixture. + mock_s3_service : app.tests.mocks.mock_s3_resource.MockS3ServiceResource + Mock S3 Service to manipulate objects. pytest fixture. 
+ """ + execution_in = DevWorkflowExecutionIn( + git_commit_hash=random_hex_string(), repository_url="https://github.com/example/example", parameters={} + ) + response = await client.post( + f"{self.base_path}/arbitrary", headers=random_user.auth_headers, json=execution_in.dict() + ) + assert response.status_code == status.HTTP_201_CREATED + execution_response = response.json() + assert execution_response["user_id"] == random_user.user.uid + assert execution_response["status"] == WorkflowExecution.WorkflowExecutionStatus.PENDING + + assert ( + UUID(hex=execution_response["execution_id"]).hex + ".config" + in mock_s3_service.Bucket(settings.NXF_CONFIG_BUCKET).objects.all_keys() + ) + + @pytest.mark.asyncio + async def test_start_dev_workflow_execution_with_unknown_repository( + self, + client: AsyncClient, + random_user: UserWithAuthHeader, + mock_s3_service: MockS3ServiceResource, + ) -> None: + """ + Test for starting a workflow execution with an unknown git repository. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. pytest fixture. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. pytest fixture. + mock_s3_service : app.tests.mocks.mock_s3_resource.MockS3ServiceResource + Mock S3 Service to manipulate objects. pytest fixture. 
+ """ + execution_in = DevWorkflowExecutionIn( + git_commit_hash=random_hex_string(), repository_url="https://bitbucket.com/example/example", parameters={} + ) + response = await client.post( + f"{self.base_path}/arbitrary", headers=random_user.auth_headers, json=execution_in.dict() + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + class TestWorkflowExecutionRoutesGet(_TestWorkflowExecutionRoutes): @pytest.mark.asyncio async def test_get_workflow_execution( diff --git a/app/tests/crud/test_workflow_execution.py b/app/tests/crud/test_workflow_execution.py index da9259f..65adeed 100644 --- a/app/tests/crud/test_workflow_execution.py +++ b/app/tests/crud/test_workflow_execution.py @@ -6,7 +6,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.crud import CRUDWorkflowExecution -from app.schemas.workflow_execution import WorkflowExecutionIn +from app.schemas.workflow_execution import DevWorkflowExecutionIn, WorkflowExecutionIn from app.tests.utils.user import UserWithAuthHeader from app.tests.utils.utils import random_hex_string, random_lower_string @@ -50,6 +50,39 @@ class TestWorkflowExecutionCRUDCreate: assert workflow_execution_db.workflow_version_id == random_workflow_version.git_commit_hash assert workflow_execution_db.status == WorkflowExecution.WorkflowExecutionStatus.PENDING + @pytest.mark.asyncio + async def test_create_dev_workflow_execution(self, db: AsyncSession, random_user: UserWithAuthHeader) -> None: + """ + Test for creating a dev workflow execution with the CRUD Repository. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. pytest fixture. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. pytest fixture. 
+ """ + workflow_execution = DevWorkflowExecutionIn( + git_commit_hash=random_hex_string(), + repository_url="https://example.com", + parameters={}, + ) + workflow_execution_db = await CRUDWorkflowExecution.create(db, workflow_execution, random_user.user.uid) + assert workflow_execution_db + assert workflow_execution_db.user_id == random_user.user.uid + assert workflow_execution_db.status == WorkflowExecution.WorkflowExecutionStatus.PENDING + + workflow_execution_db = ( + await db.execute( + select(WorkflowExecution).where( + WorkflowExecution._execution_id == workflow_execution_db.execution_id.bytes + ) + ) + ).scalar() + assert workflow_execution_db + assert workflow_execution_db.user_id == random_user.user.uid + assert workflow_execution_db.status == WorkflowExecution.WorkflowExecutionStatus.PENDING + class TestWorkflowExecutionCRUDGet: @pytest.mark.asyncio diff --git a/requirements-dev.txt b/requirements-dev.txt index dbf9bb1..aa771f8 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -13,5 +13,5 @@ boto3-stubs-lite[s3]>=1.26.0,<1.27.0 sqlalchemy2-stubs types-requests # Miscellaneous -pre-commit>=3.0.0,<3.1.0 +pre-commit>=3.1.0,<3.2.0 python-dotenv diff --git a/scripts/test.sh b/scripts/test.sh deleted file mode 100755 index 19f94b5..0000000 --- a/scripts/test.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env bash - -set -e -set -x - -alembic downgrade base -alembic upgrade head - -pytest --cov=app --cov-report=term-missing app/tests "${@}" -- GitLab