Skip to content
Snippets Groups Projects
Verified Commit 94e8b32f authored by Daniel Göbel's avatar Daniel Göbel
Browse files

Add endpoint to start workflow execution from arbitrary git repository

#13
parent f6ef5b55
No related branches found
No related tags found
2 merge requests!69Delete dev branch,!12Resolve "Add endpoint to start workflow execution from arbitrary git repository"
Pipeline #26374 passed
......@@ -13,6 +13,7 @@ variables:
SLURM_TOKEN: "empty"
SLURM_ENDPOINT: "http://127.0.0.1:8002"
ACTIVE_WORKFLOW_EXECUTION_LIMIT: 3
DEV_SYSTEM: "True"
cache:
paths:
......@@ -27,7 +28,7 @@ default:
- pip install virtualenv
- virtualenv venv
- source venv/bin/activate
- python -m pip install --upgrade -r requirements.txt --extra-index-url https://$CI_PACKAGE_REGISTRY_USER:$CI_PACKAGE_REGISTRY_PASSWORD@gitlab.ub.uni-bielefeld.de/api/v4/projects/5493/packages/pypi/simple
- python -m pip install --upgrade -r requirements.txt
- python -m pip install --upgrade -r requirements-dev.txt
stages: # List of stages for jobs, and their order of execution
......
......@@ -21,11 +21,11 @@ repos:
files: app
args: [--check]
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: 'v0.0.247'
rev: 'v0.0.252'
hooks:
- id: ruff
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.0.0
rev: v1.0.1
hooks:
- id: mypy
files: app
......
......@@ -25,15 +25,16 @@ This is the Workflow service of the CloWM service.
### Optional Variables
| Variable | Default | Value | Description |
|------------------------------------|------------------------------|-----------------------------|-------------------------------------------------------------------------------------------|
| `API_PREFIX` | `/api/workflow-service` | URL path | Prefix before every URL path |
| `BACKEND_CORS_ORIGINS` | `[]` | json formatted list of urls | List of valid CORS origins |
| `SQLALCHEMY_VERBOSE_LOGGER` | `false` | `<"true"&#x7c;"false">` | Enables verbose SQL output.<br>Should be `false` in production |
| `NXF_CONFIG_BUCKET` | `nxf-config` | Bucket Name | Bucket where the nextflow configurations for each execution should be saved |
| `WORKFLOW_BUCKET` | `clowm-workflows` | Bucket Name | Bucket where to save important workflow files |
| `ICON_BUCKET` | `clowm-icons` | Bucket name | Bucket where to save workflow icons. Should be publicly available. |
| `SLURM_USER` | `slurm` | string | User on the slurm cluster who should run the job. Should be the user of the `SLURM_TOKEN` |
| `SLURM_SHARED_DIR` | `/vol/spool/clowm-workflows` | Path on slurm cluster | Shared directory on the cluster where the nextflow jobs will be executed. |
| `CONFIG_BUCKET_MOUNT_PATH` | `/mnt/config-bucket` | Path on slurm cluster | Folder where the S3 bucket `NFX_CONFIG_BUCKET` will be mounted on the slurm cluster |
| `ACTIVE_WORKFLOW_EXECUTION_LIMIT` | `3` | Integer | Limit of active workflow execution a user is allowed to have. `-1` means infinite. |
| Variable | Default | Value | Description |
|-----------------------------------|------------------------------|-----------------------------|----------------------------------------------------------------------------------------------------------------------------------|
| `API_PREFIX` | `/api/workflow-service` | URL path | Prefix before every URL path |
| `BACKEND_CORS_ORIGINS` | `[]` | json formatted list of urls | List of valid CORS origins |
| `SQLALCHEMY_VERBOSE_LOGGER` | `False` | `<"True"&#x7c;"False">` | Enables verbose SQL output.<br>Should be `false` in production |
| `NXF_CONFIG_BUCKET` | `nxf-config` | Bucket Name | Bucket where the nextflow configurations for each execution should be saved |
| `WORKFLOW_BUCKET` | `clowm-workflows` | Bucket Name | Bucket where to save important workflow files |
| `ICON_BUCKET` | `clowm-icons` | Bucket name | Bucket where to save workflow icons. Should be publicly available. |
| `SLURM_USER` | `slurm` | string | User on the slurm cluster who should run the job. Should be the user of the `SLURM_TOKEN` |
| `SLURM_SHARED_DIR` | `/vol/spool/clowm-workflows` | Path on slurm cluster | Shared directory on the cluster where the nextflow jobs will be executed. |
| `CONFIG_BUCKET_MOUNT_PATH` | `/mnt/config-bucket` | Path on slurm cluster | Folder where the S3 bucket `NFX_CONFIG_BUCKET` will be mounted on the slurm cluster |
| `ACTIVE_WORKFLOW_EXECUTION_LIMIT` | `3` | Integer | Limit of active workflow execution a user is allowed to have. `-1` means infinite. |
| `DEV_SYSTEM`                      | `False`                      | `<"True"&#x7c;"False">`     | Activates an endpoint that allows execution of a workflow from an arbitrary Git Repository.<br>HAS TO BE `False` in PRODUCTION!  |
import json
import re
from tempfile import SpooledTemporaryFile
from typing import TYPE_CHECKING, Any, Awaitable, Callable
import jsonschema
from clowmdb.models import User, WorkflowExecution, WorkflowVersion
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.dependencies import (
......@@ -13,14 +13,20 @@ from app.api.dependencies import (
get_current_user,
get_current_workflow_execution,
get_db,
get_httpx_client,
get_s3_resource,
get_slurm_client,
)
from app.api.utils import check_bucket_access, start_workflow_execution
from app.api.utils import (
check_active_workflow_execution_limit,
check_buckets_access,
check_repo,
start_workflow_execution,
)
from app.core.config import settings
from app.crud import CRUDWorkflowExecution, CRUDWorkflowVersion
from app.git_repository import build_repository
from app.schemas.workflow_execution import WorkflowExecutionIn, WorkflowExecutionOut
from app.schemas.workflow_execution import DevWorkflowExecutionIn, WorkflowExecutionIn, WorkflowExecutionOut
from app.slurm.slurm_rest_client import SlurmClient
if TYPE_CHECKING:
......@@ -29,12 +35,7 @@ else:
S3ServiceResource = object
router = APIRouter(prefix="/workflow_executions", tags=["WorkflowExecution"])
workflow_authorization = AuthorizationDependency(resource="workflow_execution_in")
# regex to find S3 files in parameters of workflow execution
s3_file_regex = re.compile(
r"s3://(?!(((2(5[0-5]|[0-4]\d)|[01]?\d{1,2})\.){3}(2(5[0-5]|[0-4]\d)|[01]?\d{1,2})$))[a-z\d][a-z\d.-]{1,61}[a-z\d][^\"]*"
)
workflow_authorization = AuthorizationDependency(resource="workflow_execution")
@router.post("", status_code=status.HTTP_201_CREATED, summary="Start a new workflow execution")
......@@ -91,21 +92,8 @@ async def start_workflow(
)
rbac_operation = "start" if workflow_version.status == WorkflowVersion.Status.PUBLISHED else "start_unpublished"
await authorization(rbac_operation)
# Check number of active workflow executions of the user
active_executions = await CRUDWorkflowExecution.list(
db,
uid=current_user.uid,
status_list=[
WorkflowExecution.WorkflowExecutionStatus.PENDING,
WorkflowExecution.WorkflowExecutionStatus.SCHEDULED,
WorkflowExecution.WorkflowExecutionStatus.RUNNING,
],
)
if settings != -1 and len(active_executions) + 1 > settings.ACTIVE_WORKFLOW_EXECUTION_LIMIT:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"The active workflow execution limit per user is {settings.ACTIVE_WORKFLOW_EXECUTION_LIMIT}",
)
await check_active_workflow_execution_limit(db, current_user.uid)
# Validate schema with saved schema in bucket
with SpooledTemporaryFile(max_size=512000) as f:
s3.Bucket(settings.WORKFLOW_BUCKET).Object(
......@@ -120,23 +108,13 @@ async def start_workflow(
status_code=status.HTTP_400_BAD_REQUEST, detail=f"Nextflow Parameter validation error: {e.message}"
)
# Check if the user has access to the provided S3 buckets
errors = []
parameters_string = json.dumps(workflow_execution_in.parameters)
for match in s3_file_regex.finditer(parameters_string):
path = match.group()[5:]
error = await check_bucket_access(db, current_user.uid, path)
if error is not None:
errors.append(error)
# Check if the user has access to the bucket where the report should be written
if workflow_execution_in.report_output_bucket is not None:
error = await check_bucket_access(db, current_user.uid, workflow_execution_in.report_output_bucket)
if error is not None:
errors.append(error)
if len(errors) > 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=f"Errors with bucket access: {','.join(errors)}"
)
# Create execution in database
await check_buckets_access(
db=db,
parameters=workflow_execution_in.parameters,
uid=current_user.uid,
report_bucket=workflow_execution_in.report_output_bucket,
)
# Create execution in database
execution = await CRUDWorkflowExecution.create(db, execution=workflow_execution_in, owner_id=current_user.uid)
......@@ -156,6 +134,104 @@ async def start_workflow(
return execution
@router.post(
    "/arbitrary",
    status_code=status.HTTP_201_CREATED,
    summary="Start a workflow execution with arbitrary git repository",
    include_in_schema=settings.DEV_SYSTEM,
)
async def start_arbitrary_workflow(
    background_tasks: BackgroundTasks,
    workflow_execution_in: DevWorkflowExecutionIn,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
    authorization: Callable[[str], Awaitable[Any]] = Depends(AuthorizationDependency(resource="workflow")),
    s3: S3ServiceResource = Depends(get_s3_resource),
    slurm_client: SlurmClient = Depends(get_slurm_client),
    client: AsyncClient = Depends(get_httpx_client),
) -> WorkflowExecutionOut:
    """
    Start a new workflow execution from an arbitrary git repository.\n
    Permission "workflow:create" required.
    \f
    Parameters
    ----------
    background_tasks : fastapi.BackgroundTasks
        Entrypoint for new BackgroundTasks. Provided by FastAPI.
    workflow_execution_in : app.schemas.workflow_execution.DevWorkflowExecutionIn
        Meta-data and parameters for the workflow to start. HTTP Body.
    db : sqlalchemy.ext.asyncio.AsyncSession.
        Async database session to perform query on. Dependency Injection.
    current_user : clowmdb.models.User
        Current user who will be the owner of the newly created execution. Dependency Injection.
    authorization : Callable[[str], Awaitable[Any]]
        Async function to ask the auth service for authorization. Dependency Injection.
    s3 : boto3_type_annotations.s3.ServiceResource
        S3 Service to perform operations on buckets in Ceph. Dependency Injection.
    slurm_client : app.slurm.slurm_rest_client.SlurmClient
        Slurm Rest Client to communicate with Slurm cluster. Dependency Injection.
    client : httpx.AsyncClient
        Http client with an open connection. Dependency Injection.

    Returns
    -------
    execution : clowmdb.models.WorkflowExecution
        Created workflow execution from the database
    """
    # Hard gate: the route is hidden from the OpenAPI schema on non-dev systems,
    # but reject the request explicitly as well in case it is called directly.
    if not settings.DEV_SYSTEM:  # pragma: no cover
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Not available")
    await authorization("create")
    await check_active_workflow_execution_limit(db, current_user.uid)
    try:
        # Build a git repository object based on the repository url
        repo = build_repository(workflow_execution_in.repository_url, workflow_execution_in.git_commit_hash)
    except NotImplementedError:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Supplied Git Repository is not supported")
    await check_repo(repo=repo, client=client)
    # Download the parameter schema from the repository and validate the supplied parameters against it.
    # NOTE(review): nf-core convention names this file 'nextflow_schema.json' — confirm 'nextflow.schema' is intended.
    with SpooledTemporaryFile(max_size=512000) as f:
        await repo.download_file("nextflow.schema", client=client, file_handle=f)
        f.seek(0)
        nextflow_schema = json.load(f)
    try:
        jsonschema.validate(workflow_execution_in.parameters, nextflow_schema)
    except jsonschema.exceptions.ValidationError as e:  # pragma: no cover
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=f"Nextflow Parameter validation error: {e.message}"
        )
    # Check if the user has access to the S3 buckets referenced in the parameters
    await check_buckets_access(
        db=db,
        parameters=workflow_execution_in.parameters,
        uid=current_user.uid,
        report_bucket=workflow_execution_in.report_output_bucket,
    )
    # Create execution in database.
    # BUG FIX: the notes previously rendered as "Git Commit Hash<hash>" with no separator.
    execution = await CRUDWorkflowExecution.create(
        db,
        execution=workflow_execution_in,
        owner_id=current_user.uid,
        notes=f"Repository URL: {workflow_execution_in.repository_url}\nGit Commit Hash: {workflow_execution_in.git_commit_hash}",  # noqa: E501
    )
    # Launch the actual workflow asynchronously so the request can return immediately.
    background_tasks.add_task(
        start_workflow_execution,
        s3=s3,
        db=db,
        execution=execution,
        report_output_bucket=workflow_execution_in.report_output_bucket,
        parameters=workflow_execution_in.parameters,
        git_repo=repo,
        slurm_client=slurm_client,
    )
    return execution
