diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 6ea806fdd741f951f956607ef5a8e3d78458e4ac..29b005217923aac85e2858910f1171328a74b2b5 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -15,6 +15,7 @@ variables: CLOWM_DEV_SYSTEM: true CLOWM_CLUSTER__JOB_MONITORING: "LINEAR" CLOWM_CLUSTER__ACTIVE_WORKFLOW_EXECUTION_LIMIT: 3 + CLOWM_CLUSTER__TOWER_SECRET: "nonempty" cache: paths: diff --git a/CONFIG.md b/CONFIG.md index e549c8fed6aed9909f3cea00a481c2966743afbc..f48297fb947f4c9c4fc5dbecd187a11752c7c66a 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -72,7 +72,8 @@ | Env variable | Config file key | Default | Value | Example | Description | |--------------------------------------------------|-------------------------------------------|-----------------------|--------------------------------------------|----------------------------|---------------------------------------------------------------------------------------------------| -| * `CLOWM_CLUSTER__SLURM__URI` | `cluster.slurm.uri` | unset | HHTP Url | `http://localhost` | HTTP URL to communicate with the Slurm cluster | +| * `CLOWM_CLUSTER__TOWER_SECRET` | `cluster.tower_secret` | unset | string | `tiI-h_PiFt7HqOc38ekbcQ` | Shared secret between cluster and logging service | +| * `CLOWM_CLUSTER__SLURM__URI` | `cluster.slurm.uri` | unset | HTTP Url | `http://localhost` | HTTP URL to communicate with the Slurm cluster | | * `CLOWM_CLUSTER__SLURM__TOKEN` | `cluster.slurm.token` | unset | String | - | JWT for communication with the Slurm REST API. | | `CLOWM_CLUSTER__SLURM__USER` | `cluster.slurm.user` | `slurm` | String | `slurm` | User on the slurm cluster who should run the job. Should be the user of the `cluster.slurm.token` | | `CLOWM_CLUSTER__NXF_CONFIG` | `cluster.nxf_config` | unset | Path on slurm cluster | `/path/to/nextflow.config` | Configuration file on the slurm cluster that is the same for every nextflow run | diff --git a/clowm/api/__init__.py b/clowm/api/__init__.py index 6a6e316e9e7dd44c164587b5c3d19c587c734ade..f7b2694a5763b516fec30deb7d72b903c297efd8 100644 --- a/clowm/api/__init__.py +++ b/clowm/api/__init__.py @@ -5,6 +5,7 @@ from fastapi import APIRouter, Depends, status from clowm.schemas.security import ErrorDetail from .dependencies.user import decode_bearer_token +from .dependencies.workflow import validate_tower_secret from .endpoints import ( bucket_permissions, buckets, @@ -12,6 +13,7 @@ from .endpoints import ( resource_version, resources, s3key, + trace, users, workflow, workflow_credentials, @@ -69,3 +71,5 @@ for endpoint in [ dependencies=[Depends(decode_bearer_token)], responses=alternative_responses, ) + +api_router.include_router(trace.router, dependencies=[Depends(validate_tower_secret)], include_in_schema=False) diff --git a/clowm/api/background/cluster.py b/clowm/api/background/cluster.py index 723d9607e4b89742764ee7f0792d57248ddcb04f..d0f41213ebdbf9ba70a93804cdb81826421ff878 100644 --- a/clowm/api/background/cluster.py +++ b/clowm/api/background/cluster.py @@ -213,10 +213,11 @@ async def start_workflow_execution( span.set_attribute("workflow_execution_status", str(fresh_execution.status)) if fresh_execution.end_time is None: # Mark job as finished with an error - await CRUDWorkflowExecution.set_error( + await CRUDWorkflowExecution.update_status( db=db, execution_id=execution.execution_id, status=WorkflowExecution.WorkflowExecutionStatus.ERROR, + end_timestamp=round(time.time()), ) await _monitor_proper_job_execution( @@ -228,8 +229,11 @@ async def start_workflow_execution( except (HTTPError, KeyError): # Mark job as aborted when there is an error async with background_dependencies.get_background_db() as db: - await CRUDWorkflowExecution.set_error( - db=db, execution_id=execution.execution_id, status=WorkflowExecution.WorkflowExecutionStatus.ERROR + await CRUDWorkflowExecution.update_status( + db=db, + execution_id=execution.execution_id, + status=WorkflowExecution.WorkflowExecutionStatus.ERROR, + end_timestamp=round(time.time()), ) diff --git a/clowm/api/background/metrics.py b/clowm/api/background/metrics.py new file mode 100644 index 0000000000000000000000000000000000000000..a5b871deac5ec31cb81fd6df248ac76a3e61d8da --- /dev/null +++ b/clowm/api/background/metrics.py @@ -0,0 +1,40 @@ +from uuid import UUID + +from opentelemetry import trace + +from clowm.crud import CRUDWorkflowExecution +from clowm.schemas.tower import Task + +from ...api.background import dependencies + +tracer = trace.get_tracer_provider().get_tracer(__name__) + + +async def save_cpu_hours(execution_id: UUID, tasks: list[Task]) -> None: + """ + Update the cpu-hours field of a workflow execution with the spent CPU hours so far. + + Parameters + ---------- + execution_id : uuid.UUID + Id of a workflow execution. + tasks : list[clowm.schemas.tower.Task] + A list of nextflow tasks. + """ + with tracer.start_as_current_span( + "background_save_cpu_hours", attributes={"execution_id": str(execution_id)} + ) as span: + cpu_hours = 0.0 + cpus = [] + realtime = [] + task_ids = [] + for task in tasks: + if task.status == "COMPLETED" and task.realtime is not None: + cpus.append(task.cpus) + realtime.append(task.realtime) + task_ids.append(task.taskId) + cpu_hours += task.cpus * task.realtime / 60000000 + if cpu_hours > 0: + span.set_attributes({"cpus": cpus, "realtime": realtime, "task_ids": task_ids}) + async with dependencies.get_background_db() as db: + await CRUDWorkflowExecution.update_cpu_hours(db=db, execution_id=execution_id, cpu_hours=cpu_hours) diff --git a/clowm/api/dependencies/__init__.py b/clowm/api/dependencies/__init__.py index b2a4e9f27185b4a458ef48e2f0097b8dfd5672cb..fb0e4f6a4dc578057c78be265fd9c295327ea320 100644 --- a/clowm/api/dependencies/__init__.py +++ b/clowm/api/dependencies/__init__.py @@ -2,7 +2,7 @@ from .bucket import CurrentBucket from .resource import CurrentResource, CurrentResourceVersion from .services import DBSession, HTTPClient, OIDCClientDep, RGWService, S3Service from .user import AuthorizationDependency, CurrentUser, PathUser -from .workflow import CurrentWorkflow, CurrentWorkflowExecution, CurrentWorkflowVersion +from .workflow import CurrentWorkflow, CurrentWorkflowExecution, CurrentWorkflowVersion, TowerWorkflowExecution __all__ = [ "RGWService", @@ -19,4 +19,5 @@ __all__ = [ "CurrentWorkflowExecution", "CurrentResource", "CurrentResourceVersion", + "TowerWorkflowExecution", ] diff --git a/clowm/api/dependencies/resource.py b/clowm/api/dependencies/resource.py index 9a3e06cd1880c0b41a9712485da71d8762347661..f8728d78506a9b4c735e237ee5e931aca97ca9f3 100644 --- a/clowm/api/dependencies/resource.py +++ b/clowm/api/dependencies/resource.py @@ -27,7 +27,7 @@ async def get_current_resource(rid: Annotated[UUID, Path()], db: DBSession) -> R Returns ------- - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. """ resource = await CRUDResource.get(db, resource_id=rid) @@ -49,12 +49,12 @@ async def get_current_resource_version(rvid: Annotated[UUID, Path()], resource: ---------- rvid : uuid. UUID ID of a resource version. Path parameter. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Returns ------- - resource_version : clowmdb.models.ResourceVersion + resource_version : clowm.models.ResourceVersion Resource Version associated with the ID in the path. """ resource_version: list[ResourceVersion] = [ diff --git a/clowm/api/dependencies/workflow.py b/clowm/api/dependencies/workflow.py index aeca587a5574b1faf509b4b2732db98ce9b734cb..94f6d6c2ad46548453a245005524a8705c4fa811 100644 --- a/clowm/api/dependencies/workflow.py +++ b/clowm/api/dependencies/workflow.py @@ -1,9 +1,12 @@ +import secrets from typing import Annotated from uuid import UUID from fastapi import Depends, HTTPException, Path, status +from fastapi.security import HTTPBasic, HTTPBasicCredentials from opentelemetry import trace +from clowm.core.config import settings from clowm.crud import CRUDWorkflow, CRUDWorkflowExecution, CRUDWorkflowVersion from clowm.models import Workflow, WorkflowExecution, WorkflowVersion @@ -11,6 +14,9 @@ from .services import DBSession tracer = trace.get_tracer_provider().get_tracer(__name__) +security = HTTPBasic() +secret_bytes = settings.cluster.tower_secret.get_secret_value().encode("utf-8") + async def get_current_workflow( wid: Annotated[UUID, Path(description="ID of a workflow", examples=["0cc78936-381b-4bdd-999d-736c40591078"])], @@ -117,3 +123,57 @@ async def get_current_workflow_version( CurrentWorkflowVersion = Annotated[WorkflowVersion, Depends(get_current_workflow_version)] + + +async def get_current_workflow_execution_by_id_fragment( + eid: Annotated[ + str, Path(description="Start of ID of a workflow execution.", examples=["0cc78936381b4bdd"], max_length=16) + ], + db: DBSession, +) -> WorkflowExecution: + """ + Get the workflow from the database with the start of ID given in the path. + + FastAPI Dependency Injection Function. + + Parameters + ---------- + eid: str + Start of ID of workflow execution. Path parameter. + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. Dependency Injection. + + Returns + ------- + workflow_execution : clowm.models.Workflow + Workflow execution associated with start of ID in the path. + """ + execution = await CRUDWorkflowExecution.get_by_id_fragment(db=db, execution_id_start=eid) + if execution is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail=f"Workflow Execution with start of id '{eid}' not found" + ) + return execution + + +TowerWorkflowExecution = Annotated[WorkflowExecution, Depends(get_current_workflow_execution_by_id_fragment)] + + +@tracer.start_as_current_span("validate_http_credentials") +def validate_tower_secret(credentials: Annotated[HTTPBasicCredentials, Depends(security)]) -> None: + """ + Validate the secret that nextflow send with the metrics. + + Parameters + ---------- + credentials : fastapi.security.HTTPBasicCredentials + Tuple of irrelevant username and tower secret. + """ + current_password_bytes = credentials.password.encode("utf-8") + is_correct_password = secrets.compare_digest(current_password_bytes, secret_bytes) + if not is_correct_password: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect Nextflow Tower secret", + headers={"WWW-Authenticate": "Basic"}, + ) diff --git a/clowm/api/endpoints/resource_version.py b/clowm/api/endpoints/resource_version.py index f08ec0b29cbfceb82623b8e5151578329d278087..e2065cd1112046ef52b22cc30773f681e9866ad5 100644 --- a/clowm/api/endpoints/resource_version.py +++ b/clowm/api/endpoints/resource_version.py @@ -81,11 +81,11 @@ async def list_resource_versions( ---------- authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency injection. - version_status : list[clowmdb.models.ResourceVersion.Status] | None, default None + version_status : list[clowm.models.ResourceVersion.Status] | None, default None Filter resource version by their status. Query Parameter. """ current_span = trace.get_current_span() @@ -126,11 +126,11 @@ async def request_resource_version( ---------- authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Dependency Injection. resource_version_in : clowm.schemas.resource_version.ResourceVersionIn Data about the new resource version. HTTP Body. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. @@ -178,11 +178,11 @@ async def get_resource_version( ---------- authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency injection. - resource_version : clowmdb.models.ResourceVersion + resource_version : clowm.models.ResourceVersion Resource Version associated with the ID in the path. Dependency Injection. """ trace.get_current_span().set_attributes( @@ -222,7 +222,7 @@ async def resource_file_tree( Raw request object to read headers from. s3 : types_aiobotocore_s3.service_resource import S3ServiceResource S3 Service to perform operations on buckets. Dependency Injection. - resource_version : clowmdb.models.ResourceVersion + resource_version : clowm.models.ResourceVersion Resource Version associated with the ID in the path. Dependency Injection. """ authorization(RBACOperation.READ) @@ -301,11 +301,11 @@ async def request_resource_version_review( ---------- authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency injection. - resource_version : clowmdb.models.ResourceVersion + resource_version : clowm.models.ResourceVersion Resource Version associated with the ID in the path. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. @@ -374,11 +374,11 @@ async def request_resource_version_sync( ---------- authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency injection. - resource_version : clowmdb.models.ResourceVersion + resource_version : clowm.models.ResourceVersion Resource Version associated with the ID in the path. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. @@ -444,11 +444,11 @@ async def resource_version_review( ---------- authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Dependency Injection. background_tasks : fastapi.BackgroundTasks Entrypoint for new BackgroundTasks. Provided by FastAPI. - resource_version : clowmdb.models.ResourceVersion + resource_version : clowm.models.ResourceVersion Resource Version associated with the ID in the path. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. @@ -503,9 +503,9 @@ async def resource_version_sync( ---------- authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Dependency Injection. - resource_version : clowmdb.models.ResourceVersion + resource_version : clowm.models.ResourceVersion Resource Version associated with the ID in the path. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. @@ -594,9 +594,9 @@ async def resource_version_latest( Function to call determines if the current user is authorized for this request. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Dependency Injection. - resource_version : clowmdb.models.ResourceVersion + resource_version : clowm.models.ResourceVersion Resource Version associated with the ID in the path. Dependency Injection. background_tasks : fastapi.BackgroundTasks Entrypoint for new BackgroundTasks. Provided by FastAPI. @@ -637,9 +637,9 @@ async def delete_resource_version_cluster( ---------- authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Dependency Injection. - resource_version : clowmdb.models.ResourceVersion + resource_version : clowm.models.ResourceVersion Resource Version associated with the ID in the path. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. @@ -686,9 +686,9 @@ async def delete_resource_version_s3( ---------- authorization : Callable[[str], Awaitable[Any]] Async function to ask the auth service for authorization. Dependency Injection. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Dependency Injection. - resource_version : clowmdb.models.ResourceVersion + resource_version : clowm.models.ResourceVersion Resource Version associated with the ID in the path. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. diff --git a/clowm/api/endpoints/resources.py b/clowm/api/endpoints/resources.py index 2677834a814b8d43ef9137902156057db5c44208..aa2bea1d2c150b873c2f6b41bdb94a79e762a717 100644 --- a/clowm/api/endpoints/resources.py +++ b/clowm/api/endpoints/resources.py @@ -60,13 +60,13 @@ async def list_resources( ---------- authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. maintainer_id : uuid.UUID | None, default None Filter resources by a maintainer. Query Parameter. - version_status : list[clowmdb.models.ResourceVersion.Status] | None, default None + version_status : list[clowm.models.ResourceVersion.Status] | None, default None Filter resource version by their status. Query Parameter. name_substring : str | None, default None Filter resources by a substring in their name. Query Parameter. @@ -142,7 +142,7 @@ async def create_resource( ---------- authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency injection. resource_in : clowm.schemas.resource.ResourceIn Data about the new resource. HTTP Body. @@ -197,11 +197,11 @@ async def get_resource( ---------- authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency injection. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Dependency Injection. - version_status : list[clowmdb.models.ResourceVersion.Status] | None, default None + version_status : list[clowm.models.ResourceVersion.Status] | None, default None Filter resource version by their status. Query Parameter. """ current_span = trace.get_current_span() @@ -241,7 +241,7 @@ async def delete_resource( ---------- authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - resource : clowmdb.models.Resource + resource : clowm.models.Resource Resource associated with the ID in the path. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. diff --git a/clowm/api/endpoints/trace.py b/clowm/api/endpoints/trace.py new file mode 100644 index 0000000000000000000000000000000000000000..28398835b70e1080bbf99ee6ea919a0ec10bfe28 --- /dev/null +++ b/clowm/api/endpoints/trace.py @@ -0,0 +1,182 @@ +from typing import Annotated + +from fastapi import APIRouter, BackgroundTasks, Query +from opentelemetry import trace + +from clowm.core.config import settings +from clowm.crud import CRUDWorkflowExecution +from clowm.models.workflow_execution import WorkflowExecution +from clowm.otlp import start_as_current_span_async +from clowm.schemas.tower import BeginWorkflow, CompleteWorkflow, CreateWorkflowIn, CreateWorkflowOut, ReportProgress + +from ..background.metrics import save_cpu_hours +from ..dependencies import DBSession, TowerWorkflowExecution +from ..dependencies.workflow import get_current_workflow_execution_by_id_fragment + +router = APIRouter(prefix="/trace", tags=["Trace"]) + +tracer = trace.get_tracer_provider().get_tracer(__name__) + + +@router.post("/create", summary="Create workflow run event") +@start_as_current_span_async("api_create_workflow_execution", tracer=tracer) +async def create( + workspaceId: Annotated[ + str, Query(description="ID of the workflow execution", examples=["0cc78936381b4bdd"], max_length=16) + ], + event_in: CreateWorkflowIn, + db: DBSession, +) -> CreateWorkflowOut: + """ + Event send by nextflow for creating the workflow run. + \f + Parameters + ---------- + workspaceId : str + ID of the nextflow workspace. Correspond to the start of the workflow execution ID. URL Query Parameter. + event_in : clowm.schemas.tower.CreateWorkflowIn + Additional data send by nextflow. HTTP Body. + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. Dependency Injection. + + Returns + ------- + workflow_out : clowm.schemas.tower.CreateWorkflowOut + Response with execution ID which can be parsed by nextflow. + """ + trace.get_current_span().set_attribute("workflow_execution_id_start", workspaceId) + # Check that Workflow execution exists + execution = await get_current_workflow_execution_by_id_fragment(eid=workspaceId, db=db) + await CRUDWorkflowExecution.update_status( + db=db, execution_id=execution.execution_id, status=WorkflowExecution.WorkflowExecutionStatus.SCHEDULED + ) + return CreateWorkflowOut(workflowId=workspaceId) + + +@router.put("/{eid}/begin", summary="Begin workflow run event") +@start_as_current_span_async("api_begin_workflow_execution", tracer=tracer) +async def begin( + event_in: BeginWorkflow, + workflow_execution: TowerWorkflowExecution, + db: DBSession, +) -> dict[str, str]: + """ + Event send by nextflow for beginning the workflow run. + \f + Parameters + ---------- + event_in : clowm.schemas.tower.BeginWorkflow + Additional data send by nextflow. HTTP Body. + workflow_execution : clowm.models.Workflow + Workflow execution associated with start of ID in the path. + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. Dependency Injection. + + Returns + ------- + response : dict[str, str] + Response with watch url + """ + trace.get_current_span().set_attribute("workflow_execution_id", str(workflow_execution.execution_id)) + await CRUDWorkflowExecution.update_status( + db=db, execution_id=workflow_execution.execution_id, status=WorkflowExecution.WorkflowExecutionStatus.RUNNING + ) + return {"watchUrl": str(settings.ui_uri) + settings.api_prefix.strip("/")} + + +@router.put("/{eid}/complete", summary="Complete workflow run event") +@start_as_current_span_async("api_complete_workflow_execution", tracer=tracer) +async def complete( + event_in: CompleteWorkflow, + workflow_execution: TowerWorkflowExecution, + db: DBSession, +) -> dict: + """ + Event send by nextflow for completing the workflow run. + \f + Parameters + ---------- + event_in : clowm.schemas.tower.CompleteWorkflow + Additional data send by nextflow. HTTP Body. + workflow_execution : clowm.models.Workflow + Workflow execution associated with start of ID in the path. + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. Dependency Injection. + + Returns + ------- + response : Dict + Empty Dict + """ + current_span = trace.get_current_span() + status = WorkflowExecution.WorkflowExecutionStatus.ERROR + current_span.set_attributes( + {"workflow_execution_id": str(workflow_execution.execution_id), "success": event_in.workflow.success} + ) + if event_in.workflow.errorReport is not None: + current_span.set_attribute("error_report", event_in.workflow.errorReport) + if event_in.workflow.success: + status = WorkflowExecution.WorkflowExecutionStatus.SUCCESS + elif event_in.workflow.errorReport is not None and event_in.workflow.errorReport.startswith("SIG"): + status = WorkflowExecution.WorkflowExecutionStatus.CANCELED + await CRUDWorkflowExecution.update_status( + db=db, execution_id=workflow_execution.execution_id, status=status, end_timestamp=event_in.instant + ) + return {} + + +@router.put("/{eid}/progress", summary="Progress workflow event") +@start_as_current_span_async("api_progress_workflow_execution", tracer=tracer) +async def progress( + progress_event: ReportProgress, + workflow_execution: TowerWorkflowExecution, + background_tasks: BackgroundTasks, +) -> dict: + """ + Event send by nextflow for progress in the workflow run. + \f + Parameters + ---------- + progress_event : clowm.schemas.tower.ReportProgress + Additional data send by nextflow. HTTP Body. + workflow_execution : clowm.models.Workflow + Workflow execution associated with start of ID in the path. + background_tasks : fastapi.BackgroundTasks + + + Returns + ------- + response : Dict + Empty Dict + """ + trace.get_current_span().set_attribute("workflow_execution_id", str(workflow_execution.execution_id)) + if progress_event.tasks: + background_tasks.add_task( + save_cpu_hours, execution_id=workflow_execution.execution_id, tasks=progress_event.tasks + ) + return {} + + +@router.put("/{eid}/heartbeat", summary="Heartbeat workflow event") +@start_as_current_span_async("api_heartbeat_workflow_execution", tracer=tracer) +async def heartbeat( + progress_event: ReportProgress, + workflow_execution: TowerWorkflowExecution, +) -> dict: + """ + Event send by nextflow for heartbeat signal by workflow run. + \f + Parameters + ---------- + progress_event : clowm.schemas.tower.ReportProgress + Additional data send by nextflow. HTTP Body. + workflow_execution : clowm.models.Workflow + Workflow execution associated with start of ID in the path. + + Returns + ------- + response : Dict + Empty Dict + """ + trace.get_current_span().set_attribute("workflow_execution_id", str(workflow_execution.execution_id)) + return {} diff --git a/clowm/api/endpoints/workflow.py b/clowm/api/endpoints/workflow.py index a61cc3913a6df59afa6fe2441d35acd899d96527..81b26ace3eeff7e21303f289f897c9d45345f2e7 100644 --- a/clowm/api/endpoints/workflow.py +++ b/clowm/api/endpoints/workflow.py @@ -71,11 +71,11 @@ async def list_workflows( Filter workflows by a developer. Query Parameter. name_substring : string | None, default None Filter workflows by a substring in their name. Query Parameter. - version_status : list[clowmdb.models.WorkflowVersion.Status] | None, default None + version_status : list[clowm.models.WorkflowVersion.Status] | None, default None Status of Workflow versions to filter for to fetch. Query Parameter. authorization : Callable[[str], Awaitable[Any]] Async function to ask the auth service for authorization. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency injection. Returns @@ -135,7 +135,7 @@ async def create_workflow( Data about the new Workflow. HTML Body. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. @@ -244,7 +244,7 @@ async def get_developer_workflow_statistics( Async function to ask the auth service for authorization. Dependency Injection. response : fastapi.Response Temporary Response object. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency Injection. developer_id : str | None, default None Filter for workflows developed by a specific user. Query Parameter. @@ -298,13 +298,13 @@ async def get_workflow( \f Parameters ---------- - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. - version_status : list[clowmdb.models.WorkflowVersion.Status] | None, default None + version_status : list[clowm.models.WorkflowVersion.Status] | None, default None Status of Workflow versions to filter for to fetch. Query Parameter db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency Injection. authorization : Callable[[str], Awaitable[Any]] Async function to ask the auth service for authorization. Dependency Injection. @@ -344,7 +344,7 @@ async def get_workflow_statistics( \f Parameters ---------- - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. @@ -384,13 +384,13 @@ async def delete_workflow( ---------- background_tasks : fastapi.BackgroundTasks Entrypoint for new BackgroundTasks. Provided by FastAPI. - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. authorization : Callable[[str], Awaitable[Any]] Async function to ask the auth service for authorization. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency Injection. """ trace.get_current_span().set_attribute("workflow_id", str(workflow.workflow_id)) @@ -447,11 +447,11 @@ async def update_workflow( ---------- background_tasks : fastapi.BackgroundTasks Entrypoint for new BackgroundTasks. Provided by FastAPI - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency Injection. authorization : Callable[[str], Awaitable[Any]] Async function to ask the auth service for authorization. Dependency Injection. diff --git a/clowm/api/endpoints/workflow_credentials.py b/clowm/api/endpoints/workflow_credentials.py index 17cb425444807717e376db45c71ec7a1b8b4b7d5..d4586c00c3462c8730bf0d1413d0a52845ce7cab 100644 --- a/clowm/api/endpoints/workflow_credentials.py +++ b/clowm/api/endpoints/workflow_credentials.py @@ -43,9 +43,9 @@ async def get_workflow_credentials( \f Parameters ---------- - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. @@ -84,11 +84,11 @@ async def update_workflow_credentials( ---------- credentials : clowm.schemas.workflow.WorkflowCredentialsIn Updated credentials for the workflow git repository - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency Injection. authorization : Callable[[str], Awaitable[Any]] Async function to ask the auth service for authorization. Dependency Injection. @@ -145,11 +145,11 @@ async def delete_workflow_credentials( \f Parameters ---------- - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency Injection. authorization : Callable[[str], Awaitable[Any]] Async function to ask the auth service for authorization. Dependency Injection. diff --git a/clowm/api/endpoints/workflow_execution.py b/clowm/api/endpoints/workflow_execution.py index 36a038d1fd932fa9f36f50002413a73876e55b8a..827395d877e4fee565fbe0c780a5d1224263596c 100644 --- a/clowm/api/endpoints/workflow_execution.py +++ b/clowm/api/endpoints/workflow_execution.py @@ -1,4 +1,5 @@ import json +import time from tempfile import SpooledTemporaryFile from typing import Annotated, Any, Callable from uuid import UUID @@ -60,7 +61,7 @@ async def start_workflow( Meta-data and parameters for the workflow to start. HTTP Body. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user who will be the owner of the newly created bucket. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. @@ -71,7 +72,7 @@ async def start_workflow( Returns ------- - execution : clowmdb.models.WorkflowExecution + execution : clowm.models.WorkflowExecution Created workflow execution from the database """ current_span = trace.get_current_span() @@ -214,7 +215,7 @@ async def start_arbitrary_workflow( Meta-data and parameters for the workflow to start. HTTP Body. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user who will be the owner of the newly created bucket. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. @@ -223,7 +224,7 @@ async def start_arbitrary_workflow( Returns ------- - execution : clowmdb.models.WorkflowExecution + execution : clowm.models.WorkflowExecution Created workflow execution from the database """ if not settings.dev_system: # pragma: no cover @@ -342,20 +343,20 @@ async def list_workflow_executions( ---------- executor_id : str | None, default None Filter for workflow executions by a user. Query Parameter. - execution_status : list[clowmdb.models.WorkflowExecution.WorkflowExecutionStatus] | None, default None + execution_status : list[clowm.models.WorkflowExecution.WorkflowExecutionStatus] | None, default None Filter for status of workflow execution. Query Parameter. workflow_version_id : str | None, default None Filter for workflow version, Query Parameter. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user who will be the owner of the newly created bucket. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. Returns ------- - executions : list[clowmdb.models.WorkflowExecution] + executions : list[clowm.models.WorkflowExecution] List of filtered workflow executions. """ current_span = trace.get_current_span() @@ -397,16 +398,16 @@ async def get_workflow_execution( \f Parameters ---------- - workflow_execution : clowmdb.models.WorkflowExecution + workflow_execution : clowm.models.WorkflowExecution Workflow execution with given ID. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user who will be the owner of the newly created bucket. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. Returns ------- - execution : clowmdb.models.WorkflowExecution + execution : clowm.models.WorkflowExecution Workflow execution with the given ID. """ trace.get_current_span().set_attribute("execution_id", str(workflow_execution.execution_id)) @@ -433,9 +434,9 @@ async def get_workflow_execution_params( \f Parameters ---------- - workflow_execution : clowmdb.models.WorkflowExecution + workflow_execution : clowm.models.WorkflowExecution Workflow execution with given ID. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user who will be the owner of the newly created bucket. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. @@ -444,7 +445,7 @@ async def get_workflow_execution_params( Returns ------- - execution : clowmdb.models.WorkflowExecution + execution : clowm.models.WorkflowExecution Workflow execution with the given id. """ trace.get_current_span().set_attribute("execution_id", str(workflow_execution.execution_id)) @@ -478,11 +479,11 @@ async def delete_workflow_execution( ---------- background_tasks : fastapi.BackgroundTasks Entrypoint for new BackgroundTasks. Provided by FastAPI. - workflow_execution : clowmdb.models.WorkflowExecution + workflow_execution : clowm.models.WorkflowExecution Workflow execution with given ID. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user who will be the owner of the newly created bucket. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. @@ -524,11 +525,11 @@ async def cancel_workflow_execution( ---------- background_tasks : fastapi.BackgroundTasks Entrypoint for new BackgroundTasks. Provided by FastAPI. - workflow_execution : clowmdb.models.WorkflowExecution + workflow_execution : clowm.models.WorkflowExecution Workflow execution with given ID. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user who will be the owner of the newly created bucket. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. @@ -548,4 +549,9 @@ async def cancel_workflow_execution( ) if workflow_execution.slurm_job_id >= 0: background_tasks.add_task(cancel_slurm_job, job_id=workflow_execution.slurm_job_id) - await CRUDWorkflowExecution.set_error(workflow_execution.execution_id, db=db) + await CRUDWorkflowExecution.update_status( + workflow_execution.execution_id, + db=db, + status=WorkflowExecution.WorkflowExecutionStatus.CANCELED, + end_timestamp=round(time.time()), + ) diff --git a/clowm/api/endpoints/workflow_version.py b/clowm/api/endpoints/workflow_version.py index e98590c62e02e5149246c55dd23257f5e987cc81..9f4d272859c7ecbc488d13cf6a85982bf333407a 100644 --- a/clowm/api/endpoints/workflow_version.py +++ b/clowm/api/endpoints/workflow_version.py @@ -61,13 +61,13 @@ async def list_workflow_version( \f Parameters ---------- - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. - version_status : list[clowmdb.models.WorkflowVersion.Status] | None, default None + version_status : list[clowm.models.WorkflowVersion.Status] | None, default None Status of Workflow versions to filter for to fetch. Query Parameter db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user who will be the owner of the newly created bucket. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. @@ -128,13 +128,13 @@ async def get_workflow_version( \f Parameters ---------- - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. git_commit_hash: str Version ID db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user who will be the owner of the newly created bucket. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. @@ -193,11 +193,11 @@ async def update_workflow_version_status( ---------- background_tasks : fastapi.BackgroundTasks Entrypoint for new BackgroundTasks. Provided by FastAPI. - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. version_status : clowm.schemas.workflow_version.WorkflowVersionStatus New Status of the workflow version. HTTP Body. - workflow_version : clowmdb.models.WorkflowVersion + workflow_version : clowm.models.WorkflowVersion Workflow version with given ID. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. @@ -206,7 +206,7 @@ async def update_workflow_version_status( Returns ------- - version : clowmdb.models.WorkflowVersion + version : clowm.models.WorkflowVersion Version of the workflow with updated status """ trace.get_current_span().set_attributes( @@ -249,20 +249,20 @@ async def deprecate_workflow_version( \f Parameters ---------- - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. - workflow_version : clowmdb.models.WorkflowVersion + workflow_version : clowm.models.WorkflowVersion Workflow version with given ID. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - current_user: clowmdb.models.User + current_user: clowm.models.User Current user who will be the owner of the newly created bucket. Dependency Injection. Returns ------- - version : clowmdb.models.WorkflowVersion + version : clowm.models.WorkflowVersion Version of the workflow with deprecated status """ trace.get_current_span().set_attributes( @@ -296,22 +296,22 @@ async def update_workflow_version_parameter_extension( \f Parameters ---------- - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. - workflow_version : clowmdb.models.WorkflowVersion + workflow_version : clowm.models.WorkflowVersion Workflow version with given ID. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - current_user: clowmdb.models.User + current_user: clowm.models.User Current user who will be the owner of the newly created bucket. Dependency Injection. parameter_extension : clowm.schemas.workflow_version.ParameterExtension Parameter extension specific for this CloWM instance. Returns ------- - version : clowmdb.models.WorkflowVersion + version : clowm.models.WorkflowVersion Version of the workflow with the updated parameter extension """ trace.get_current_span().set_attributes( @@ -347,15 +347,15 @@ async def delete_workflow_version_parameter_extension( \f Parameters ---------- - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. - workflow_version : clowmdb.models.WorkflowVersion + workflow_version : clowm.models.WorkflowVersion Workflow version with given ID. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - current_user: clowmdb.models.User + current_user: clowm.models.User Current user who will be the owner of the newly created bucket. Dependency Injection. """ trace.get_current_span().set_attributes( @@ -397,9 +397,9 @@ async def download_workflow_documentation( ---------- request : fastapi.Request Raw request object - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. - workflow_version : clowmdb.models.WorkflowVersion + workflow_version : clowm.models.WorkflowVersion Workflow version with given ID. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. @@ -514,15 +514,15 @@ async def upload_workflow_version_icon( \f Parameters ---------- - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. background_tasks : fastapi.BackgroundTasks Entrypoint for new BackgroundTasks. Provided by FastAPI. - workflow_version : clowmdb.models.WorkflowVersion + workflow_version : clowm.models.WorkflowVersion Workflow version with given ID. Dependency Injection. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. @@ -576,15 +576,15 @@ async def delete_workflow_version_icon( \f Parameters ---------- - workflow : clowmdb.models.Workflow + workflow : clowm.models.Workflow Workflow with given ID. Dependency Injection. - workflow_version : clowmdb.models.WorkflowVersion + workflow_version : clowm.models.WorkflowVersion Workflow version with given ID. Dependency Injection. background_tasks : fastapi.BackgroundTasks Entrypoint for new BackgroundTasks. Provided by FastAPI. authorization : Callable[[clowm.core.rbac.RBACOperation], None] Function to call determines if the current user is authorized for this request. Dependency Injection. - current_user : clowmdb.models.User + current_user : clowm.models.User Current user. Dependency Injection. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. Dependency Injection. diff --git a/clowm/core/config.py b/clowm/core/config.py index 24ef7bab8e452e1711f6cfecdb6a24509355a719..e6224e415d0597f0cc03c37a0fe9958211723a50 100644 --- a/clowm/core/config.py +++ b/clowm/core/config.py @@ -210,6 +210,7 @@ class ClusterSettings(BaseModel): resource_container_path: str = Field( "/vol/resources", description="Base path in the container where all resources are available" ) + tower_secret: SecretStr = Field(..., description="Shared secret between nextflow cluster and logging services") class Settings(BaseSettings): diff --git a/clowm/crud/crud_resource.py b/clowm/crud/crud_resource.py index c11f6b1685e430a4d58d0b4b8eadf08c3f77c99e..e603ca5bbb42b8b6774bbc804a52fac7d3e8f06d 100644 --- a/clowm/crud/crud_resource.py +++ b/clowm/crud/crud_resource.py @@ -28,7 +28,7 @@ class CRUDResource: Returns ------- - resource : clowmdb.models.Resource | None + resource : clowm.models.Resource | None The resource with the given ID if it exists, None otherwise """ @@ -52,7 +52,7 @@ class CRUDResource: Returns ------- - resource : clowmdb.models.Resource | None + resource : clowm.models.Resource | None The resource with the given name if it exists, None otherwise """ @@ -79,14 +79,14 @@ class CRUDResource: Substring to filter for in the name of a resource. maintainer_id : uuid.UUID | None, default None Filter resources by maintainer. - version_status : list[clowmdb.models.ResourceVersion.Status] | None, default None + version_status : list[clowm.models.ResourceVersion.Status] | None, default None Filter versions of a resource based on the status. Removes resources that have no version after this filter. public : bool | None, default None Filter resources to include only public resources. Returns ------- - workflows : list[clowmdb.models.Resource] + workflows : list[clowm.models.Resource] List of resources. """ with tracer.start_as_current_span("db_list_resources") as span: @@ -145,7 +145,7 @@ class CRUDResource: Returns ------- - resource : clowmdb.models.Resource + resource : clowm.models.Resource The newly created resource """ with tracer.start_as_current_span( diff --git a/clowm/crud/crud_resource_version.py b/clowm/crud/crud_resource_version.py index fe9f01489bc204e1a28fe3bfb85a1aacb45486d1..3721a2f7460c188f7054caaa433e9ba787652f46 100644 --- a/clowm/crud/crud_resource_version.py +++ b/clowm/crud/crud_resource_version.py @@ -30,7 +30,7 @@ class CRUDResourceVersion: Returns ------- - resource_version : clowmdb.models.ResourceVersion | None + resource_version : clowm.models.ResourceVersion | None The resource version with the given ID if it exists, None otherwise """ stmt = select(ResourceVersion).where(ResourceVersion.resource_version_id == resource_version_id) @@ -119,7 +119,7 @@ class CRUDResourceVersion: ID of a resource version. resource_id : uuid.UUID | None, default None ID of a resource. Must be set if `status` is LATEST. - status : clowmdb.models.ResourceVersion.Status + status : clowm.models.ResourceVersion.Status New status of the resource version slurm_job_id : int | None, default None Slurm job id if the update comes from a executed slurm job @@ -187,7 +187,7 @@ class CRUDResourceVersion: Returns ------- - resource_version : clowmdb.models.ResourceVersion + resource_version : clowm.models.ResourceVersion The newly created resource version """ with tracer.start_as_current_span( diff --git a/clowm/crud/crud_workflow_execution.py b/clowm/crud/crud_workflow_execution.py index fb16b46e3f42872ba3d6de68faac717dd6fd77a7..5777dc2f724e0f84f7a4063c15e90887a2998988 100644 --- a/clowm/crud/crud_workflow_execution.py +++ b/clowm/crud/crud_workflow_execution.py @@ -1,3 +1,5 @@ +from datetime import datetime +from math import ceil from typing import Sequence from uuid import UUID @@ -183,14 +185,15 @@ class CRUDWorkflowExecution: await db.commit() @staticmethod - async def set_error( + async def update_status( execution_id: UUID, - status: WorkflowExecution.WorkflowExecutionStatus = WorkflowExecution.WorkflowExecutionStatus.CANCELED, + status: WorkflowExecution.WorkflowExecutionStatus, + end_timestamp: int | float | None = None, *, db: AsyncSession, ) -> None: """ - Update the status of a workflow execution to CANCELED in the database. + Update the status and optional end_time of a workflow execution in the database. Parameters ---------- @@ -198,18 +201,28 @@ class CRUDWorkflowExecution: Async database session to perform query on. execution_id : uuid.UUID ID of the workflow execution - status : clowm.models.WorkflowExecution.WorkflowExecutionStatus, default WorkflowExecutionStatus.CANCELED - Error status the workflow execution should get + status : clowm.models.WorkflowExecution.WorkflowExecutionStatus + New status of the workflow execution + end_timestamp : int | float | None, default None + Optional end timestamp for the workflow. """ - stmt = ( - update(WorkflowExecution) - .where(WorkflowExecution.execution_id == execution_id) - .values(status=status.name, end_time=func.UNIX_TIMESTAMP()) - ) with tracer.start_as_current_span( - "db_cancel_workflow_execution", - attributes={"workflow_execution_id": str(execution_id), "status": status.name, "sql_query": str(stmt)}, - ): + "db_update_workflow_execution_status", attributes={"workflow_execution_id": str(execution_id)} + ) as span: + stmt = ( + update(WorkflowExecution) + .where(WorkflowExecution.execution_id == execution_id) + .values(status=status.name) + ) + if end_timestamp is not None: + span.set_attribute("end_timestamp", round(end_timestamp)) + try: + # Check if end_time is in seconds or milliseconds + datetime.fromtimestamp(end_timestamp) + stmt = stmt.values(end_time=round(end_timestamp)) + except ValueError: + stmt = stmt.values(end_time=round(end_timestamp / 1000)) + span.set_attribute("sql_query", str(stmt)) await db.execute(stmt) await db.commit() @@ -242,3 +255,62 @@ class CRUDWorkflowExecution: ): await db.execute(stmt) await db.commit() + + @staticmethod + async def update_cpu_hours( + execution_id: UUID, + cpu_hours: float, + *, + db: AsyncSession, + ) -> None: + """ + Update the cpu hours for a workflow execution. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + execution_id : str | bytes + ID of the workflow execution + cpu_hours : float + The number of cpus a task is using. + """ + stmt = ( + update(WorkflowExecution) + .where(WorkflowExecution.execution_id == execution_id) + .values(cpu_hours=WorkflowExecution.cpu_hours + cpu_hours) + ) + with tracer.start_as_current_span( + "db_update_cpu_hours", + attributes={"execution_id": str(execution_id), "cpu_hours": cpu_hours, "sql_query": str(stmt)}, + ): + await db.execute(stmt) + await db.commit() + + @staticmethod + async def get_by_id_fragment(execution_id_start: str | bytes, *, db: AsyncSession) -> WorkflowExecution | None: + """ + Get a workflow execution by its execution id from the database. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. + execution_id_start : str | bytes + Start of the ID of the workflow execution + + Returns + ------- + workflow_execution : clowm.models.WorkflowExecution + The workflow execution with the given id if it exists, None otherwise + """ + id_start = ( + int(execution_id_start, 16).to_bytes(ceil(len(execution_id_start) / 2), "big") + if isinstance(execution_id_start, str) + else execution_id_start + ) + stmt = select(WorkflowExecution).where(func.hex(WorkflowExecution.execution_id).startswith(func.hex(id_start))) + with tracer.start_as_current_span( + "db_get_workflow_execution", attributes={"sql_query": str(stmt), "execution_id_start": id_start.hex()} + ): + return await db.scalar(stmt) diff --git a/clowm/db/session.py b/clowm/db/session.py index 0229865e4a9c4d857b8c4ed2d73bd2c13f5a49d0..95cd104999ed936500dc1b534293cd0f248d8d42 100644 --- a/clowm/db/session.py +++ b/clowm/db/session.py @@ -59,7 +59,7 @@ async def get_async_session(url: str, verbose: bool = False) -> AsyncIterator[As async with get_async_session(url=db_url) as db: await db.select(...) """ - engineAsync = create_async_engine(url, echo=verbose, pool_recycle=3600) - async with async_sessionmaker(engineAsync, expire_on_commit=False)() as session: + async_engine = create_async_engine(url, echo=verbose, pool_recycle=3600) + async with async_sessionmaker(async_engine, expire_on_commit=False)() as session: yield session - await engineAsync.dispose() + await async_engine.dispose() diff --git a/clowm/schemas/resource.py b/clowm/schemas/resource.py index 612d16dbcbeba7c1d6b7ff2ceead81febe0107fe..0658f67eb33f174e28f2b97c19c57a297e75142f 100644 --- a/clowm/schemas/resource.py +++ b/clowm/schemas/resource.py @@ -45,9 +45,9 @@ class ResourceOut(BaseResource): Parameters ---------- - db_resource: clowmdb.models.Resource + db_resource: clowm.models.Resource Database model of a resource. - versions : list[clowmdb.models.ResourceVersion] | None, default None + versions : list[clowm.models.ResourceVersion] | None, default None List of versions to attach to the schema. If None, they will be loaded from the DB model. Returns diff --git a/clowm/schemas/tower.py b/clowm/schemas/tower.py new file mode 100644 index 0000000000000000000000000000000000000000..e6290efd32c318ea587cc5e7e8b65e9c3887e91d --- /dev/null +++ b/clowm/schemas/tower.py @@ -0,0 +1,260 @@ +from datetime import datetime +from pathlib import Path as LibPath +from typing import Annotated, Any + +from pydantic import BaseModel, Field +from pydantic.functional_serializers import PlainSerializer + +from .types import URL, UUID + + +class _BaseTraceRequest(BaseModel): + instant: int | float = Field(..., description="Unix timestamp of the request") + + +DateTime = Annotated[datetime, PlainSerializer(lambda date: date.isoformat(), return_type=str, when_used="unless-none")] +Path = Annotated[LibPath, PlainSerializer(lambda path: str(path), return_type=str, when_used="unless-none")] + + +class Process(BaseModel): + index: int + pending: int + ignored: int + loadCpus: int + succeeded: int + running: int + retries: int + peakRunning: int + name: str + loadMemory: int + stored: int + terminated: bool + aborted: int + failed: int + peakCpus: int + peakMemory: int + cached: int + submitted: int + + +class Stats(BaseModel): + succeededCount: int + computeTimeFmt: str + cachedCount: int + processes: list[Process] + changeTimestamp: int + peakRunning: int + succeedDuration: int + cachedPct: float + loadMemory: int + succeedCountFmt: str + failedPct: float + ignoredCount: int + submittedCount: int + peakMemory: int + succeedPct: float + succeedCount: int + runningCount: int + pendingCount: int + loadCpus: int + cachedDuration: int + abortedCount: int + failedDuration: int + failedCount: int + loadMemoryFmt: str + retriesCount: int + cachedCountFmt: str + progressLength: int + peakMemoryFmt: str + failedCountFmt: str + ignoredCountFmt: str + peakCpus: int + ignoredPct: float + + +class Manifest(BaseModel): + doi: str | None = None + nextflowVersion: str | None = None + defaultBranch: str + version: str | None = None + homePage: URL | None = Field(None, examples=["https://example.com"]) + gitmodules: str | None = None + description: str | None = None + recurseSubmodules: bool + name: str | None = None + mainScript: str + author: str | None = None + + +class NextflowVersion(BaseModel): + version: str + build: int + timestamp: DateTime + enable: dict[str, int] + + +class Workflow(BaseModel): + start: DateTime + projectDir: Path + manifest: Manifest + complete: DateTime | None = None + profile: str + homeDir: Path + workDir: Path + container: str | None = None + commitId: str | None = None + errorMessage: str | None = None + repository: URL | None = Field(None, examples=["https://example.com"]) + containerEngine: str | None = None + scriptFile: Path + userName: str + launchDir: Path + runName: str + configFiles: list[Path] + sessionId: str + errorReport: str | None = None + workflowStats: Stats + scriptId: str + revision: str | None = None + exitStatus: int | None = None + commandLine: str + stubRun: bool + nextflow: NextflowVersion + stats: Stats | None = Field(None) + resume: bool + success: bool + projectName: str + scriptName: str + duration: int | None = None + params: Any | None = None + id: str + configText: str + operationId: str | None = None + logFile: str | None = None + outFile: str | None = None + + +class Task(BaseModel): + taskId: int + status: str + hash: str + name: str + exit: int + submit: DateTime | None = None + start: DateTime | None = None + process: str + tag: str | None = None + module: list[str] + container: str | None = None + attempt: int + script: str + scratch: str | bool | None = None + workdir: Path + queue: str | None = None + cpus: int + memory: int | None = None + disk: str | None = None + time: int | None = None + errorAction: str | None = None + complete: DateTime | None = None + env: str | None = None + duration: int | None = Field(None) + realtime: int | None = Field(None) + pcpu: float | None = Field(None) + rchar: int | None = Field(None) + wchar: int | None = Field(None) + syscr: int | None = Field(None) + syscw: int | None = Field(None) + readBytes: int | None = Field(None) + writeBytes: int | None = Field(None) + pmem: float | None = Field(None) + vmem: int | None = Field(None) + rss: int | None = Field(None) + peakVmem: int | None = Field(None) + peakRss: int | None = Field(None) + volCtxt: int | None = Field(None) + invCtxt: int | None = Field(None) + nativeId: int | None = None + executor: str | None = None + cloudZone: str | None = None + machineType: str | None = None + priceModel: str | None = None + + +class Progress(BaseModel): + pending: int + ignored: int + loadCpus: int + loadMemory: int + processes: list[Process] + aborted: int + succeeded: int + peakMemory: int + peakCpus: int + failed: int + running: int + retries: int + peakRunning: int + cached: int + submitted: int + + +class Metric(BaseModel): + mean: float + min: float + q1: float + q2: float + q3: float + max: float + minLabel: str + maxLabel: str + q1Label: str + q2Label: str + q3Label: str + + +class ProcessMetric(BaseModel): + cpuUsage: Metric | None = None + process: str + mem: Metric | None = None + memUsage: Metric | None = None + timeUsage: Metric | None = None + vmem: Metric | None = None + reads: Metric | None = None + cpu: Metric | None = None + time: Metric | None = None + writes: Metric | None = None + + +class CompleteWorkflow(_BaseTraceRequest): + workflow: Workflow + metrics: list[ProcessMetric] + progress: Progress + + +class ReportProgress(_BaseTraceRequest): + tasks: list[Task] | None = None + progress: Progress + + +class BeginWorkflow(_BaseTraceRequest): + workflow: Workflow + processNames: list[str] + towerLaunch: bool + + +class _CreateWorkflowBase(BaseModel): + workflowId: str | None = Field(None, description="ID of the current workflow") + + +class CreateWorkflowIn(_CreateWorkflowBase, _BaseTraceRequest): + sessionId: UUID = Field(..., description="Session ID of the nextflow run") + projectName: str = Field(..., description="Name of the Pipeline nextflow executes") + repository: URL | None = Field( + None, description="URL of the repository the pipeline comes from", examples=["https://example.com"] + ) + runName: str = Field(..., description="Randomly generated name of the run") + + +class CreateWorkflowOut(_CreateWorkflowBase): + pass diff --git a/clowm/smtp/send_email.py b/clowm/smtp/send_email.py index 769f5154d5e50303fe70844ad72bf3cf9a3fe01a..7622c4368f15018d3a37a9fe9de1ed04b37b3289 100644 --- a/clowm/smtp/send_email.py +++ b/clowm/smtp/send_email.py @@ -178,7 +178,7 @@ async def send_sync_request_email( The reviewed workflow version. request_reason : str The reason for the synchronization request - requester : clowmdb.models.User + requester : clowm.models.User The user who requested this synchronization """ async with get_background_db() as db: @@ -200,9 +200,9 @@ async def send_sync_response_email(resource: Resource, version: ResourceVersion, Parameters ---------- - resource : clowmdb.models.Resource + resource : clowm.models.Resource The reviewed workflow. - version : clowmdb.models.ResourceVersion + version : clowm.models.ResourceVersion The reviewed workflow version. deny_reason : str | None If the request was denied, a reason must be given @@ -234,7 +234,7 @@ async def send_sync_success_email(resource: ResourceOut, version: ResourceVersio The reviewed workflow. version : clowm.schemas.resource_version.ResourceVersionOut The reviewed workflow version. - requester : clowmdb.models.User + requester : clowm.models.User The user who requested this synchronization """ html, plain = EmailTemplates.SYNC_SUCCESS.render(resource=resource, version=version, requester=requester) diff --git a/clowm/tests/api/test_resource.py b/clowm/tests/api/test_resource.py index 5945aabca119e412b55e9ed920ca20498eeade31..606e18ff79ed242ec4ccb7e20d1fcff63a5d6e3f 100644 --- a/clowm/tests/api/test_resource.py +++ b/clowm/tests/api/test_resource.py @@ -105,7 +105,7 @@ class TestResourceRouteCreate(_TestResourceRoutes): Random user for testing. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. """ resource = ResourceIn( @@ -137,7 +137,7 @@ class TestResourceRouteDelete(_TestResourceRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -183,7 +183,7 @@ class TestResourceRouteGet(_TestResourceRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -205,7 +205,7 @@ class TestResourceRouteGet(_TestResourceRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource_version_states : clowmdb.models.ResourceVersion + random_resource_version_states : clowm.models.ResourceVersion Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -235,7 +235,7 @@ class TestResourceRouteGet(_TestResourceRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. diff --git a/clowm/tests/api/test_resource_version.py b/clowm/tests/api/test_resource_version.py index bcb67e7dffc8a0c1891df26363000ddd412bcec3..8464f84da2adb156ea171f31a37e77b94e036c7e 100644 --- a/clowm/tests/api/test_resource_version.py +++ b/clowm/tests/api/test_resource_version.py @@ -54,7 +54,7 @@ class TestResourceVersionRouteCreate(_TestResourceVersionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. cleanup : clowm.tests.utils.utils.CleanupList Cleanup object where (async) functions can be registered which get executed after a (failed) test. @@ -105,9 +105,9 @@ class TestResourceVersionRouteGet(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version : clowmdb.models.Resource + random_resource_version : clowm.models.Resource Random resource version for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -132,9 +132,9 @@ class TestResourceVersionRouteGet(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version : clowmdb.models.Resource + random_resource_version : clowm.models.Resource Random resource version for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -165,7 +165,7 @@ class TestResourceVersionRouteGet(_TestResourceVersionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. """ response = await client.get( @@ -187,7 +187,7 @@ class TestResourceVersionRouteGet(_TestResourceVersionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_resource_version_states : clowmdb.models.ResourceVersion + random_resource_version_states : clowm.models.ResourceVersion Random resource version with all possible states for testing . """ response = await client.get( @@ -230,7 +230,7 @@ class TestResourceVersionRouteGet(_TestResourceVersionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_resource_version : clowmdb.models.ResourceVersion + random_resource_version : clowm.models.ResourceVersion Random resource version for testing. cleanup : clowm.tests.utils.utils.CleanupList Cleanup object where (async) functions can be registered which get executed after a (failed) test. @@ -275,9 +275,9 @@ class TestResourceVersionRouteDelete(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version_states : clowmdb.models.Resource + random_resource_version_states : clowm.models.Resource Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -320,9 +320,9 @@ class TestResourceVersionRouteDelete(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version_states : clowmdb.models.Resource + random_resource_version_states : clowm.models.Resource Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -393,9 +393,9 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version_states : clowmdb.models.Resource + random_resource_version_states : clowm.models.Resource Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -453,9 +453,9 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version_states : clowmdb.models.Resource + random_resource_version_states : clowm.models.Resource Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -500,9 +500,9 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version_states : clowmdb.models.Resource + random_resource_version_states : clowm.models.Resource Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -547,9 +547,9 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version_states : clowmdb.models.Resource + random_resource_version_states : clowm.models.Resource Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -616,9 +616,9 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version_states : clowmdb.models.Resource + random_resource_version_states : clowm.models.Resource Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -661,9 +661,9 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version_states : clowmdb.models.Resource + random_resource_version_states : clowm.models.Resource Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -736,9 +736,9 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version_states : clowmdb.models.Resource + random_resource_version_states : clowm.models.Resource Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -798,9 +798,9 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version_states : clowmdb.models.Resource + random_resource_version_states : clowm.models.Resource Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -858,9 +858,9 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version_states : clowmdb.models.Resource + random_resource_version_states : clowm.models.Resource Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. @@ -903,9 +903,9 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes): ---------- client : httpx.AsyncClient HTTP Client to perform the request on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. - random_resource_version_states : clowmdb.models.Resource + random_resource_version_states : clowm.models.Resource Random resource version with all possible states for testing. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. diff --git a/clowm/tests/api/test_trace.py b/clowm/tests/api/test_trace.py new file mode 100644 index 0000000000000000000000000000000000000000..7eceda74beeaaad64b273cdb31fce97d30b6fa22 --- /dev/null +++ b/clowm/tests/api/test_trace.py @@ -0,0 +1,285 @@ +from datetime import datetime +from uuid import uuid4 + +import pytest +from fastapi import status +from httpx import AsyncClient +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from clowm.core.config import settings +from clowm.models import WorkflowExecution +from clowm.schemas.tower import CreateWorkflowOut +from clowm.tests.utils import random_hex_string, random_lower_string, trace +from clowm.tests.utils.trace import security_header + + +class _TestTraceRoutes: + base_path: str = "/trace" + + +class TestTraceRoutesCreate(_TestTraceRoutes): + @pytest.mark.asyncio + async def test_create_workflow_trace( + self, client: AsyncClient, random_running_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for sending the create workflow event. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. pytest fixture. + random_running_workflow_execution : clowm.models.WorkflowExecution + Random workflow execution for testing. pytest fixture. + """ + body = { + "sessionId": str(uuid4()), + "projectName": random_lower_string(), + "repository": "https://github.com/example/example", + "runName": random_lower_string(), + "instant": round(datetime.now().timestamp()), + "workflowId": None, + } + response = await client.post( + f"{self.base_path}/create", + params={"workspaceId": str(random_running_workflow_execution.execution_id.hex[:16])}, + json=body, + headers=security_header, + ) + assert response.status_code == status.HTTP_200_OK + + r = CreateWorkflowOut.model_validate_json(response.content) + assert r.workflowId == random_running_workflow_execution.execution_id.hex[:16] + + @pytest.mark.asyncio + async def test_create_workflow_trace_with_unknown_execution(self, client: AsyncClient) -> None: + """ + Test for sending the create workflow event where the execution id does not exist. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. pytest fixture. + """ + body = { + "sessionId": str(uuid4()), + "projectName": random_lower_string(), + "repository": "https://github.com/example/example", + "runName": random_lower_string(), + "instant": round(datetime.now().timestamp()), + "workflowId": None, + } + response = await client.post( + f"{self.base_path}/create", + params={"workspaceId": random_hex_string(16)}, + json=body, + headers=security_header, + ) + assert response.status_code == status.HTTP_404_NOT_FOUND + + +class TestTraceRoutesBegin(_TestTraceRoutes): + @pytest.mark.asyncio + async def test_begin_workflow( + self, client: AsyncClient, random_running_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for sending the begin workflow event. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. pytest fixture. + random_running_workflow_execution : clowm.models.WorkflowExecution + Random workflow execution for testing. pytest fixture. + """ + response = await client.put( + f"{self.base_path}/{random_running_workflow_execution.execution_id.hex[:16]}/begin", + content=trace.begin_workflow_event.model_dump_json(), + headers=security_header, + ) + assert response.status_code == status.HTTP_200_OK + + r = response.json() + assert r.get("watchUrl", "").startswith(str(settings.ui_uri)) + + @pytest.mark.asyncio + async def test_begin_workflow_with_unknown_execution(self, client: AsyncClient) -> None: + """ + Test for sending the begin workflow event where the execution id does not exist. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. pytest fixture. + """ + response = await client.put( + f"{self.base_path}/{random_hex_string(16)}/begin", + content=trace.begin_workflow_event.model_dump_json(), + headers=security_header, + ) + assert response.status_code == status.HTTP_404_NOT_FOUND + + +class TestTraceRoutesProgress(_TestTraceRoutes): + @pytest.mark.asyncio + async def test_report_progress_workflow( + self, client: AsyncClient, random_running_workflow_execution: WorkflowExecution, db: AsyncSession + ) -> None: + """ + Test for sending the report progress workflow event. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. pytest fixture. + random_running_workflow_execution : clowm.models.WorkflowExecution + Random workflow execution for testing. pytest fixture. + """ + response = await client.put( + f"{self.base_path}/{random_running_workflow_execution.execution_id.hex[:16]}/progress", + content=trace.progress_event.model_dump_json(), + headers=security_header, + ) + assert response.status_code == status.HTTP_200_OK + assert trace.progress_event.tasks is not None + assert len(trace.progress_event.tasks) > 0 + + task = trace.progress_event.tasks[0] + assert task.realtime is not None + + stmt = select(WorkflowExecution).where( + WorkflowExecution.execution_id == random_running_workflow_execution.execution_id + ) + await db.reset() + db_execution = await db.scalar(stmt) + assert db_execution is not None + assert db_execution.cpu_hours == task.realtime * task.cpus / 60000000 + + +class TestTraceRoutesHeartbeat(_TestTraceRoutes): + @pytest.mark.asyncio + async def test_heartbeat_workflow( + self, client: AsyncClient, random_running_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for sending the heartbeat workflow event. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. pytest fixture. + random_running_workflow_execution : clowm.models.WorkflowExecution + Random workflow execution for testing. pytest fixture. + """ + request_body = trace.progress_event.model_dump().copy() + del request_body["tasks"] + response = await client.put( + f"{self.base_path}/{random_running_workflow_execution.execution_id.hex[:16]}/heartbeat", + json=request_body, + headers=security_header, + ) + assert response.status_code == status.HTTP_200_OK + + +class TestTraceRoutesComplete(_TestTraceRoutes): + @pytest.mark.asyncio + async def test_complete_workflow_with_success( + self, client: AsyncClient, db: AsyncSession, random_running_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for sending the complete workflow event with success. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. pytest fixture. + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. pytest fixture. + random_running_workflow_execution : clowm.models.WorkflowExecution + Random workflow execution for testing. pytest fixture. + """ + response = await client.put( + f"{self.base_path}/{random_running_workflow_execution.execution_id.hex[:16]}/complete", + content=trace.complete_workflow_event.model_dump_json(), + headers=security_header, + ) + assert response.status_code == status.HTTP_200_OK + + stmt = select(WorkflowExecution).where( + WorkflowExecution.execution_id == random_running_workflow_execution.execution_id + ) + execution = await db.scalar(stmt) + assert execution is not None + assert execution == random_running_workflow_execution + assert execution.status == WorkflowExecution.WorkflowExecutionStatus.SUCCESS + + @pytest.mark.asyncio + async def test_complete_workflow_with_error( + self, client: AsyncClient, db: AsyncSession, random_running_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for sending the complete workflow event with an error. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. pytest fixture. + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. pytest fixture. + random_running_workflow_execution : clowm.models.WorkflowExecution + Random workflow execution for testing. pytest fixture. + """ + body = trace.complete_workflow_event.model_dump().copy() + body["workflow"]["success"] = False # type: ignore[index] + response = await client.put( + f"{self.base_path}/{random_running_workflow_execution.execution_id.hex[:16]}/complete", + json=body, + headers=security_header, + ) + assert response.status_code == status.HTTP_200_OK + stmt = select(WorkflowExecution).where( + WorkflowExecution.execution_id == random_running_workflow_execution.execution_id + ) + execution = await db.scalar(stmt) + + assert execution is not None + assert execution == random_running_workflow_execution + assert execution.status == WorkflowExecution.WorkflowExecutionStatus.ERROR + assert execution.end_time is not None + + @pytest.mark.asyncio + async def test_complete_workflow_with_cancel( + self, client: AsyncClient, db: AsyncSession, random_running_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for sending the complete workflow event with canceled workflow run. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. pytest fixture. + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. pytest fixture. + random_running_workflow_execution : clowm.models.WorkflowExecution + Random workflow execution for testing. pytest fixture. + """ + body = trace.complete_workflow_event.model_dump().copy() + body["workflow"]["success"] = False # type: ignore[index] + body["workflow"]["errorReport"] = "SIGTERM" # type: ignore[index] + response = await client.put( + f"{self.base_path}/{random_running_workflow_execution.execution_id.hex[:16]}/complete", + json=body, + headers=security_header, + ) + assert response.status_code == status.HTTP_200_OK + stmt = select(WorkflowExecution).where( + WorkflowExecution.execution_id == random_running_workflow_execution.execution_id + ) + execution = await db.scalar(stmt) + + assert execution is not None + assert execution == random_running_workflow_execution + assert execution.status == WorkflowExecution.WorkflowExecutionStatus.CANCELED + assert execution.end_time is not None diff --git a/clowm/tests/api/test_workflow.py b/clowm/tests/api/test_workflow.py index fa21b3b0beafd4490c8c42dbdb07314246da2479..be758f4045d755a645aef8a8e229e67f0637ce50 100644 --- a/clowm/tests/api/test_workflow.py +++ b/clowm/tests/api/test_workflow.py @@ -558,7 +558,7 @@ class TestWorkflowRoutesList(_TestWorkflowRoutes): Random user for testing. random_workflow : clowm.schemas.workflow.WorkflowOut Random workflow for testing. - random_completed_workflow_execution : clowmdb.models.WorkflowExecution + random_completed_workflow_execution : clowm.models.WorkflowExecution Random workflow execution for testing """ response = await client.get( @@ -640,7 +640,7 @@ class TestWorkflowRoutesGet(_TestWorkflowRoutes): Random user for testing. random_workflow : clowm.schemas.workflow.WorkflowOut Random workflow for testing. - random_running_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowm.models.WorkflowExecution Random workflow execution for testing """ response = await client.get( @@ -765,7 +765,7 @@ class TestWorkflowRoutesDelete(_TestWorkflowRoutes): Random workflow for testing. mock_s3_service : types_aiobotocore_s3.service_resource.S3ServiceResource Mock S3 Service to manipulate objects. - random_workflow_mode : clowmdb.model.WorkflowMode + random_workflow_mode : clowm.model.WorkflowMode Random workflow mode for testing db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. @@ -858,7 +858,7 @@ class TestWorkflowRoutesUpdate(_TestWorkflowRoutes): Random user for testing. random_workflow : clowm.schemas.workflow.WorkflowOut Random workflow for testing. - random_workflow_mode : clowmdb.model.WorkflowMode + random_workflow_mode : clowm.model.WorkflowMode Random workflow mode for testing mock_s3_service : types_aiobotocore_s3.service_resource.S3ServiceResource Mock S3 Service to manipulate objects. @@ -965,7 +965,7 @@ class TestWorkflowRoutesUpdate(_TestWorkflowRoutes): Random user for testing. random_workflow : clowm.schemas.workflow.WorkflowOut Random workflow for testing. - random_workflow_mode : clowmdb.model.WorkflowMode + random_workflow_mode : clowm.model.WorkflowMode Random workflow mode for testing mock_s3_service : types_aiobotocore_s3.service_resource.S3ServiceResource Mock S3 Service to manipulate objects. @@ -1024,7 +1024,7 @@ class TestWorkflowRoutesUpdate(_TestWorkflowRoutes): Random user for testing. random_workflow : clowm.schemas.workflow.WorkflowOut Random workflow for testing. - random_workflow_mode : clowmdb.model.WorkflowMode + random_workflow_mode : clowm.model.WorkflowMode Random workflow mode for testing mock_s3_service : types_aiobotocore_s3.service_resource.S3ServiceResource Mock S3 Service to manipulate objects. diff --git a/clowm/tests/api/test_workflow_execution.py b/clowm/tests/api/test_workflow_execution.py index 0fc86e058349c1ca626652548273ce19d8ff6820..7618fc9901f488618554702f6c97041037663e73 100644 --- a/clowm/tests/api/test_workflow_execution.py +++ b/clowm/tests/api/test_workflow_execution.py @@ -50,7 +50,7 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): Random user for testing. random_workflow : clowm.schemas.workflow.WorkflowOut Random workflow for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. mock_s3_service: types_aiobotocore_s3.service_resource.S3ServiceResource Mock S3 Service to manipulate objects. @@ -110,7 +110,7 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. random_private_workflow : clowm.schemas.workflow.WorkflowOut Random private workflow for testing. @@ -172,7 +172,7 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): Random user for testing. random_workflow : clowm.schemas.workflow.WorkflowOut Random workflow for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. mock_s3_service : types_aiobotocore_s3.service_resource.S3ServiceResource Mock S3 Service to manipulate objects. @@ -250,7 +250,7 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): Async database session to perform query on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. """ active_execution_counter = 0 @@ -311,7 +311,7 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): Async database session to perform query on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. """ await db.execute( @@ -347,7 +347,7 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. mock_slurm_cluster : clowm.tests.mocks.mock_slurm_cluster.MockSlurmCluster Mock Slurm cluster to inspect submitted jobs. @@ -389,7 +389,7 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. mock_slurm_cluster : clowm.tests.mocks.mock_slurm_cluster.MockSlurmCluster Mock Slurm cluster to inspect submitted jobs. @@ -420,7 +420,7 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. mock_slurm_cluster : clowm.tests.mocks.mock_slurm_cluster.MockSlurmCluster Mock Slurm cluster to inspect submitted jobs. @@ -526,7 +526,7 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): Random user for testing. random_workflow : clowm.schemas.workflow.WorkflowOut Random workflow for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. mock_slurm_cluster : clowm.tests.mocks.mock_slurm_cluster.MockSlurmCluster Mock Slurm cluster to inspect submitted jobs. @@ -574,7 +574,7 @@ class TestWorkflowExecutionRoutesCreate(_TestWorkflowExecutionRoutes): Random user for testing. random_workflow : clowm.schemas.workflow.WorkflowOut Random workflow for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. mock_s3_service : types_aiobotocore_s3.service_resource.S3ServiceResource Mock S3 Service to manipulate objects. @@ -983,7 +983,7 @@ class TestWorkflowExecutionRoutesGet(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_running_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowm.models.WorkflowExecution Random workflow execution for testing. """ response = await client.get( @@ -1011,7 +1011,7 @@ class TestWorkflowExecutionRoutesGet(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_running_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowm.models.WorkflowExecution Random workflow execution for testing. """ response = await client.get( @@ -1060,7 +1060,7 @@ class TestWorkflowExecutionRoutesList(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_running_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowm.models.WorkflowExecution Random workflow execution for testing. """ response = await client.get(self.base_path, headers=random_user.auth_headers) @@ -1095,7 +1095,7 @@ class TestWorkflowExecutionRoutesDelete(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_running_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowm.models.WorkflowExecution Random workflow execution for testing. """ response = await client.delete( @@ -1123,7 +1123,7 @@ class TestWorkflowExecutionRoutesDelete(_TestWorkflowExecutionRoutes): Async database session to perform query on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_running_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowm.models.WorkflowExecution Random workflow execution for testing. """ await db.execute( @@ -1157,7 +1157,7 @@ class TestWorkflowExecutionRoutesCancel(_TestWorkflowExecutionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_running_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowm.models.WorkflowExecution Random workflow execution for testing. mock_slurm_cluster : clowm.tests.mocks.mock_slurm_cluster.MockSlurmCluster Mock Slurm cluster to inspect submitted jobs. @@ -1189,7 +1189,7 @@ class TestWorkflowExecutionRoutesCancel(_TestWorkflowExecutionRoutes): Async database session to perform query on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_running_workflow_execution : clowmdb.models.WorkflowExecution + random_running_workflow_execution : clowm.models.WorkflowExecution Random workflow execution for testing. """ await db.execute( diff --git a/clowm/tests/api/test_workflow_version.py b/clowm/tests/api/test_workflow_version.py index 0c0b1995776507a2a19dc197ef0b73dcf7ec838a..4561e96c475cc26ff8d91f072a095833223d3a94 100644 --- a/clowm/tests/api/test_workflow_version.py +++ b/clowm/tests/api/test_workflow_version.py @@ -319,7 +319,7 @@ class TestWorkflowVersionRoutesGetDocumentation(_TestWorkflowVersionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. document : clowm.api.endpoints.workflow_version.DocumentationEnum All possible documents as pytest parameter. @@ -357,7 +357,7 @@ class TestWorkflowVersionRoutesGetDocumentation(_TestWorkflowVersionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. document : clowm.api.endpoints.workflow_version.DocumentationEnum All possible documents as pytest parameter. @@ -396,9 +396,9 @@ class TestWorkflowVersionRoutesGetDocumentation(_TestWorkflowVersionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. - random_workflow_mode : clowmdb.models.WorkflowMode + random_workflow_mode : clowm.models.WorkflowMode Random workflow mode for testing. document : clowm.api.endpoints.workflow_version.DocumentationEnum All possible documents as pytest parameter. @@ -439,7 +439,7 @@ class TestWorkflowVersionIconRoutes(_TestWorkflowVersionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. mock_s3_service : types_aiobotocore_s3.service_resource.S3ServiceResource Mock S3 Service to manipulate objects. @@ -498,7 +498,7 @@ class TestWorkflowVersionIconRoutes(_TestWorkflowVersionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. mock_s3_service : types_aiobotocore_s3.service_resource.S3ServiceResource Mock S3 Service to manipulate objects. @@ -538,7 +538,7 @@ class TestWorkflowVersionIconRoutes(_TestWorkflowVersionRoutes): HTTP Client to perform the request on. random_second_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. """ files = {"icon": ("RickRoll.txt", BytesIO(b"Never gonna give you up"), "plain/text")} @@ -575,7 +575,7 @@ class TestWorkflowVersionIconRoutes(_TestWorkflowVersionRoutes): HTTP Client to perform the request on. random_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. mock_s3_service : types_aiobotocore_s3.service_resource.S3ServiceResource Mock S3 Service to manipulate objects. @@ -626,7 +626,7 @@ class TestWorkflowVersionIconRoutes(_TestWorkflowVersionRoutes): HTTP Client to perform the request on. random_second_user : clowm.tests.utils.user.UserWithAuthHeader Random user for testing. - random_workflow_version : clowmdb.models.WorkflowVersion + random_workflow_version : clowm.models.WorkflowVersion Random workflow version for testing. mock_s3_service : types_aiobotocore_s3.service_resource.S3ServiceResource Mock S3 Service to manipulate objects. diff --git a/clowm/tests/conftest.py b/clowm/tests/conftest.py index 4acf1631be0179c1665e7da646489d1a658306d7..25c38bb6db3fd9c138b915de94adca0590eff86c 100644 --- a/clowm/tests/conftest.py +++ b/clowm/tests/conftest.py @@ -432,8 +432,8 @@ async def random_running_workflow_execution( update(WorkflowExecution) .where(WorkflowExecution.execution_id == execution.execution_id) .values(slurm_job_id=slurm_job_id) - # .values(slurm_job_id=slurm_job_id) ) + await db.commit() parameter_obj = await mock_s3_service.Object( bucket_name=settings.s3.data_bucket, key=settings.s3.execution_parameters_key(execution.execution_id) ) diff --git a/clowm/tests/crud/test_resource.py b/clowm/tests/crud/test_resource.py index 3e0fd134249a91d7d5b5c8c7f44c1d214497ba33..1b762328227acb976b6c4364bf83aada033febea 100644 --- a/clowm/tests/crud/test_resource.py +++ b/clowm/tests/crud/test_resource.py @@ -22,7 +22,7 @@ class TestResourceCRUDGet: ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. """ resource = await CRUDResource.get(db, resource_id=random_resource.resource_id) @@ -51,7 +51,7 @@ class TestResourceCRUDGet: ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. - random_resource : clowmdb.models.Resource + random_resource : clowm.models.Resource Random resource for testing. """ resource = await CRUDResource.get_by_name(db, name=random_resource.name) diff --git a/clowm/tests/crud/test_workflow_execution.py b/clowm/tests/crud/test_workflow_execution.py index c5e905e2ff61dac615dc3829a9e327376c686a5b..8d04e07074bddce71c46790d8f5fbf5992beb8e9 100644 --- a/clowm/tests/crud/test_workflow_execution.py +++ b/clowm/tests/crud/test_workflow_execution.py @@ -1,3 +1,4 @@ +import time from uuid import uuid4 import pytest @@ -108,6 +109,39 @@ class TestWorkflowExecutionCRUDGet: execution = await CRUDWorkflowExecution.get(uuid4(), db=db) assert execution is None + @pytest.mark.asyncio + async def test_get_workflow_execution_by_id_fragment( + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for getting a workflow execution by its execution id. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. pytest fixture. + random_running_workflow_execution : clowm.models.WorkflowExecution + Random workflow execution for testing. pytest fixture. + """ + execution = await CRUDWorkflowExecution.get_by_id_fragment( + db=db, execution_id_start=random_running_workflow_execution.execution_id.hex[:16] + ) + assert execution is not None + assert execution == random_running_workflow_execution + + @pytest.mark.asyncio + async def test_get_non_existing_workflow_execution_by_id_fragment(self, db: AsyncSession) -> None: + """ + Test for getting a non-existing workflow execution. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. pytest fixture. + """ + execution = await CRUDWorkflowExecution.get_by_id_fragment(db=db, execution_id_start=random_hex_string(16)) + assert execution is None + class TestWorkflowExecutionCRUDList: @pytest.mark.asyncio @@ -234,13 +268,13 @@ class TestWorkflowExecutionCRUDList: assert sum(1 for execution in executions if execution.status == random_running_workflow_execution.status) >= 1 -class TestWorkflowExecutionCRUDLUpdate: +class TestWorkflowExecutionCRUDUpdate: @pytest.mark.asyncio - async def test_cancel_workflow_execution( + async def test_update_workflow_execution_slurm_job( self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution ) -> None: """ - Test for canceling a workflow execution. + Test for updating the slurm job id of a workflow execution. Parameters ---------- @@ -249,37 +283,101 @@ class TestWorkflowExecutionCRUDLUpdate: random_running_workflow_execution : clowm.models.WorkflowExecution Random workflow execution for testing. """ - await CRUDWorkflowExecution.set_error(random_running_workflow_execution.execution_id, db=db) + await CRUDWorkflowExecution.update_slurm_job_id(random_running_workflow_execution.execution_id, 250, db=db) stmt = select(WorkflowExecution).where( WorkflowExecution.execution_id == random_running_workflow_execution.execution_id ) execution = await db.scalar(stmt) assert execution is not None - assert execution.status == WorkflowExecution.WorkflowExecutionStatus.CANCELED + assert execution.slurm_job_id == 250 @pytest.mark.asyncio - async def test_update_workflow_execution_slurm_job( + async def test_update_workflow_status( self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution ) -> None: """ - Test for updating the slurm job id of a workflow execution. + Test for updating the workflow execution status. Parameters ---------- db : sqlalchemy.ext.asyncio.AsyncSession. - Async database session to perform query on. + Async database session to perform query on. pytest fixture. random_running_workflow_execution : clowm.models.WorkflowExecution - Random workflow execution for testing. + Random workflow execution for testing. pytest fixture. """ - await CRUDWorkflowExecution.update_slurm_job_id(random_running_workflow_execution.execution_id, 250, db=db) + await CRUDWorkflowExecution.update_status( + db=db, + execution_id=random_running_workflow_execution.execution_id, + status=WorkflowExecution.WorkflowExecutionStatus.RUNNING, + ) stmt = select(WorkflowExecution).where( WorkflowExecution.execution_id == random_running_workflow_execution.execution_id ) execution = await db.scalar(stmt) + assert execution is not None - assert execution.slurm_job_id == 250 + assert execution == random_running_workflow_execution + assert execution.status == WorkflowExecution.WorkflowExecutionStatus.RUNNING + + @pytest.mark.asyncio + async def test_update_workflow_status_with_timestamp( + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for updating the workflow execution status with and end timestamp. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. pytest fixture. + random_running_workflow_execution : clowm.models.WorkflowExecution + Random workflow execution for testing. pytest fixture. + """ + await CRUDWorkflowExecution.update_status( + db=db, + execution_id=random_running_workflow_execution.execution_id, + status=WorkflowExecution.WorkflowExecutionStatus.ERROR, + end_timestamp=round(time.time()), + ) + + stmt = select(WorkflowExecution).where( + WorkflowExecution.execution_id == random_running_workflow_execution.execution_id + ) + execution = await db.scalar(stmt) + + assert execution is not None + assert execution == random_running_workflow_execution + assert execution.status == WorkflowExecution.WorkflowExecutionStatus.ERROR + assert execution.end_time is not None + + @pytest.mark.asyncio + async def test_update_execution_cpu_hours( + self, db: AsyncSession, random_running_workflow_execution: WorkflowExecution + ) -> None: + """ + Test for updating the workflow execution cpu hours. + + Parameters + ---------- + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. pytest fixture. + random_workflow_execution : clowm.models.WorkflowExecution + Random workflow execution for testing. pytest fixture. + """ + await CRUDWorkflowExecution.update_cpu_hours( + db=db, execution_id=random_running_workflow_execution.execution_id, cpu_hours=6.5 + ) + + stmt = select(WorkflowExecution).where( + WorkflowExecution.execution_id == random_running_workflow_execution.execution_id + ) + execution = await db.scalar(stmt) + + assert execution is not None + assert execution == random_running_workflow_execution + assert execution.cpu_hours == 6.5 class TestWorkflowExecutionCRUDDelete: diff --git a/clowm/tests/utils/trace.py b/clowm/tests/utils/trace.py new file mode 100644 index 0000000000000000000000000000000000000000..c265c19e5f1e6aedf47804f853cb304788e31bd7 --- /dev/null +++ b/clowm/tests/utils/trace.py @@ -0,0 +1,570 @@ +from base64 import b64encode +from datetime import datetime + +from clowm.core.config import settings +from clowm.schemas.tower import BeginWorkflow, CompleteWorkflow, ReportProgress + +_digest = b64encode(f"@token:{settings.cluster.tower_secret.get_secret_value()}".encode("utf-8")).decode("utf-8") +security_header = {"Authorization": f"Basic {_digest}"} + +begin_workflow_event = BeginWorkflow.model_validate( + { + "instant": datetime.now().timestamp(), + "workflow": { + "start": "2023-02-28T08:41:08.402Z", + "projectDir": "string", + "manifest": { + "doi": "string", + "nextflowVersion": "string", + "defaultBranch": "string", + "version": "string", + "homePage": "https://example.com", + "gitmodules": "string", + "description": "string", + "recurseSubmodules": False, + "name": "string", + "mainScript": "string", + "author": "string", + }, + "complete": "2023-02-28T08:41:08.402Z", + "profile": "string", + "homeDir": "string", + "workDir": "string", + "container": "string", + "commitId": "string", + "errorMessage": "string", + "repository": "https://example.com", + "containerEngine": "string", + "scriptFile": "string", + "userName": "string", + "launchDir": "string", + "runName": "string", + "configFiles": ["string"], + "sessionId": "string", + "errorReport": "string", + "workflowStats": { + "succeededCount": 0, + "computeTimeFmt": "string", + "cachedCount": 0, + "processes": [ + { + "index": 0, + "pending": 0, + "ignored": 0, + "loadCpus": 0, + "succeeded": 0, + "running": 0, + "retries": 0, + "peakRunning": 0, + "name": "string", + "loadMemory": 0, + "stored": 0, + "terminated": False, + "aborted": 0, + "failed": 0, + "peakCpus": 0, + "peakMemory": 0, + "cached": 0, + "submitted": 0, + } + ], + "changeTimestamp": 0, + "peakRunning": 0, + "succeedDuration": 0, + "cachedPct": 0, + "loadMemory": 0, + "succeedCountFmt": "string", + "failedPct": 0, + "ignoredCount": 0, + "submittedCount": 0, + "peakMemory": 0, + "succeedPct": 0, + "succeedCount": 0, + "runningCount": 0, + "pendingCount": 0, + "loadCpus": 0, + "cachedDuration": 0, + "abortedCount": 0, + "failedDuration": 0, + "failedCount": 0, + "loadMemoryFmt": "string", + "retriesCount": 0, + "cachedCountFmt": "string", + "progressLength": 0, + "peakMemoryFmt": "string", + "failedCountFmt": "string", + "ignoredCountFmt": "string", + "peakCpus": 0, + "ignoredPct": 0, + }, + "scriptId": "string", + "revision": "string", + "exitStatus": 0, + "commandLine": "string", + "stubRun": False, + "nextflow": { + "version": "string", + "build": 0, + "timestamp": "2023-02-28T08:41:08.402Z", + "enable": {"additionalProp1": 0, "additionalProp2": 0, "additionalProp3": 0}, + }, + "stats": { + "succeededCount": 0, + "computeTimeFmt": "string", + "cachedCount": 0, + "processes": [ + { + "index": 0, + "pending": 0, + "ignored": 0, + "loadCpus": 0, + "succeeded": 0, + "running": 0, + "retries": 0, + "peakRunning": 0, + "name": "string", + "loadMemory": 0, + "stored": 0, + "terminated": False, + "aborted": 0, + "failed": 0, + "peakCpus": 0, + "peakMemory": 0, + "cached": 0, + "submitted": 0, + } + ], + "changeTimestamp": 0, + "peakRunning": 0, + "succeedDuration": 0, + "cachedPct": 0, + "loadMemory": 0, + "succeedCountFmt": "string", + "failedPct": 0, + "ignoredCount": 0, + "submittedCount": 0, + "peakMemory": 0, + "succeedPct": 0, + "succeedCount": 0, + "runningCount": 0, + "pendingCount": 0, + "loadCpus": 0, + "cachedDuration": 0, + "abortedCount": 0, + "failedDuration": 0, + "failedCount": 0, + "loadMemoryFmt": "string", + "retriesCount": 0, + "cachedCountFmt": "string", + "progressLength": 0, + "peakMemoryFmt": "string", + "failedCountFmt": "string", + "ignoredCountFmt": "string", + "peakCpus": 0, + "ignoredPct": 0, + }, + "resume": False, + "success": False, + "projectName": "string", + "scriptName": "string", + "duration": 0, + "params": { + "additionalProp1": "string", + }, + "id": "string", + "configText": "string", + "operationId": "string", + "logFile": "string", + "outFile": "string", + }, + "processNames": ["string"], + "towerLaunch": False, + } +) + +progress_event = ReportProgress.model_validate( + { + "instant": datetime.now().timestamp(), + "tasks": [ + { + "taskId": 0, + "status": "COMPLETED", + "hash": "string", + "name": "string", + "exit": 0, + "submit": "2023-02-28T08:46:32.691Z", + "start": "2023-02-28T08:46:32.691Z", + "process": "string", + "tag": "string", + "module": ["string"], + "container": "string", + "attempt": 0, + "script": "string", + "scratch": "string", + "workdir": "string", + "queue": "string", + "cpus": 3, + "memory": 0, + "disk": "string", + "time": 0, + "errorAction": "string", + "complete": "2023-02-28T08:46:32.691Z", + "env": "string", + "duration": 0, + "realtime": 600000, + "pcpu": 0, + "rchar": 0, + "wchar": 0, + "syscr": 0, + "syscw": 0, + "readBytes": 0, + "writeBytes": 0, + "pmem": 0, + "vmem": 0, + "rss": 0, + "peakVmem": 0, + "peakRss": 0, + "volCtxt": 0, + "invCtxt": 0, + "nativeId": 0, + "executor": "string", + "cloudZone": "string", + "machineType": "string", + "priceModel": "string", + } + ], + "progress": { + "pending": 0, + "ignored": 0, + "loadCpus": 0, + "loadMemory": 0, + "processes": [ + { + "index": 0, + "pending": 0, + "ignored": 0, + "loadCpus": 0, + "succeeded": 0, + "running": 0, + "retries": 0, + "peakRunning": 0, + "name": "string", + "loadMemory": 0, + "stored": 0, + "terminated": False, + "aborted": 0, + "failed": 0, + "peakCpus": 0, + "peakMemory": 0, + "cached": 0, + "submitted": 0, + } + ], + "aborted": 0, + "succeeded": 0, + "peakMemory": 0, + "peakCpus": 0, + "failed": 0, + "running": 0, + "retries": 0, + "peakRunning": 0, + "cached": 0, + "submitted": 0, + }, + } +) + +complete_workflow_event = CompleteWorkflow.model_validate( + { + "instant": datetime.now().timestamp(), + "workflow": { + "start": "2023-02-28T08:47:12.191Z", + "projectDir": "string", + "manifest": { + "doi": "string", + "nextflowVersion": "string", + "defaultBranch": "string", + "version": "string", + "homePage": "https://example.com", + "gitmodules": "string", + "description": "string", + "recurseSubmodules": False, + "name": "string", + "mainScript": "string", + "author": "string", + }, + "complete": "2023-02-28T08:47:12.191Z", + "profile": "string", + "homeDir": "string", + "workDir": "string", + "container": "string", + "commitId": "string", + "errorMessage": "string", + "repository": "https://example.com", + "containerEngine": "string", + "scriptFile": "string", + "userName": "string", + "launchDir": "string", + "runName": "string", + "configFiles": ["string"], + "sessionId": "string", + "errorReport": "string", + "workflowStats": { + "succeededCount": 0, + "computeTimeFmt": "string", + "cachedCount": 0, + "processes": [ + { + "index": 0, + "pending": 0, + "ignored": 0, + "loadCpus": 0, + "succeeded": 0, + "running": 0, + "retries": 0, + "peakRunning": 0, + "name": "string", + "loadMemory": 0, + "stored": 0, + "terminated": False, + "aborted": 0, + "failed": 0, + "peakCpus": 0, + "peakMemory": 0, + "cached": 0, + "submitted": 0, + } + ], + "changeTimestamp": 0, + "peakRunning": 0, + "succeedDuration": 0, + "cachedPct": 0, + "loadMemory": 0, + "succeedCountFmt": "string", + "failedPct": 0, + "ignoredCount": 0, + "submittedCount": 0, + "peakMemory": 0, + "succeedPct": 0, + "succeedCount": 0, + "runningCount": 0, + "pendingCount": 0, + "loadCpus": 0, + "cachedDuration": 0, + "abortedCount": 0, + "failedDuration": 0, + "failedCount": 0, + "loadMemoryFmt": "string", + "retriesCount": 0, + "cachedCountFmt": "string", + "progressLength": 0, + "peakMemoryFmt": "string", + "failedCountFmt": "string", + "ignoredCountFmt": "string", + "peakCpus": 0, + "ignoredPct": 0, + }, + "scriptId": "string", + "revision": "string", + "exitStatus": 0, + "commandLine": "string", + "stubRun": False, + "nextflow": { + "version": "string", + "build": 0, + "timestamp": "2023-02-28T08:47:12.192Z", + "enable": {"additionalProp1": 0, "additionalProp2": 0, "additionalProp3": 0}, + }, + "stats": { + "succeededCount": 0, + "computeTimeFmt": "string", + "cachedCount": 0, + "processes": [ + { + "index": 0, + "pending": 0, + "ignored": 0, + "loadCpus": 0, + "succeeded": 0, + "running": 0, + "retries": 0, + "peakRunning": 0, + "name": "string", + "loadMemory": 0, + "stored": 0, + "terminated": False, + "aborted": 0, + "failed": 0, + "peakCpus": 0, + "peakMemory": 0, + "cached": 0, + "submitted": 0, + } + ], + "changeTimestamp": 0, + "peakRunning": 0, + "succeedDuration": 0, + "cachedPct": 0, + "loadMemory": 0, + "succeedCountFmt": "string", + "failedPct": 0, + "ignoredCount": 0, + "submittedCount": 0, + "peakMemory": 0, + "succeedPct": 0, + "succeedCount": 0, + "runningCount": 0, + "pendingCount": 0, + "loadCpus": 0, + "cachedDuration": 0, + "abortedCount": 0, + "failedDuration": 0, + "failedCount": 0, + "loadMemoryFmt": "string", + "retriesCount": 0, + "cachedCountFmt": "string", + "progressLength": 0, + "peakMemoryFmt": "string", + "failedCountFmt": "string", + "ignoredCountFmt": "string", + "peakCpus": 0, + "ignoredPct": 0, + }, + "resume": False, + "success": True, + "projectName": "string", + "scriptName": "string", + "duration": 0, + "params": {"additionalProp1": "string", "additionalProp2": "string", "additionalProp3": "string"}, + "id": "string", + "configText": "string", + "operationId": "string", + "logFile": "string", + "outFile": "string", + }, + "metrics": [ + { + "cpuUsage": { + "mean": 0, + "min": 0, + "q1": 0, + "q2": 0, + "q3": 0, + "max": 0, + "minLabel": "string", + "maxLabel": "string", + "q1Label": "string", + "q2Label": "string", + "q3Label": "string", + }, + "process": "string", + "mem": None, + "memUsage": None, + "timeUsage": None, + "vmem": { + "mean": 0, + "min": 0, + "q1": 0, + "q2": 0, + "q3": 0, + "max": 0, + "minLabel": "string", + "maxLabel": "string", + "q1Label": "string", + "q2Label": "string", + "q3Label": "string", + }, + "reads": { + "mean": 0, + "min": 0, + "q1": 0, + "q2": 0, + "q3": 0, + "max": 0, + "minLabel": "string", + "maxLabel": "string", + "q1Label": "string", + "q2Label": "string", + "q3Label": "string", + }, + "cpu": { + "mean": 0, + "min": 0, + "q1": 0, + "q2": 0, + "q3": 0, + "max": 0, + "minLabel": "string", + "maxLabel": "string", + "q1Label": "string", + "q2Label": "string", + "q3Label": "string", + }, + "time": { + "mean": 0, + "min": 0, + "q1": 0, + "q2": 0, + "q3": 0, + "max": 0, + "minLabel": "string", + "maxLabel": "string", + "q1Label": "string", + "q2Label": "string", + "q3Label": "string", + }, + "writes": { + "mean": 0, + "min": 0, + "q1": 0, + "q2": 0, + "q3": 0, + "max": 0, + "minLabel": "string", + "maxLabel": "string", + "q1Label": "string", + "q2Label": "string", + "q3Label": "string", + }, + } + ], + "progress": { + "pending": 0, + "ignored": 0, + "loadCpus": 0, + "loadMemory": 0, + "processes": [ + { + "index": 0, + "pending": 0, + "ignored": 0, + "loadCpus": 0, + "succeeded": 0, + "running": 0, + "retries": 0, + "peakRunning": 0, + "name": "string", + "loadMemory": 0, + "stored": 0, + "terminated": False, + "aborted": 0, + "failed": 0, + "peakCpus": 0, + "peakMemory": 0, + "cached": 0, + "submitted": 0, + } + ], + "aborted": 0, + "succeeded": 0, + "peakMemory": 0, + "peakCpus": 0, + "failed": 0, + "running": 0, + "retries": 0, + "peakRunning": 0, + "cached": 0, + "submitted": 0, + }, + } +) diff --git a/example-config/example-config.json b/example-config/example-config.json index 8fc1f26ed25a5f0e58d57583c6cd526beb191bd9..e02e516de48bc52a492f3ce5bed7a2407859042a 100644 --- a/example-config/example-config.json +++ b/example-config/example-config.json @@ -51,7 +51,8 @@ "job_monitoring": "EXPONENTIAL", "execution_cleanup": true, "resource_cluster_path": "/vol/data/databases", - "resource_container_path": "/vol/resources" + "resource_container_path": "/vol/resources", + "tower_secret": "xxx" }, "api_prefix": "/api", "private_key_file": "/path/to/key", diff --git a/example-config/example-config.toml b/example-config/example-config.toml index d1ea534367c171e3867b17b5fe710d6f658048d0..06ef5e96b20faa503cff4541b8d4fcef120bb78a 100644 --- a/example-config/example-config.toml +++ b/example-config/example-config.toml @@ -42,6 +42,7 @@ password = "xxx" execution_cleanup = true resource_cluster_path = "/vol/data/databases" resource_container_path = "/vol/resources" + tower_secret = "xxx" [cluster.slurm] uri = "http://localhost" token = "xxx" diff --git a/example-config/example-config.yaml b/example-config/example-config.yaml index fc919a59d2d5e1a578996a47ca011cb31ae221cd..540c40f5284b586d9bdf02a4716b348ae6d10bb0 100644 --- a/example-config/example-config.yaml +++ b/example-config/example-config.yaml @@ -39,6 +39,7 @@ cluster: execution_cleanup: true resource_cluster_path: "/vol/data/databases" resource_container_path: "/vol/resources" + tower_secret: "xxx" lifescience_oidc: client_secret: "xxx" client_id: "xxx"