diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 9ddf5f1c61268e767592f55048abc01e6316df72..ec216b815a41812e41ecae3e80067106c96e93ea 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,4 +1,4 @@ -image: python:3.11-slim +image: ${CI_DEPENDENCY_PROXY_DIRECT_GROUP_IMAGE_PREFIX}/python:3.11-slim variables: PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip" @@ -28,8 +28,7 @@ default: - pip install virtualenv - virtualenv venv - source venv/bin/activate - - python -m pip install --upgrade -r requirements.txt - - python -m pip install --upgrade -r requirements-dev.txt + - python -m pip install --upgrade -r requirements.txt -r requirements-dev.txt stages: # List of stages for jobs, and their order of execution # - build @@ -44,7 +43,7 @@ integration-test-job: # Runs integration tests with the database DB_DATABASE: "integration-test-db" DB_HOST: "integration-test-db" services: - - name: mysql:8 + - name: ${CI_DEPENDENCY_PROXY_DIRECT_GROUP_IMAGE_PREFIX}/mysql:8 alias: integration-test-db variables: MYSQL_RANDOM_ROOT_PASSWORD: "yes" @@ -72,7 +71,7 @@ e2e-test-job: # Runs e2e tests on the API endpoints DB_DATABASE: "e2e-test-db" DB_HOST: "e2e-test-db" services: - - name: mysql:8 + - name: ${CI_DEPENDENCY_PROXY_DIRECT_GROUP_IMAGE_PREFIX}/mysql:8 alias: e2e-test-db variables: MYSQL_RANDOM_ROOT_PASSWORD: "yes" @@ -137,7 +136,7 @@ lint-test-job: # Runs linters checks on code publish-dev-docker-container-job: stage: deploy image: - name: gcr.io/kaniko-project/executor:v1.16.0-debug + name: gcr.io/kaniko-project/executor:v1.17.0-debug entrypoint: [""] dependencies: [] only: @@ -160,7 +159,7 @@ publish-dev-docker-container-job: publish-docker-container-job: stage: deploy image: - name: gcr.io/kaniko-project/executor:v1.16.0-debug + name: gcr.io/kaniko-project/executor:v1.17.0-debug entrypoint: [""] dependencies: [] only: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 74497a0c33bd2467a01c728ed79f3c5353baaf50..baf38857efd4d46e2f5d0b8c7ba24ec9f37d4c18 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -21,7 +21,7 @@ repos: files: app args: [--check] - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: 'v0.1.2' + rev: 'v0.1.3' hooks: - id: ruff - repo: https://github.com/PyCQA/isort diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index 6204525aefc27f2ab360623ad30e3466403d0c58..52c29e9bdffc8387dd53537718112f869807348e 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -1,3 +1,4 @@ +from datetime import date from typing import Annotated, Any, Awaitable, Callable, List, Optional, Set from uuid import UUID @@ -12,6 +13,7 @@ from app.crud import CRUDWorkflow, CRUDWorkflowVersion from app.crud.crud_workflow_mode import CRUDWorkflowMode from app.git_repository import GitHubRepository, build_repository from app.schemas.workflow import WorkflowIn, WorkflowOut, WorkflowStatistic, WorkflowUpdate +from app.schemas.workflow_execution import AnonymizedWorkflowExecution from app.schemas.workflow_version import WorkflowVersion as WorkflowVersionSchema from app.scm import SCM, Provider from app.utils.otlp import start_as_current_span_async @@ -71,11 +73,11 @@ async def list_workflows( Workflows in the system """ current_span = trace.get_current_span() - if developer_id is not None: + if developer_id is not None: # pragma: no cover current_span.set_attribute("developer_id", developer_id) if name_substring is not None: # pragma: no cover current_span.set_attribute("name_substring", name_substring) - if version_status is not None and len(version_status) > 0: + if version_status is not None and len(version_status) > 0: # pragma: no cover current_span.set_attribute("version_status", [stat.name for stat in version_status]) rbac_operation = "list" if developer_id is not None and current_user.uid != developer_id: @@ -189,6 +191,70 @@ async def create_workflow( return WorkflowOut.from_db_workflow(await CRUDWorkflow.get(db, workflow_db.workflow_id)) +@router.get("/developer_statistics", status_code=status.HTTP_200_OK, summary="Get anonymized workflow execution") +@start_as_current_span_async("api_workflow_get_developer_statistics", tracer=tracer) +async def get_developer_workflow_statistics( + db: DBSession, + authorization: Authorization, + response: Response, + current_user: CurrentUser, + developer_id: Optional[str] = Query( + None, + description="Filter by the developer of the workflows", + examples=["28c5353b8bb34984a8bd4169ba94c606"], + min_length=3, + max_length=64, + ), + workflow_ids: Optional[List[UUID]] = Query(None, description="Filter by workflow IDs", alias="workflow_id"), + start: Optional[date] = Query(None, description="Filter by workflow executions after this date"), + end: Optional[date] = Query(None, description="Filter by workflow executions before this date"), +) -> List[AnonymizedWorkflowExecution]: + """ + Get the workflow executions with meta information and anonymized user IDs.\n + Permission "workflow:read_statistics" required if the `developer_id` is the same as the uid of the current user, + other "workflow:read_statistics_any". + \f + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. Dependency Injection. + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + response : fastapi.Response + Temporary Response object. Dependency Injection. + current_user : clowmdb.models.User + Current user. Dependency Injection. + developer_id : str | None, default None + Filter for workflows developed by a specific user. Query Parameter. + workflow_ids: List[uuid.UUID] | None, default None + Filter by workflow IDs. Query Parameter. + start : datetime.date | None, default None + Filter by executions that started after the specified date. Query Parameter. + end : datetime.date | None, default None + Filter by executions that started before the specified date. Query Parameter. + + Returns + ------- + statistics : List[app.schema.workflow.AnonymizedWorkflowExecution] + List of raw datapoints for analysis. + """ + span = trace.get_current_span() + if developer_id: # pragma: no cover + span.set_attribute("developer_id", developer_id) + if workflow_ids: # pragma: no cover + span.set_attribute("workflow_ids", [str(wid) for wid in workflow_ids]) + if start: # pragma: no cover + span.set_attribute("start_day", start.isoformat()) + if end: # pragma: no cover + span.set_attribute("end_day", end.isoformat()) + await authorization("read_statistics" if current_user.uid == developer_id else "read_statistics_any") + # Instruct client to cache response for 1 hour + response.headers["Cache-Control"] = "max-age=3600" + return await CRUDWorkflow.developer_statistics( + db, developer_id=developer_id, workflow_ids=workflow_ids, start=start, end=end + ) + + @router.get("/{wid}", status_code=status.HTTP_200_OK, summary="Get a workflow") @start_as_current_span_async("api_workflow_get", tracer=tracer) async def get_workflow( @@ -203,7 +269,7 @@ async def get_workflow( ) -> WorkflowOut: """ Get a specific workflow.\n - Permission "workflow: read" required. + Permission "workflow:read" required. \f Parameters ---------- @@ -225,7 +291,7 @@ async def get_workflow( """ current_span = trace.get_current_span() current_span.set_attribute("workflow_id", str(workflow.workflow_id)) - if version_status is not None and len(version_status) > 0: + if version_status is not None and len(version_status) > 0: # pragma: no cover current_span.set_attribute("version_status", [stat.name for stat in version_status]) rbac_operation = "read_any" if workflow.developer_id != current_user.uid and version_status is not None else "read" await authorization(rbac_operation) @@ -244,7 +310,8 @@ async def get_workflow_statistics( workflow: CurrentWorkflow, db: DBSession, authorization: Authorization, response: Response ) -> List[WorkflowStatistic]: """ - Get the number of started workflow per day. + Get the number of started workflow per day.\n + Permission "workflow:read" required. \f Parameters ---------- @@ -255,16 +322,17 @@ async def get_workflow_statistics( authorization : Callable[[str], Awaitable[Any]] Async function to ask the auth service for authorization. Dependency Injection. response : fastapi.Response - Temporal Response object. Dependency Injection. + Temporary Response object. Dependency Injection. Returns ------- - statistics : List[app.schema.Workflow.WorkflowStatistic] + statistics : List[app.schema.workflow.WorkflowStatistic] + List of datapoints aggregated by day. """ trace.get_current_span().set_attribute("workflow_id", str(workflow.workflow_id)) await authorization("read") - # Instruct client to cache response for 1 hour - response.headers["Cache-Control"] = "max-age=3600" + # Instruct client to cache response for 24 hour + response.headers["Cache-Control"] = "max-age=86400" return await CRUDWorkflow.statistics(db, workflow.workflow_id) diff --git a/app/crud/crud_workflow.py b/app/crud/crud_workflow.py index 3f5b7876526cfd1af666b9a9d6a21a7a8ae23ada..9b7a9f9339620adf0b2bd263fe357058fab53981 100644 --- a/app/crud/crud_workflow.py +++ b/app/crud/crud_workflow.py @@ -1,4 +1,7 @@ -from typing import List, Optional, Union +from datetime import date, datetime +from hashlib import sha256 +from os import urandom +from typing import Dict, List, Optional, Union from uuid import UUID from clowmdb.models import Workflow, WorkflowExecution, WorkflowVersion @@ -10,6 +13,7 @@ from sqlalchemy.orm import joinedload from app.crud.crud_workflow_mode import CRUDWorkflowMode from app.crud.crud_workflow_version import CRUDWorkflowVersion from app.schemas.workflow import WorkflowIn, WorkflowStatistic +from app.schemas.workflow_execution import AnonymizedWorkflowExecution tracer = trace.get_tracer_provider().get_tracer(__name__) @@ -101,6 +105,92 @@ class CRUDWorkflow: await db.execute(stmt) await db.commit() + @staticmethod + async def developer_statistics( + db: AsyncSession, + developer_id: Optional[str] = None, + workflow_ids: Optional[List[UUID]] = None, + start: Optional[date] = None, + end: Optional[date] = None, + ) -> List[AnonymizedWorkflowExecution]: + """ + Get all workflow executions for a set of workflows with anonymized user ID. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + developer_id : str | None, default None + Filter workflow by developer ID. + workflow_ids : uuid.UUID | None, default None + Filter workflows by ID. + start : datetime.date | None, default None + Filter workflow execution that started after this date. + end : datetime.date | None, default None + Filter workflow execution that started before this date + + Returns + ------- + stats : List[app.schemas.Workflow.AnonymizedWorkflowExecution] + List of datapoints + """ + with tracer.start_as_current_span("db_get_workflow_developer_statistics") as span: + stmt = ( + select( + cast(func.FROM_UNIXTIME(WorkflowExecution.start_time), Date).label("started_at"), + WorkflowExecution._execution_id, + WorkflowExecution.user_id, + WorkflowExecution._workflow_mode_id, + WorkflowVersion.git_commit_hash, + Workflow._workflow_id, + Workflow.developer_id, + WorkflowExecution.status, + ) + .select_from(WorkflowExecution) + .join(WorkflowVersion) + .join(Workflow) + .where(WorkflowExecution.end_time != None) # noqa:E711 + ) + if developer_id: + span.set_attribute("developer_id", developer_id) + stmt = stmt.where(Workflow.developer_id == developer_id) + if workflow_ids: + span.set_attribute("workflow_ids", [str(wid) for wid in workflow_ids]) + stmt = stmt.where(*[Workflow._workflow_id == wid.bytes for wid in workflow_ids]) + if start: + span.set_attribute("start_date", start.isoformat()) + timestamp = round(datetime(year=start.year, month=start.month, day=start.day).timestamp()) + stmt = stmt.where(WorkflowExecution.start_time > timestamp) + if end: + span.set_attribute("end_date", end.isoformat()) + timestamp = round(datetime(year=end.year, month=end.month, day=end.day).timestamp()) + stmt = stmt.where(WorkflowExecution.start_time < timestamp) + user_hashes: Dict[str, str] = {} + + def hash_user_id(uid: str) -> str: + if uid not in user_hashes.keys(): + hash_obj = sha256(usedforsecurity=True) + hash_obj.update(bytes.fromhex(uid if len(uid) % 2 == 0 else uid + "0")) + hash_obj.update(urandom(32)) + user_hashes[uid] = hash_obj.hexdigest() + return user_hashes[uid] + + span.set_attribute("sql_query", str(stmt)) + rows = await db.execute(stmt) + return [ + AnonymizedWorkflowExecution( + workflow_execution_id=row._execution_id, + pseudo_uid=hash_user_id(row.user_id), + workflow_mode_id=row._workflow_mode_id, + started_at=row.started_at, + workflow_id=row._workflow_id, + developer_id=row.developer_id, + git_commit_hash=row.git_commit_hash, + status=row.status, + ) + for row in rows + ] + @staticmethod async def statistics(db: AsyncSession, workflow_id: Union[bytes, UUID]) -> List[WorkflowStatistic]: """ @@ -115,7 +205,7 @@ class CRUDWorkflow: Returns ------- - stat : List[app.schemas.Workflow.WorkflowStatistic] + stats : List[app.schemas.Workflow.WorkflowStatistic] List of datapoints """ with tracer.start_as_current_span("db_get_workflow_statistics") as span: diff --git a/app/git_repository/abstract_repository.py b/app/git_repository/abstract_repository.py index ec974eae7b98665fa1080ce08ddf53e875bf9830..2ac6a395eede851c3390e59e785400f9c6f332aa 100644 --- a/app/git_repository/abstract_repository.py +++ b/app/git_repository/abstract_repository.py @@ -145,12 +145,13 @@ class GitRepository(ABC): Flags if the files exist. """ with tracer.start_as_current_span("git_check_files_exists") as span: - span.set_attribute("repository", self.url) + span.set_attributes({"repository": self.url, "files": files}) tasks = [asyncio.ensure_future(self.check_file_exists(file, client=client)) for file in files] result = await asyncio.gather(*tasks) if raise_error: missing_files = [f for f, exist in zip(files, result) if not exist] if len(missing_files) > 0: + span.set_attribute("missing_files", missing_files) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"The files {', '.join(missing_files)} are missing in the repo {str(self)}", @@ -175,7 +176,8 @@ class GitRepository(ABC): with SpooledTemporaryFile(max_size=512000) as f: # temporary file with 500kB data spooled in memory await self.download_file(filepath, client=client, file_handle=f) f.seek(0) - obj.upload_fileobj(f) + with tracer.start_as_current_span("s3_upload_file"): + obj.upload_fileobj(f) async def download_file_stream(self, filepath: str, client: AsyncClient) -> AsyncIterator[bytes]: """ diff --git a/app/schemas/workflow.py b/app/schemas/workflow.py index 6936e52a6a6a60e6cd6e0d36860e7849284ad8ba..008ba377c61637fa3693116cc376e5b98e18ef6a 100644 --- a/app/schemas/workflow.py +++ b/app/schemas/workflow.py @@ -64,10 +64,10 @@ class WorkflowIn(_BaseWorkflow): class WorkflowOut(_BaseWorkflow): - workflow_id: UUID = Field(..., description="Id of the workflow", examples=["0cc78936-381b-4bdd-999d-736c40591078"]) + workflow_id: UUID = Field(..., description="ID of the workflow") versions: List[WorkflowVersion] = Field(..., description="Versions of the workflow") developer_id: str = Field( - ..., description="Id of developer of the workflow", examples=["28c5353b8bb34984a8bd4169ba94c606"] + ..., description="ID of developer of the workflow", examples=["28c5353b8bb34984a8bd4169ba94c606"] ) private: bool = Field(default=False, description="Flag if the workflow is hosted in a private git repository") diff --git a/app/schemas/workflow_execution.py b/app/schemas/workflow_execution.py index 80b44a4c6d57ced71bae2a72153c166b676b5fa4..6a0e158fbf90bfad3ad0626a32f4270aa2d1b5aa 100644 --- a/app/schemas/workflow_execution.py +++ b/app/schemas/workflow_execution.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import date, datetime from typing import Any, Dict, Optional from uuid import UUID @@ -38,9 +38,7 @@ class WorkflowExecutionIn(_BaseWorkflowExecution): class WorkflowExecutionOut(_BaseWorkflowExecution): - execution_id: UUID = Field( - ..., description="ID of the workflow execution", examples=["0cc78936-381b-4bdd-999d-736c40591078"] - ) + execution_id: UUID = Field(..., description="ID of the workflow execution") user_id: str = Field( ..., description="UID of user who started the workflow", examples=["28c5353b8bb34984a8bd4169ba94c606"] ) @@ -62,9 +60,7 @@ class WorkflowExecutionOut(_BaseWorkflowExecution): workflow_version_id: Optional[str] = Field( # type: ignore[assignment] None, description="Workflow version git commit hash", examples=["ba8bcd9294c2c96aedefa1763a84a18077c50c0f"] ) - workflow_id: Optional[UUID] = Field( - None, description="Id of the workflow", examples=["0cc78936-381b-4bdd-999d-736c40591078"] - ) + workflow_id: Optional[UUID] = Field(None, description="Id of the workflow") @staticmethod def from_db_model( @@ -118,3 +114,28 @@ class DevWorkflowExecutionIn(BaseModel): @field_serializer("repository_url") def serialize_dt(self, url: AnyHttpUrl, _info: FieldSerializationInfo) -> str: return str(url) + + +class AnonymizedWorkflowExecution(BaseModel): + workflow_execution_id: UUID = Field(..., description="ID of the workflow execution") + pseudo_uid: str = Field( + ..., + description="Anonymized user ID of the user who ran the workflow execution", + examples=["7ed4249857b656e96f456449796e461e6001d3fb2481a44701f70ca437bd53a2"], + ) + workflow_mode_id: Optional[UUID] = Field(None, description="ID of the workflow mode this workflow execution ran in") + git_commit_hash: str = Field( + ..., description="Hash of the git commit", examples=["ba8bcd9294c2c96aedefa1763a84a18077c50c0f"] + ) + started_at: date = Field( + ..., description="Day of the workflow execution", examples=[date(day=1, month=1, year=2023)] + ) + workflow_id: UUID = Field(..., description="ID of the workflow") + developer_id: str = Field( + ..., description="ID of developer of the workflow", examples=["28c5353b8bb34984a8bd4169ba94c606"] + ) + status: WorkflowExecution.WorkflowExecutionStatus = Field( + ..., + description="End status of the workflow execution", + examples=[WorkflowExecution.WorkflowExecutionStatus.SUCCESS], + ) diff --git a/app/tests/api/test_workflow.py b/app/tests/api/test_workflow.py index f1a73a177090bd0abe03a5cd27a1abce6405b4a5..9b11e265535bce13d851b759ccbd474ce5a329de 100644 --- a/app/tests/api/test_workflow.py +++ b/app/tests/api/test_workflow.py @@ -507,6 +507,40 @@ class TestWorkflowRoutesList(_TestWorkflowRoutes): assert len(workflows) == 1 assert workflows[0]["workflow_id"] == str(random_workflow.workflow_id) + @pytest.mark.asyncio + async def test_list_workflow_statistics( + self, + client: AsyncClient, + random_user: UserWithAuthHeader, + random_workflow: WorkflowOut, + random_completed_workflow_execution: WorkflowExecution, + ) -> None: + """ + Test for getting all workflow executions as workflow statistics. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + random_workflow : app.schemas.workflow.WorkflowOut + Random workflow for testing. + random_completed_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing + """ + response = await client.get( + f"{self.base_path}/developer_statistics", + headers=random_user.auth_headers, + ) + assert response.status_code == status.HTTP_200_OK + executions = response.json() + assert len(executions) == 1 + execution = executions[0] + assert execution["workflow_id"] == str(random_workflow.workflow_id) + assert execution["workflow_execution_id"] == str(random_completed_workflow_execution.execution_id) + assert execution["git_commit_hash"] == random_completed_workflow_execution.workflow_version_id + class TestWorkflowRoutesGet(_TestWorkflowRoutes): @pytest.mark.asyncio @@ -560,10 +594,10 @@ class TestWorkflowRoutesGet(_TestWorkflowRoutes): client: AsyncClient, random_user: UserWithAuthHeader, random_workflow: WorkflowOut, - random_workflow_execution: WorkflowExecution, + random_running_workflow_execution: WorkflowExecution, ) -> None: """ - Test for getting a non-existing workflow. + Test for getting the aggregated workflow statistics. Parameters ---------- @@ -571,6 +605,10 @@ class TestWorkflowRoutesGet(_TestWorkflowRoutes): HTTP Client to perform the request on. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. + random_workflow : app.schemas.workflow.WorkflowOut + Random workflow for testing. + random_running_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing """ response = await client.get( "/".join([self.base_path, str(random_workflow.workflow_id), "statistics"]), diff --git a/app/tests/api/test_workflow_execution.py b/app/tests/api/test_workflow_execution.py index e33c0e91751fed99503d70547a07cf22dc16cd23..5539139cf8581a356653ac9432f89a1c6ce0d0d2 100644 --- a/app/tests/api/test_workflow_execution.py +++ b/app/tests/api/test_workflow_execution.py @@ -1010,7 +1010,7 @@ class TestWorkflowExecutionRoutesGet(_TestWorkflowExecutionRoutes): self, client: AsyncClient, random_user: UserWithAuthHeader, - random_workflow_execution: WorkflowExecution, + random_running_workflow_execution: WorkflowExecution, ) -> None: """ Test for getting a workflow execution. @@ -1021,23 +1021,24 @@ class TestWorkflowExecutionRoutesGet(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ response = await client.get( - "/".join([self.base_path, str(random_workflow_execution.execution_id)]), headers=random_user.auth_headers + "/".join([self.base_path, str(random_running_workflow_execution.execution_id)]), + headers=random_user.auth_headers, ) assert response.status_code == status.HTTP_200_OK execution = response.json() - assert execution["execution_id"] == str(random_workflow_execution.execution_id) + assert execution["execution_id"] == str(random_running_workflow_execution.execution_id) @pytest.mark.asyncio async def test_get_workflow_execution_params( self, client: AsyncClient, random_user: UserWithAuthHeader, - random_workflow_execution: WorkflowExecution, + random_running_workflow_execution: WorkflowExecution, ) -> None: """ Test for getting the parameters of a workflow execution. @@ -1048,11 +1049,11 @@ class TestWorkflowExecutionRoutesGet(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ response = await client.get( - "/".join([self.base_path, str(random_workflow_execution.execution_id), "params"]), + "/".join([self.base_path, str(random_running_workflow_execution.execution_id), "params"]), headers=random_user.auth_headers, ) assert response.status_code == status.HTTP_200_OK @@ -1086,7 +1087,7 @@ class TestWorkflowExecutionRoutesList(_TestWorkflowExecutionRoutes): self, client: AsyncClient, random_user: UserWithAuthHeader, - random_workflow_execution: WorkflowExecution, + random_running_workflow_execution: WorkflowExecution, ) -> None: """ Test for listing all workflow executions. @@ -1097,7 +1098,7 @@ class TestWorkflowExecutionRoutesList(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ response = await client.get(self.base_path, headers=random_user.auth_headers) @@ -1109,7 +1110,7 @@ class TestWorkflowExecutionRoutesList(_TestWorkflowExecutionRoutes): sum( 1 for execution in executions - if execution["execution_id"] == str(random_workflow_execution.execution_id) + if execution["execution_id"] == str(random_running_workflow_execution.execution_id) ) == 1 ) @@ -1121,7 +1122,7 @@ class TestWorkflowExecutionRoutesDelete(_TestWorkflowExecutionRoutes): self, client: AsyncClient, random_user: UserWithAuthHeader, - random_workflow_execution: WorkflowExecution, + random_running_workflow_execution: WorkflowExecution, ) -> None: """ Test for deleting an unfinished workflow execution. @@ -1132,11 +1133,12 @@ class TestWorkflowExecutionRoutesDelete(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ response = await client.delete( - "/".join([self.base_path, str(random_workflow_execution.execution_id)]), headers=random_user.auth_headers + "/".join([self.base_path, str(random_running_workflow_execution.execution_id)]), + headers=random_user.auth_headers, ) assert response.status_code == status.HTTP_400_BAD_REQUEST @@ -1146,7 +1148,7 @@ class TestWorkflowExecutionRoutesDelete(_TestWorkflowExecutionRoutes): client: AsyncClient, db: AsyncSession, random_user: UserWithAuthHeader, - random_workflow_execution: WorkflowExecution, + random_running_workflow_execution: WorkflowExecution, ) -> None: """ Test for deleting a workflow execution. @@ -1159,17 +1161,18 @@ class TestWorkflowExecutionRoutesDelete(_TestWorkflowExecutionRoutes): Async database session to perform query on. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ await db.execute( update(WorkflowExecution) - .where(WorkflowExecution._execution_id == random_workflow_execution.execution_id.bytes) + .where(WorkflowExecution._execution_id == random_running_workflow_execution.execution_id.bytes) .values(status=WorkflowExecution.WorkflowExecutionStatus.SUCCESS.name) ) await db.commit() response = await client.delete( - "/".join([self.base_path, str(random_workflow_execution.execution_id)]), headers=random_user.auth_headers + "/".join([self.base_path, str(random_running_workflow_execution.execution_id)]), + headers=random_user.auth_headers, ) assert response.status_code == status.HTTP_204_NO_CONTENT @@ -1180,7 +1183,7 @@ class TestWorkflowExecutionRoutesCancel(_TestWorkflowExecutionRoutes): self, client: AsyncClient, random_user: UserWithAuthHeader, - random_workflow_execution: WorkflowExecution, + random_running_workflow_execution: WorkflowExecution, mock_slurm_cluster: MockSlurmCluster, ) -> None: """ @@ -1192,17 +1195,17 @@ class TestWorkflowExecutionRoutesCancel(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. mock_slurm_cluster : app.tests.mocks.mock_slurm_cluster.MockSlurmCluster Mock Slurm cluster to inspect submitted jobs. """ response = await client.post( - "/".join([self.base_path, str(random_workflow_execution.execution_id), "cancel"]), + "/".join([self.base_path, str(random_running_workflow_execution.execution_id), "cancel"]), headers=random_user.auth_headers, ) assert response.status_code == status.HTTP_204_NO_CONTENT - job_active = mock_slurm_cluster.job_active(random_workflow_execution.slurm_job_id) + job_active = mock_slurm_cluster.job_active(random_running_workflow_execution.slurm_job_id) assert not job_active @pytest.mark.asyncio @@ -1211,7 +1214,7 @@ class TestWorkflowExecutionRoutesCancel(_TestWorkflowExecutionRoutes): client: AsyncClient, db: AsyncSession, random_user: UserWithAuthHeader, - random_workflow_execution: WorkflowExecution, + random_running_workflow_execution: WorkflowExecution, ) -> None: """ Test for canceling a finished workflow execution. @@ -1224,17 +1227,17 @@ class TestWorkflowExecutionRoutesCancel(_TestWorkflowExecutionRoutes): Async database session to perform query on. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ await db.execute( update(WorkflowExecution) - .where(WorkflowExecution._execution_id == random_workflow_execution.execution_id.bytes) + .where(WorkflowExecution._execution_id == random_running_workflow_execution.execution_id.bytes) .values(status=WorkflowExecution.WorkflowExecutionStatus.SUCCESS.name) ) await db.commit() response = await client.post( - "/".join([self.base_path, str(random_workflow_execution.execution_id), "cancel"]), + "/".join([self.base_path, str(random_running_workflow_execution.execution_id), "cancel"]), headers=random_user.auth_headers, ) assert response.status_code == status.HTTP_400_BAD_REQUEST diff --git a/app/tests/conftest.py b/app/tests/conftest.py index 7e7a2210d3eb0e9152ae976d47d6d4d327819ce7..0b00faa98fc5ec3ca33b6b72491160ca3c4574a4 100644 --- a/app/tests/conftest.py +++ b/app/tests/conftest.py @@ -1,4 +1,5 @@ import asyncio +import time from functools import partial from io import BytesIO from secrets import token_urlsafe @@ -278,7 +279,7 @@ async def random_workflow_version(db: AsyncSession, random_workflow: WorkflowOut @pytest_asyncio.fixture(scope="function") -async def random_workflow_execution( +async def random_running_workflow_execution( db: AsyncSession, random_workflow_version: WorkflowVersion, random_user: UserWithAuthHeader, @@ -286,7 +287,7 @@ async def random_workflow_execution( mock_slurm_cluster: MockSlurmCluster, ) -> AsyncIterator[WorkflowExecution]: """ - Create a random workflow execution. Will be deleted, when the user is deleted. + Create a random running workflow execution. Will be deleted, when the user is deleted. """ execution = WorkflowExecution( user_id=random_user.user.uid, @@ -308,6 +309,23 @@ async def random_workflow_execution( mock_s3_service.Bucket(settings.PARAMS_BUCKET).Object(f"params-{execution.execution_id.hex}.json").delete() +@pytest_asyncio.fixture(scope="function") +async def random_completed_workflow_execution( + db: AsyncSession, random_running_workflow_execution: WorkflowExecution +) -> WorkflowExecution: + """ + Create a random workflow execution which is completed. + """ + + await db.execute( + update(WorkflowExecution) + .where(WorkflowExecution._execution_id == random_running_workflow_execution.execution_id.bytes) + .values(end_time=round(time.time()), status=WorkflowExecution.WorkflowExecutionStatus.SUCCESS) + ) + await db.commit() + return random_running_workflow_execution + + @pytest_asyncio.fixture(scope="function") async def random_workflow_mode( db: AsyncSession, diff --git a/app/tests/crud/test_user.py b/app/tests/crud/test_user.py new file mode 100644 index 0000000000000000000000000000000000000000..7db6677c3e6f660439694f0b03e9b3d02b7f185a --- /dev/null +++ b/app/tests/crud/test_user.py @@ -0,0 +1,37 @@ +import pytest +from sqlalchemy.ext.asyncio import AsyncSession + +from app.crud import CRUDUser +from app.tests.utils.user import UserWithAuthHeader +from app.tests.utils.utils import random_hex_string + + +class TestUserCRUD: + @pytest.mark.asyncio + async def test_get_user(self, db: AsyncSession, random_user: UserWithAuthHeader) -> None: + """ + Test for getting a user from the database + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + user = await CRUDUser.get(db, random_user.user.uid) + assert user is not None + assert user == random_user.user + + @pytest.mark.asyncio + async def test_get_non_existing_user(self, db: AsyncSession) -> None: + """ + Test for getting a non-existing user from the database + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + """ + user = await CRUDUser.get(db, random_hex_string()) + assert user is None diff --git a/app/tests/crud/test_workflow.py b/app/tests/crud/test_workflow.py index 47b2a07c092cb0cc5d4f48af17d3c2d688cbbdf2..637c2d8002b12564b319c24ab4fac0a91061832c 100644 --- a/app/tests/crud/test_workflow.py +++ b/app/tests/crud/test_workflow.py @@ -1,13 +1,16 @@ import random -from datetime import date +from datetime import date, timedelta +from uuid import uuid4 import pytest from clowmdb.models import Workflow, WorkflowExecution, WorkflowVersion from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import joinedload from app.crud import CRUDWorkflow from app.schemas.workflow import WorkflowIn, WorkflowOut +from app.schemas.workflow_mode import WorkflowModeIn from app.tests.utils.user import UserWithAuthHeader from app.tests.utils.utils import random_hex_string, random_lower_string @@ -137,10 +140,10 @@ class TestWorkflowCRUDGet: @pytest.mark.asyncio async def test_get_workflow_statistics( - self, db: AsyncSession, random_workflow: WorkflowOut, random_workflow_execution: WorkflowExecution + self, db: AsyncSession, random_workflow: WorkflowOut, random_running_workflow_execution: WorkflowExecution ) -> None: """ - Test for getting a workflow by its name from CRUD Repository. + Test for getting the aggregated workflow statistics. Parameters ---------- @@ -148,12 +151,215 @@ class TestWorkflowCRUDGet: Async database session to perform query on. random_workflow : app.schemas.workflow.WorkflowOut Random bucket for testing. + random_running_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing. """ statistics = await CRUDWorkflow.statistics(db, random_workflow.workflow_id) assert len(statistics) == 1 assert statistics[0].day == date.today() assert statistics[0].count == 1 + @pytest.mark.asyncio + async def test_get_workflow_developer_statistics( + self, db: AsyncSession, random_workflow: WorkflowOut, random_completed_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for getting workflow statistics for developer. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_workflow : app.schemas.workflow.WorkflowOut + Random bucket for testing. + random_completed_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing. + """ + statistics = await CRUDWorkflow.developer_statistics(db) + assert len(statistics) == 1 + assert statistics[0].started_at == date.today() + assert statistics[0].workflow_id == random_workflow.workflow_id + assert statistics[0].workflow_execution_id == random_completed_workflow_execution.execution_id + assert statistics[0].git_commit_hash == random_completed_workflow_execution.workflow_version_id + assert statistics[0].pseudo_uid != random_completed_workflow_execution.user_id + assert statistics[0].developer_id == random_workflow.developer_id + + @pytest.mark.asyncio + async def test_get_workflow_developer_statistics_with_developer_id( + self, db: AsyncSession, random_workflow: WorkflowOut, random_completed_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for getting workflow statistics for developer with a developer ID. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_workflow : app.schemas.workflow.WorkflowOut + Random bucket for testing. + random_completed_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing. + """ + statistics = await CRUDWorkflow.developer_statistics(db, developer_id=random_workflow.developer_id) + assert len(statistics) == 1 + assert statistics[0].started_at == date.today() + assert statistics[0].workflow_id == random_workflow.workflow_id + assert statistics[0].workflow_execution_id == random_completed_workflow_execution.execution_id + assert statistics[0].git_commit_hash == random_completed_workflow_execution.workflow_version_id + assert statistics[0].pseudo_uid != random_completed_workflow_execution.user_id + assert statistics[0].developer_id == random_workflow.developer_id + + @pytest.mark.asyncio + async def test_get_workflow_developer_statistics_with_non_existent_developer_id( + self, db: AsyncSession, random_workflow: WorkflowOut, random_completed_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for getting workflow statistics for developer with a non-existing developer ID. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_workflow : app.schemas.workflow.WorkflowOut + Random bucket for testing. + random_completed_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing. + """ + statistics = await CRUDWorkflow.developer_statistics(db, developer_id=random_lower_string(40)) + assert len(statistics) == 0 + + @pytest.mark.asyncio + async def test_get_workflow_developer_statistics_with_workflow_id( + self, db: AsyncSession, random_workflow: WorkflowOut, random_completed_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for getting workflow statistics for developer with a workflow ID. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_workflow : app.schemas.workflow.WorkflowOut + Random bucket for testing. + random_completed_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing. + """ + statistics = await CRUDWorkflow.developer_statistics(db, workflow_ids=[random_workflow.workflow_id]) + assert len(statistics) == 1 + assert statistics[0].started_at == date.today() + assert statistics[0].workflow_id == random_workflow.workflow_id + assert statistics[0].workflow_execution_id == random_completed_workflow_execution.execution_id + assert statistics[0].git_commit_hash == random_completed_workflow_execution.workflow_version_id + assert statistics[0].pseudo_uid != random_completed_workflow_execution.user_id + assert statistics[0].developer_id == random_workflow.developer_id + + @pytest.mark.asyncio + async def test_get_workflow_developer_statistics_with_non_existent_workflow_id( + self, db: AsyncSession, random_workflow: WorkflowOut, random_completed_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for getting workflow statistics for developer with a non-existing workflow ID. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_workflow : app.schemas.workflow.WorkflowOut + Random bucket for testing. + random_completed_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing. + """ + statistics = await CRUDWorkflow.developer_statistics(db, workflow_ids=[uuid4()]) + assert len(statistics) == 0 + + @pytest.mark.asyncio + async def test_get_workflow_developer_statistics_with_start_date( + self, db: AsyncSession, random_workflow: WorkflowOut, random_completed_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for getting workflow statistics for developer with a start date in the past. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_workflow : app.schemas.workflow.WorkflowOut + Random bucket for testing. + random_completed_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing. + """ + statistics = await CRUDWorkflow.developer_statistics(db, start=date.today() - timedelta(days=7)) + assert len(statistics) == 1 + assert statistics[0].started_at == date.today() + assert statistics[0].workflow_id == random_workflow.workflow_id + assert statistics[0].workflow_execution_id == random_completed_workflow_execution.execution_id + assert statistics[0].git_commit_hash == random_completed_workflow_execution.workflow_version_id + assert statistics[0].pseudo_uid != random_completed_workflow_execution.user_id + assert statistics[0].developer_id == random_workflow.developer_id + + @pytest.mark.asyncio + async def test_get_workflow_developer_statistics_with_bad_start_day( + self, db: AsyncSession, random_workflow: WorkflowOut, random_completed_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for getting workflow statistics for developer with a start date in the future. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_workflow : app.schemas.workflow.WorkflowOut + Random bucket for testing. + random_completed_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing. + """ + statistics = await CRUDWorkflow.developer_statistics(db, start=date.today() + timedelta(days=7)) + assert len(statistics) == 0 + + @pytest.mark.asyncio + async def test_get_workflow_developer_statistics_with_end_date( + self, db: AsyncSession, random_workflow: WorkflowOut, random_completed_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for getting workflow statistics for developer with a end date in the future. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_workflow : app.schemas.workflow.WorkflowOut + Random bucket for testing. + random_completed_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing. + """ + statistics = await CRUDWorkflow.developer_statistics(db, end=date.today() + timedelta(days=7)) + assert len(statistics) == 1 + assert statistics[0].started_at == date.today() + assert statistics[0].workflow_id == random_workflow.workflow_id + assert statistics[0].workflow_execution_id == random_completed_workflow_execution.execution_id + assert statistics[0].git_commit_hash == random_completed_workflow_execution.workflow_version_id + assert statistics[0].pseudo_uid != random_completed_workflow_execution.user_id + assert statistics[0].developer_id == random_workflow.developer_id + + @pytest.mark.asyncio + async def test_get_workflow_developer_statistics_with_bad_end_day( + self, db: AsyncSession, random_workflow: WorkflowOut, random_completed_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for getting workflow statistics for developer with a end date in the past. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_workflow : app.schemas.workflow.WorkflowOut + Random bucket for testing. + random_completed_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing. + """ + statistics = await CRUDWorkflow.developer_statistics(db, end=date.today() - timedelta(days=7)) + assert len(statistics) == 0 + class TestWorkflowCRUDCreate: @pytest.mark.asyncio @@ -185,6 +391,47 @@ class TestWorkflowCRUDCreate: await db.execute(delete(Workflow).where(Workflow._workflow_id == workflow.workflow_id.bytes)) await db.commit() + @pytest.mark.asyncio + async def test_create_workflow_with_mode(self, db: AsyncSession, random_user: UserWithAuthHeader) -> None: + """ + Test for creating a workflow with a mode in CRUD Repository. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + workflow_in = WorkflowIn( + git_commit_hash=random_hex_string(), + name=random_lower_string(10), + short_description=random_lower_string(65), + repository_url="https://github.com/example/example", + modes=[ + WorkflowModeIn( + schema_path="example/schema.json", name=random_lower_string(), entrypoint=random_lower_string() + ) + ], + ) + workflow = await CRUDWorkflow.create(db, workflow=workflow_in, developer=random_user.user.uid) + assert workflow is not None + + stmt = ( + select(Workflow) + .where(Workflow._workflow_id == workflow.workflow_id.bytes) + .options(joinedload(Workflow.versions).selectinload(WorkflowVersion.workflow_modes)) + ) + created_workflow = await db.scalar(stmt) + assert created_workflow is not None + assert created_workflow == workflow + + assert len(created_workflow.versions) == 1 + assert len(created_workflow.versions[0].workflow_modes) == 1 + + await db.execute(delete(Workflow).where(Workflow._workflow_id == workflow.workflow_id.bytes)) + await db.commit() + @pytest.mark.asyncio async def test_create_workflow_credentials(self, db: AsyncSession, random_workflow: WorkflowOut) -> None: """ diff --git a/app/tests/crud/test_workflow_execution.py b/app/tests/crud/test_workflow_execution.py index b0f8013d004dabc8d9fed86863e8cdab41a33b43..815eaf8dea2671e9f335c48a141dd074850976f3 100644 --- a/app/tests/crud/test_workflow_execution.py +++ b/app/tests/crud/test_workflow_execution.py @@ -79,7 +79,9 @@ class TestWorkflowExecutionCRUDCreate: class TestWorkflowExecutionCRUDGet: @pytest.mark.asyncio - async def test_get_workflow_execution(self, db: AsyncSession, random_workflow_execution: WorkflowExecution) -> None: + async def test_get_workflow_execution( + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution + ) -> None: """ Test for getting a workflow execution by its execution id. @@ -87,12 +89,12 @@ class TestWorkflowExecutionCRUDGet: ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ - execution = await CRUDWorkflowExecution.get(db, random_workflow_execution.execution_id) + execution = await CRUDWorkflowExecution.get(db, random_running_workflow_execution.execution_id) assert execution is not None - assert execution == random_workflow_execution + assert execution == random_running_workflow_execution @pytest.mark.asyncio async def test_get_non_existing_workflow_execution(self, db: AsyncSession) -> None: @@ -111,7 +113,7 @@ class TestWorkflowExecutionCRUDGet: class TestWorkflowExecutionCRUDList: @pytest.mark.asyncio async def test_list_workflow_executions( - self, db: AsyncSession, random_workflow_execution: WorkflowExecution + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution ) -> None: """ Test for listing all workflow executions. @@ -120,16 +122,16 @@ class TestWorkflowExecutionCRUDList: ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ executions = await CRUDWorkflowExecution.list(db) assert len(executions) > 0 - assert sum(1 for execution in executions if execution == random_workflow_execution) == 1 + assert sum(1 for execution in executions if execution == random_running_workflow_execution) == 1 @pytest.mark.asyncio async def test_get_list_workflow_executions_of_user( - self, db: AsyncSession, random_workflow_execution: WorkflowExecution + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution ) -> None: """ Test for listing all workflow executions and filter by user. @@ -138,17 +140,17 @@ class TestWorkflowExecutionCRUDList: ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ - executions = await CRUDWorkflowExecution.list(db, uid=random_workflow_execution.user_id) + executions = await CRUDWorkflowExecution.list(db, uid=random_running_workflow_execution.user_id) assert len(executions) > 0 - assert sum(1 for execution in executions if execution == random_workflow_execution) == 1 - assert sum(1 for execution in executions if execution.user_id == random_workflow_execution.user_id) >= 1 + assert sum(1 for execution in executions if execution == random_running_workflow_execution) == 1 + assert sum(1 for execution in executions if execution.user_id == random_running_workflow_execution.user_id) >= 1 @pytest.mark.asyncio async def test_get_list_workflow_executions_of_non_existing_user( - self, db: AsyncSession, random_workflow_execution: WorkflowExecution + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution ) -> None: """ Test for listing all workflow executions and filter by non-existing user. @@ -157,16 +159,16 @@ class TestWorkflowExecutionCRUDList: ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ executions = await CRUDWorkflowExecution.list(db, uid=random_lower_string()) assert len(executions) == 0 - assert sum(1 for execution in executions if execution == random_workflow_execution) == 0 + assert sum(1 for execution in executions if execution == random_running_workflow_execution) == 0 @pytest.mark.asyncio async def test_get_list_workflow_executions_of_workflow_version( - self, db: AsyncSession, random_workflow_execution: WorkflowExecution + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution ) -> None: """ Test for listing all workflow executions and filter by workflow version id. @@ -175,26 +177,26 @@ class TestWorkflowExecutionCRUDList: ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ executions = await CRUDWorkflowExecution.list( - db, workflow_version_id=random_workflow_execution.workflow_version_id + db, workflow_version_id=random_running_workflow_execution.workflow_version_id ) assert len(executions) > 0 - assert sum(1 for execution in executions if execution == random_workflow_execution) == 1 + assert sum(1 for execution in executions if execution == random_running_workflow_execution) == 1 assert ( sum( 1 for execution in executions - if execution.workflow_version_id == random_workflow_execution.workflow_version_id + if execution.workflow_version_id == random_running_workflow_execution.workflow_version_id ) >= 1 ) @pytest.mark.asyncio async def test_get_list_workflow_executions_of_non_existing_workflow_version( - self, db: AsyncSession, random_workflow_execution: WorkflowExecution + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution ) -> None: """ Test for listing all workflow executions and filter by non-existing workflow version id. @@ -203,16 +205,16 @@ class TestWorkflowExecutionCRUDList: ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ executions = await CRUDWorkflowExecution.list(db, workflow_version_id=random_hex_string()) assert len(executions) == 0 - assert sum(1 for execution in executions if execution == random_workflow_execution) == 0 + assert sum(1 for execution in executions if execution == random_running_workflow_execution) == 0 @pytest.mark.asyncio async def test_get_list_workflow_executions_with_given_status( - self, db: AsyncSession, random_workflow_execution: WorkflowExecution + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution ) -> None: """ Test for listing all workflow executions and filter by status. @@ -221,19 +223,19 @@ class TestWorkflowExecutionCRUDList: ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ - executions = await CRUDWorkflowExecution.list(db, status_list=[random_workflow_execution.status]) + executions = await CRUDWorkflowExecution.list(db, status_list=[random_running_workflow_execution.status]) assert len(executions) > 0 - assert sum(1 for execution in executions if execution == random_workflow_execution) == 1 - assert sum(1 for execution in executions if execution.status == random_workflow_execution.status) >= 1 + assert sum(1 for execution in executions if execution == random_running_workflow_execution) == 1 + assert sum(1 for execution in executions if execution.status == random_running_workflow_execution.status) >= 1 class TestWorkflowExecutionCRUDLUpdate: @pytest.mark.asyncio async def test_cancel_workflow_execution( - self, db: AsyncSession, random_workflow_execution: WorkflowExecution + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution ) -> None: """ Test for canceling a workflow execution. @@ -242,23 +244,46 @@ class TestWorkflowExecutionCRUDLUpdate: ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ - await CRUDWorkflowExecution.cancel(db, random_workflow_execution.execution_id) + await CRUDWorkflowExecution.cancel(db, random_running_workflow_execution.execution_id) stmt = select(WorkflowExecution).where( - WorkflowExecution._execution_id == random_workflow_execution.execution_id.bytes + WorkflowExecution._execution_id == random_running_workflow_execution.execution_id.bytes ) execution = await db.scalar(stmt) assert execution is not None assert execution.status == WorkflowExecution.WorkflowExecutionStatus.CANCELED + @pytest.mark.asyncio + async def test_update_workflow_execution_slurm_job( + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for updating the slurm job id of a workflow execution. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_running_workflow_execution : clowmdb.models.WorkflowExecution + Random workflow execution for testing. + """ + await CRUDWorkflowExecution.update_slurm_job_id(db, random_running_workflow_execution.execution_id, 250) + + stmt = select(WorkflowExecution).where( + WorkflowExecution._execution_id == random_running_workflow_execution.execution_id.bytes + ) + execution = await db.scalar(stmt) + assert execution is not None + assert execution.slurm_job_id == 250 + class TestWorkflowExecutionCRUDDelete: @pytest.mark.asyncio async def test_delete_workflow_execution( - self, db: AsyncSession, random_workflow_execution: WorkflowExecution + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution ) -> None: """ Test for deleting a workflow execution. @@ -267,13 +292,13 @@ class TestWorkflowExecutionCRUDDelete: ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - random_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowmdb.models.WorkflowExecution Random workflow execution for testing. """ - await CRUDWorkflowExecution.delete(db, random_workflow_execution.execution_id) + await CRUDWorkflowExecution.delete(db, random_running_workflow_execution.execution_id) stmt = select(WorkflowExecution).where( - WorkflowExecution._execution_id == random_workflow_execution.execution_id.bytes + WorkflowExecution._execution_id == random_running_workflow_execution.execution_id.bytes ) execution = await db.scalar(stmt) assert execution is None diff --git a/app/tests/crud/test_workflow_version.py b/app/tests/crud/test_workflow_version.py index c0d6734ff195a75df32818839e4609d772add8ba..2dfddbee25323d282186c0115866658036eadd86 100644 --- a/app/tests/crud/test_workflow_version.py +++ b/app/tests/crud/test_workflow_version.py @@ -1,7 +1,10 @@ +from uuid import uuid4 + import pytest -from clowmdb.models import WorkflowVersion +from clowmdb.models import WorkflowMode, WorkflowVersion from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload from app.crud import CRUDWorkflowVersion from app.schemas.workflow import WorkflowOut @@ -28,6 +31,44 @@ class TestWorkflowVersionCRUDGet: assert version.workflow_id == random_workflow_version.workflow_id assert version.git_commit_hash == random_workflow_version.git_commit_hash + @pytest.mark.asyncio + async def test_get_specific_workflow_version_with_workflow_id( + self, db: AsyncSession, random_workflow_version: WorkflowVersion + ) -> None: + """ + Test for getting a workflow version and constraint it by a workflow ID from CRUD Repository. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_workflow_version : clowmdb.model.WorkflowVersion + Random workflow version for testing. + """ + version = await CRUDWorkflowVersion.get( + db, random_workflow_version.git_commit_hash, workflow_id=random_workflow_version.workflow_id + ) + assert version is not None + assert version.workflow_id == random_workflow_version.workflow_id + assert version.git_commit_hash == random_workflow_version.git_commit_hash + + @pytest.mark.asyncio + async def test_get_specific_workflow_version_with_false_workflow_id( + self, db: AsyncSession, random_workflow_version: WorkflowVersion + ) -> None: + """ + Test for getting a workflow version and constraint it with a non-existing workflow ID CRUD Repository. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_workflow_version : clowmdb.model.WorkflowVersion + Random workflow version for testing. + """ + version = await CRUDWorkflowVersion.get(db, random_workflow_version.git_commit_hash, workflow_id=uuid4()) + assert version is None + @pytest.mark.asyncio async def test_get_specific_workflow_version_with_populated_workflow( self, db: AsyncSession, random_workflow_version: WorkflowVersion @@ -145,10 +186,52 @@ class TestWorkflowVersionCRUDCreate: ) assert workflow_version is not None - stmt = select(WorkflowVersion).where(WorkflowVersion.git_commit_hash == workflow_version.git_commit_hash) + stmt = ( + select(WorkflowVersion) + .where(WorkflowVersion.git_commit_hash == workflow_version.git_commit_hash) + .options(selectinload(WorkflowVersion.workflow_modes)) + ) + created_workflow_version = await db.scalar(stmt) + assert created_workflow_version is not None + assert created_workflow_version == workflow_version + assert len(created_workflow_version.workflow_modes) == 0 + + @pytest.mark.asyncio + async def test_create_workflow_version_with_mode( + self, db: AsyncSession, random_workflow: WorkflowOut, random_workflow_mode: WorkflowMode + ) -> None: + """ + Test for creating a workflow version in CRUD Repository. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + random_workflow : app.schemas.workflow.WorkflowOut + Random bucket for testing. + random_workflow_mode : clowmdb.models.WorkflowMode + Random workflow mode for testing. + """ + workflow_version = await CRUDWorkflowVersion.create( + db, + git_commit_hash=random_hex_string(), + version="v2.0.0", + wid=random_workflow.workflow_id, + previous_version=random_workflow.versions[-1].git_commit_hash, + modes=[random_workflow_mode.mode_id], + ) + assert workflow_version is not None + + stmt = ( + select(WorkflowVersion) + .where(WorkflowVersion.git_commit_hash == workflow_version.git_commit_hash) + .options(selectinload(WorkflowVersion.workflow_modes)) + ) created_workflow_version = await db.scalar(stmt) assert created_workflow_version is not None assert created_workflow_version == workflow_version + assert len(created_workflow_version.workflow_modes) == 1 + assert created_workflow_version.workflow_modes[0].mode_id == random_workflow_mode.mode_id class TestWorkflowVersionCRUDUpdate: diff --git a/app/tests/utils/user.py b/app/tests/utils/user.py index 109cc0a9e56bf546a3d23c060026f70ceb31f200..ff514b4ca098533bce88bf17f3e6988a32d83343 100644 --- a/app/tests/utils/user.py +++ b/app/tests/utils/user.py @@ -7,7 +7,7 @@ from authlib.jose import JsonWebToken from clowmdb.models import User from sqlalchemy.ext.asyncio import AsyncSession -from .utils import random_lower_string +from .utils import random_hex_string, random_lower_string _jwt = JsonWebToken(["HS256"]) @@ -84,7 +84,7 @@ async def create_random_user(db: AsyncSession) -> User: Newly created user. """ user = User( - uid=random_lower_string(), + uid=random_hex_string(), display_name=random_lower_string(), ) db.add(user) diff --git a/app/utils/backoff_strategy.py b/app/utils/backoff_strategy.py index ca90f8176701510d3650e447d0b98e050033bcae..3e12ddc7ad50f4a72dc7d7a863bfd5ef0493a103 100644 --- a/app/utils/backoff_strategy.py +++ b/app/utils/backoff_strategy.py @@ -42,6 +42,11 @@ class BackoffStrategy(ABC, Generator): """ Compute the next value in the infinite series for this backoff strategy. + Parameters + ---------- + iteration : int + The number of the current interation + Returns ------- val : int diff --git a/requirements-dev.txt b/requirements-dev.txt index d0e72cf34023f4028e1b6d4eb7f07b0276c6444c..174fa2d54e6f59c1252c5332e373d64c7f5b8929 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -4,7 +4,7 @@ pytest-asyncio>=0.21.0,<0.22.0 pytest-cov>=4.1.0,<4.2.0 coverage[toml]>=7.3.0,<7.4.0 # Linters -ruff>=0.1.2,<0.1.3 +ruff>=0.1.3,<0.1.4 black>=23.10.0,<23.11.0 isort>=5.12.0,<5.13.0 mypy>=1.6.0,<1.7.0