@router.get("", status_code=status.HTTP_200_OK, summary="Get all workflow executions")
async def list_workflow_executions(
user_id: str
......
import json
import re
from tempfile import SpooledTemporaryFile
from typing import TYPE_CHECKING, Any, Callable
from uuid import uuid4
from clowmdb.models import WorkflowExecution
from fastapi import BackgroundTasks, UploadFile
from fastapi import BackgroundTasks, HTTPException, UploadFile, status
from httpx import AsyncClient
from mako.runtime import Context
from mako.template import Template
......@@ -22,6 +23,10 @@ else:
config_template = Template(filename="mako_templates/config_template.config.template")
nextflow_command_template = Template(filename="mako_templates/nextflow_command.template")
# regex to find S3 files in parameters of workflow execution
s3_file_regex = re.compile(
r"s3://(?!(((2(5[0-5]|[0-4]\d)|[01]?\d{1,2})\.){3}(2(5[0-5]|[0-4]\d)|[01]?\d{1,2})$))[a-z\d][a-z\d.-]{1,61}[a-z\d][^\"]*"
)
def upload_icon(s3: S3ServiceResource, background_tasks: BackgroundTasks, icon: UploadFile) -> str:
......@@ -93,36 +98,6 @@ class BinaryWriterContext(Context):
return self.write
async def check_bucket_access(db: AsyncSession, uid: str, bucket_path: str) -> str | None:
    """
    Check if the bucket exists and the user has READWRITE access to it.

    Parameters
    ----------
    db : sqlalchemy.ext.asyncio.AsyncSession.
        Async database session to perform query on.
    uid : str
        UID of a user.
    bucket_path : str
        Bucket plus optional file path, e.g. 'testbucket' or 'testbucket/folder/test.txt'

    Returns
    -------
    error : str | None
        If the check was successful, nothing will be returned, otherwise the error will be returned.
    """
    bucket = bucket_path.split("/")[0]
    file = "/".join(bucket_path.split("/")[1:])
    if not await CRUDBucket.check_bucket_exist(db, bucket):
        return f"Bucket {bucket} does not exist."
    if not await CRUDBucket.check_access(db, bucket, uid=uid, key=file if len(file) > 0 else None):
        error = f"The current user doesn't have READWRITE access to the bucket {bucket}"
        # BUG FIX: was 'len("file") > 0' (a literal, always true), which appended
        # the file suffix even when no file path was supplied.
        if len(file) > 0:
            error += f" and file/directory {file}"
        return error
    return None
async def start_workflow_execution(
s3: S3ServiceResource,
db: AsyncSession,
......@@ -169,3 +144,93 @@ async def start_workflow_execution(
)
slurm_job_id = await slurm_client.submit_job(nextflow_script=nextflow_command, execution_id=execution.execution_id)
await CRUDWorkflowExecution.update_slurm_job_id(db, slurm_job_id=slurm_job_id, execution_id=execution.execution_id)
async def check_active_workflow_execution_limit(db: AsyncSession, uid: str) -> None:
    """
    Check the number of active workflow executions of a user and raise an HTTP exception if a new one would violate
    the limit of active workflow executions.

    Parameters
    ----------
    db : sqlalchemy.ext.asyncio.AsyncSession.
        Async database session to perform query on.
    uid : str
        ID of a user.

    Raises
    ------
    fastapi.HTTPException
        403 if starting another execution would exceed ACTIVE_WORKFLOW_EXECUTION_LIMIT.
    """
    active_executions = await CRUDWorkflowExecution.list(
        db,
        uid=uid,
        status_list=WorkflowExecution.WorkflowExecutionStatus.active_workflows(),
    )
    # A limit of -1 means "infinite". BUG FIX: previously this compared the whole
    # settings object to -1 ('settings != -1'), which is always true, so the
    # infinite sentinel never disabled the check.
    if (
        settings.ACTIVE_WORKFLOW_EXECUTION_LIMIT != -1
        and len(active_executions) + 1 > settings.ACTIVE_WORKFLOW_EXECUTION_LIMIT
    ):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"The active workflow execution limit per user is {settings.ACTIVE_WORKFLOW_EXECUTION_LIMIT}",
        )
async def check_buckets_access(
    db: AsyncSession, parameters: dict[str, Any], uid: str, report_bucket: str | None = None
) -> None:
    """
    Verify that the user may use every S3 bucket referenced in the workflow execution parameters.

    Raises an HTTP Exception listing all access problems if any are found.

    Parameters
    ----------
    db : sqlalchemy.ext.asyncio.AsyncSession.
        Async database session to perform query on.
    parameters : dict[str, Any]
        Parameters of the workflow.
    uid : str
        UID of a user.
    report_bucket : str | None, default None
        Bucket or Path in bucket where the report should be written.
    """
    # Gather every candidate path: each s3:// URI found in the JSON-serialized
    # parameters (scheme prefix stripped), plus the optional report bucket.
    candidate_paths = [match.group()[5:] for match in s3_file_regex.finditer(json.dumps(parameters))]
    if report_bucket is not None:
        candidate_paths.append(report_bucket)
    errors = []
    for path in candidate_paths:
        problem = await _check_bucket_access(db, uid, path)
        if problem is not None:
            errors.append(problem)
    if errors:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=f"Errors with bucket access: {','.join(errors)}"
        )
async def _check_bucket_access(db: AsyncSession, uid: str, bucket_path: str) -> str | None:
    """
    Check if the bucket exists and the user has READWRITE access to it.

    Parameters
    ----------
    db : sqlalchemy.ext.asyncio.AsyncSession.
        Async database session to perform query on.
    uid : str
        UID of a user.
    bucket_path : str
        Bucket plus optional file path, e.g. 'testbucket' or 'testbucket/folder/test.txt'

    Returns
    -------
    error : str | None
        If the check was successful, nothing will be returned, otherwise the error will be returned.
    """
    bucket = bucket_path.split("/")[0]
    file = "/".join(bucket_path.split("/")[1:])
    if not await CRUDBucket.check_bucket_exist(db, bucket):
        return f"Bucket {bucket} does not exist."
    if not await CRUDBucket.check_access(db, bucket, uid=uid, key=file if len(file) > 0 else None):
        error = f"The current user doesn't have READWRITE access to the bucket {bucket}"
        # BUG FIX: was 'len("file") > 0' (a literal, always true), which appended
        # the file suffix even when no file path was supplied.
        if len(file) > 0:
            error += f" and file/directory {file}"
        return error
    return None
......@@ -89,6 +89,7 @@ class Settings(BaseSettings):
"/mnt/config-bucket", description="Path on the slurm cluster where the config bucket is mounted."
)
ACTIVE_WORKFLOW_EXECUTION_LIMIT: int = Field(3, description="The limit of active workflow executions per user.")
DEV_SYSTEM: bool = Field(False, description="Open a endpoint where to execute arbitrary workflows.")
class Config:
case_sensitive = True
......
......@@ -4,12 +4,17 @@ from clowmdb.models import WorkflowExecution
from sqlalchemy import delete, or_, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.schemas.workflow_execution import WorkflowExecutionIn
from app.schemas.workflow_execution import DevWorkflowExecutionIn, WorkflowExecutionIn
class CRUDWorkflowExecution:
@staticmethod
async def create(db: AsyncSession, execution: WorkflowExecutionIn, owner_id: str) -> WorkflowExecution:
async def create(
db: AsyncSession,
execution: WorkflowExecutionIn | DevWorkflowExecutionIn,
owner_id: str,
notes: str | None = None,
) -> WorkflowExecution:
"""
Create a workflow execution in the database.
......@@ -17,19 +22,29 @@ class CRUDWorkflowExecution:
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
execution : app.schemas.workflow_execution.WorkflowExecutionIn
execution : app.schemas.workflow_execution.WorkflowExecutionIn | DevWorkflowExecutionIn
Workflow execution input parameters.
owner_id : str
User who started the workflow execution.
notes : str | None, default None
Notes to add to the workflow execution. Only usd if 'execution' has type 'DevWorkflowExecutionIn'.
Returns
-------
workflow_execution : clowmdb.models.WorkflowExecution
The newly created workflow execution
"""
workflow_execution = WorkflowExecution(
user_id=owner_id, workflow_version_id=execution.workflow_version_id, notes=execution.notes, slurm_job_id=-1
)
if isinstance(execution, WorkflowExecutionIn):
workflow_execution = WorkflowExecution(
user_id=owner_id,
workflow_version_id=execution.workflow_version_id,
notes=execution.notes,
slurm_job_id=-1,
)
else:
workflow_execution = WorkflowExecution(
user_id=owner_id, workflow_version_id=None, notes=notes, slurm_job_id=-1
)
db.add(workflow_execution)
await db.commit()
await db.refresh(workflow_execution)
......
......@@ -123,15 +123,30 @@ class GitRepository(ABC):
Parameters
----------
filepath : str
Path of the file to copy
Path of the file to copy.
obj : mypy_boto3_s3.service_resource import Object
S3 object to upload file to
S3 object to upload file to.
client : httpx.AsyncClient
Async HTTP Client with an open connection
Async HTTP Client with an open connection.
"""
with SpooledTemporaryFile(max_size=512000) as f: # temporary file with 500kB data spooled in memory
async with client.stream("GET", self.downloadFileURL(filepath)) as response:
async for chunk in response.aiter_bytes():
f.write(chunk)
await self.download_file(filepath, client=client, file_handle=f)
f.seek(0)
obj.upload_fileobj(f)
async def download_file(self, filepath: str, client: AsyncClient, file_handle: SpooledTemporaryFile) -> None:
    """
    Download a file from the git repository and stream it into an open file handle.

    The response body is written chunk by chunk, so the whole file is never
    held in memory at once. The handle is NOT rewound afterwards; callers
    must call ``file_handle.seek(0)`` before reading.

    Parameters
    ----------
    filepath : str
        Path of the file inside the repository to download.
    client : httpx.AsyncClient
        Async HTTP Client with an open connection.
    file_handle : tempfile.SpooledTemporaryFile
        Write the file into this stream in binary mode.
    """
    # downloadFileURL() is implemented by the concrete repository subclass
    # (e.g. GitHub/GitLab raw-file URL).
    async with client.stream("GET", self.downloadFileURL(filepath)) as response:
        async for chunk in response.aiter_bytes():
            file_handle.write(chunk)
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
......@@ -46,7 +45,3 @@ app.add_middleware(GZipMiddleware, minimum_size=500)
# Include all routes
app.include_router(api_router)
app.include_router(miscellaneous_router)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
......@@ -3,7 +3,7 @@ from typing import Any
from uuid import UUID
from clowmdb.models import WorkflowExecution
from pydantic import BaseModel, Field
from pydantic import AnyHttpUrl, BaseModel, Field
class _BaseWorkflowExecution(BaseModel):
......@@ -15,7 +15,12 @@ class _BaseWorkflowExecution(BaseModel):
min_length=40,
max_length=40,
)
notes: str | None = Field(None, description="Optional notes for this workflow execution", max_length=2**16)
notes: str | None = Field(
None,
description="Optional notes for this workflow execution",
max_length=2**16,
example="Some workflow execution specific notes",
)
class WorkflowExecutionIn(_BaseWorkflowExecution):
......@@ -25,6 +30,7 @@ class WorkflowExecutionIn(_BaseWorkflowExecution):
description="Bucket where to save the Nextflow report. If None, no report will be generated",
min_length=3,
max_length=63,
example="some-bucket",
)
......@@ -47,8 +53,32 @@ class WorkflowExecutionOut(_BaseWorkflowExecution):
example=WorkflowExecution.WorkflowExecutionStatus.RUNNING,
)
workflow_version_id: str | None = Field( # type: ignore[assignment]
..., description="Workflow version git commit hash"
..., description="Workflow version git commit hash", example="ba8bcd9294c2c96aedefa1763a84a18077c50c0f"
)
class Config:
orm_mode = True
class DevWorkflowExecutionIn(BaseModel):
    """
    Request schema for starting a workflow execution from an arbitrary git
    repository (only available when the DEV_SYSTEM setting is enabled).
    """

    # Free-form workflow parameters; validated by the endpoint against the
    # repository's nextflow schema, not by this model.
    parameters: dict[str, Any] = Field(..., description="Parameters for this workflow")
    # Optional target for the Nextflow report; length limits follow S3 bucket
    # naming rules (3-63 characters).
    report_output_bucket: str | None = Field(
        None,
        description="Bucket where to save the Nextflow report. If None, no report will be generated",
        min_length=3,
        max_length=63,
        example="some-bucket",
    )
    # Full 40-character lowercase hex SHA-1 commit hash.
    git_commit_hash: str = Field(
        ...,
        description="Hash of the git commit",
        example="ba8bcd9294c2c96aedefa1763a84a18077c50c0f",
        regex=r"^[0-9a-f]{40}$",
        min_length=40,
        max_length=40,
    )
    # Pydantic validates this is a well-formed http(s) URL.
    repository_url: AnyHttpUrl = Field(
        ...,
        description="URL to the Git repository belonging to this workflow",
        example="https://github.com/example-user/example",
    )
......@@ -8,7 +8,7 @@ from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.schemas.workflow_execution import WorkflowExecutionIn
from app.schemas.workflow_execution import DevWorkflowExecutionIn, WorkflowExecutionIn
from app.tests.mocks.mock_s3_resource import MockS3ServiceResource
from app.tests.utils.bucket import add_permission_for_bucket
from app.tests.utils.user import UserWithAuthHeader
......@@ -368,6 +368,70 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes):
assert response.status_code == status.HTTP_400_BAD_REQUEST
class TestDevWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes):
    """Tests for the dev-only endpoint that starts an execution from an arbitrary git repository."""

    @pytest.mark.asyncio
    async def test_start_dev_workflow_execution(
        self,
        client: AsyncClient,
        random_user: UserWithAuthHeader,
        mock_s3_service: MockS3ServiceResource,
    ) -> None:
        """
        Test for starting a workflow execution with an arbitrary git repository.

        Parameters
        ----------
        client : httpx.AsyncClient
            HTTP Client to perform the request on. pytest fixture.
        random_user : app.tests.utils.user.UserWithAuthHeader
            Random user for testing. pytest fixture.
        mock_s3_service : app.tests.mocks.mock_s3_resource.MockS3ServiceResource
            Mock S3 Service to manipulate objects. pytest fixture.
        """
        payload = DevWorkflowExecutionIn(
            git_commit_hash=random_hex_string(), repository_url="https://github.com/example/example", parameters={}
        )
        response = await client.post(
            f"{self.base_path}/arbitrary", headers=random_user.auth_headers, json=payload.dict()
        )
        assert response.status_code == status.HTTP_201_CREATED

        created = response.json()
        assert created["user_id"] == random_user.user.uid
        assert created["status"] == WorkflowExecution.WorkflowExecutionStatus.PENDING
        # A nextflow config object must have been written for the new execution.
        config_key = UUID(hex=created["execution_id"]).hex + ".config"
        assert config_key in mock_s3_service.Bucket(settings.NXF_CONFIG_BUCKET).objects.all_keys()

    @pytest.mark.asyncio
    async def test_start_dev_workflow_execution_with_unknown_repository(
        self,
        client: AsyncClient,
        random_user: UserWithAuthHeader,
        mock_s3_service: MockS3ServiceResource,
    ) -> None:
        """
        Test for starting a workflow execution with an unknown git repository.

        Parameters
        ----------
        client : httpx.AsyncClient
            HTTP Client to perform the request on. pytest fixture.
        random_user : app.tests.utils.user.UserWithAuthHeader
            Random user for testing. pytest fixture.
        mock_s3_service : app.tests.mocks.mock_s3_resource.MockS3ServiceResource
            Mock S3 Service to manipulate objects. pytest fixture.
        """
        payload = DevWorkflowExecutionIn(
            git_commit_hash=random_hex_string(), repository_url="https://bitbucket.com/example/example", parameters={}
        )
        response = await client.post(
            f"{self.base_path}/arbitrary", headers=random_user.auth_headers, json=payload.dict()
        )
        # Unsupported git hosts are rejected before anything is created.
        assert response.status_code == status.HTTP_400_BAD_REQUEST
class TestWorkflowExecutionRoutesGet(_TestWorkflowExecutionRoutes):
@pytest.mark.asyncio
async def test_get_workflow_execution(
......
......@@ -6,7 +6,7 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.crud import CRUDWorkflowExecution
from app.schemas.workflow_execution import WorkflowExecutionIn
from app.schemas.workflow_execution import DevWorkflowExecutionIn, WorkflowExecutionIn
from app.tests.utils.user import UserWithAuthHeader
from app.tests.utils.utils import random_hex_string, random_lower_string
......@@ -50,6 +50,39 @@ class TestWorkflowExecutionCRUDCreate:
assert workflow_execution_db.workflow_version_id == random_workflow_version.git_commit_hash
assert workflow_execution_db.status == WorkflowExecution.WorkflowExecutionStatus.PENDING
@pytest.mark.asyncio
async def test_create_dev_workflow_execution(self, db: AsyncSession, random_user: UserWithAuthHeader) -> None:
    """
    Test for creating a dev workflow execution with the CRUD Repository.

    Parameters
    ----------
    db : sqlalchemy.ext.asyncio.AsyncSession.
        Async database session to perform query on. pytest fixture.
    random_user : app.tests.utils.user.UserWithAuthHeader
        Random user for testing. pytest fixture.
    """
    execution_in = DevWorkflowExecutionIn(
        git_commit_hash=random_hex_string(),
        repository_url="https://example.com",
        parameters={},
    )
    created = await CRUDWorkflowExecution.create(db, execution_in, random_user.user.uid)
    assert created
    assert created.user_id == random_user.user.uid
    assert created.status == WorkflowExecution.WorkflowExecutionStatus.PENDING

    # Re-read the row straight from the database to verify it was persisted.
    stmt = select(WorkflowExecution).where(WorkflowExecution._execution_id == created.execution_id.bytes)
    persisted = (await db.execute(stmt)).scalar()
    assert persisted
    assert persisted.user_id == random_user.user.uid
    assert persisted.status == WorkflowExecution.WorkflowExecutionStatus.PENDING
class TestWorkflowExecutionCRUDGet:
@pytest.mark.asyncio
......
......@@ -13,5 +13,5 @@ boto3-stubs-lite[s3]>=1.26.0,<1.27.0
sqlalchemy2-stubs
types-requests
# Miscellaneous
pre-commit>=3.0.0,<3.1.0
pre-commit>=3.1.0,<3.2.0
python-dotenv
#!/usr/bin/env bash
# Test runner: resets the database schema, then runs the pytest suite with coverage.

# Abort on the first failing command and echo each command for easier CI debugging.
set -e
set -x

# Reset the database to a clean state by rolling all migrations back and re-applying them.
alembic downgrade base
alembic upgrade head

# Run the test suite with coverage; any extra CLI arguments are forwarded to pytest.
pytest --cov=app --cov-report=term-missing app/tests "${@}"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment