Skip to content
Snippets Groups Projects
Verified Commit 7b5543c7 authored by Daniel Göbel's avatar Daniel Göbel
Browse files

Add endpoint for workflow developer statistics

* Add multiple integration tests

#62
parent 55cdfd79
No related branches found
No related tags found
2 merge requests!69Delete dev branch,!59Resolve "Endpoint for developer statistics"
Pipeline #39316 passed
......@@ -28,8 +28,7 @@ default:
- pip install virtualenv
- virtualenv venv
- source venv/bin/activate
- python -m pip install --upgrade -r requirements.txt
- python -m pip install --upgrade -r requirements-dev.txt
- python -m pip install --upgrade -r requirements.txt -r requirements-dev.txt
stages: # List of stages for jobs, and their order of execution
# - build
......
......@@ -21,7 +21,7 @@ repos:
files: app
args: [--check]
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: 'v0.1.2'
rev: 'v0.1.3'
hooks:
- id: ruff
- repo: https://github.com/PyCQA/isort
......
from datetime import date
from typing import Annotated, Any, Awaitable, Callable, List, Optional, Set
from uuid import UUID
......@@ -12,6 +13,7 @@ from app.crud import CRUDWorkflow, CRUDWorkflowVersion
from app.crud.crud_workflow_mode import CRUDWorkflowMode
from app.git_repository import GitHubRepository, build_repository
from app.schemas.workflow import WorkflowIn, WorkflowOut, WorkflowStatistic, WorkflowUpdate
from app.schemas.workflow_execution import AnonymizedWorkflowExecution
from app.schemas.workflow_version import WorkflowVersion as WorkflowVersionSchema
from app.scm import SCM, Provider
from app.utils.otlp import start_as_current_span_async
......@@ -71,11 +73,11 @@ async def list_workflows(
Workflows in the system
"""
current_span = trace.get_current_span()
if developer_id is not None:
if developer_id is not None: # pragma: no cover
current_span.set_attribute("developer_id", developer_id)
if name_substring is not None: # pragma: no cover
current_span.set_attribute("name_substring", name_substring)
if version_status is not None and len(version_status) > 0:
if version_status is not None and len(version_status) > 0: # pragma: no cover
current_span.set_attribute("version_status", [stat.name for stat in version_status])
rbac_operation = "list"
if developer_id is not None and current_user.uid != developer_id:
......@@ -189,6 +191,66 @@ async def create_workflow(
return WorkflowOut.from_db_workflow(await CRUDWorkflow.get(db, workflow_db.workflow_id))
@router.get("/developer_statistics", status_code=status.HTTP_200_OK, summary="Get anonymized workflow execution")
@start_as_current_span_async("api_workflow_get_developer_statistics", tracer=tracer)
async def get_developer_workflow_statistics(
db: DBSession,
authorization: Authorization,
response: Response,
current_user: CurrentUser,
developer_id: Optional[str] = Query(
None, description="", examples=["28c5353b8bb34984a8bd4169ba94c606"], min_length=3, max_length=64
),
workflow_ids: Optional[List[UUID]] = Query(None, alias="workflow_id"),
start: Optional[date] = Query(None),
end: Optional[date] = Query(None),
) -> List[AnonymizedWorkflowExecution]:
"""
Get the workflow executions with meta information and anonymized user IDs.\n
Permission "workflow:read_statistics" required if the `developer_id` is the same as the uid of the current user,
other "workflow:read_statistics_any".
\f
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
authorization : Callable[[str], Awaitable[Any]]
Async function to ask the auth service for authorization. Dependency Injection.
response : fastapi.Response
Temporary Response object. Dependency Injection.
current_user : clowmdb.models.User
Current user. Dependency Injection.
developer_id : str | None, default None
Filter for workflows developed by a specific user. Query Parameter.
workflow_ids: List[uuid.UUID] | None, default None
Filter by workflow IDs. Query Parameter.
start : datetime.date | None, default None
Filter by executions that started after the specified date. Query Parameter.
end : datetime.date | None, default None
Filter by executions that started before the specified date. Query Parameter.
Returns
-------
statistics : List[app.schema.workflow.AnonymizedWorkflowExecution]
List of raw datapoints for analysis.
"""
span = trace.get_current_span()
if developer_id: # pragma: no cover
span.set_attribute("developer_id", developer_id)
if workflow_ids: # pragma: no cover
span.set_attribute("workflow_ids", [str(wid) for wid in workflow_ids])
if start: # pragma: no cover
span.set_attribute("start_day", start.isoformat())
if end: # pragma: no cover
span.set_attribute("end_day", end.isoformat())
await authorization("read_statistics" if current_user.uid == developer_id else "read_statistics_any")
# Instruct client to cache response for 1 hour
response.headers["Cache-Control"] = "max-age=3600"
return await CRUDWorkflow.developer_statistics(
db, developer_id=developer_id, workflow_ids=workflow_ids, start=start, end=end
)
@router.get("/{wid}", status_code=status.HTTP_200_OK, summary="Get a workflow")
@start_as_current_span_async("api_workflow_get", tracer=tracer)
async def get_workflow(
......@@ -203,7 +265,7 @@ async def get_workflow(
) -> WorkflowOut:
"""
Get a specific workflow.\n
Permission "workflow: read" required.
Permission "workflow:read" required.
\f
Parameters
----------
......@@ -225,7 +287,7 @@ async def get_workflow(
"""
current_span = trace.get_current_span()
current_span.set_attribute("workflow_id", str(workflow.workflow_id))
if version_status is not None and len(version_status) > 0:
if version_status is not None and len(version_status) > 0: # pragma: no cover
current_span.set_attribute("version_status", [stat.name for stat in version_status])
rbac_operation = "read_any" if workflow.developer_id != current_user.uid and version_status is not None else "read"
await authorization(rbac_operation)
......@@ -244,7 +306,8 @@ async def get_workflow_statistics(
workflow: CurrentWorkflow, db: DBSession, authorization: Authorization, response: Response
) -> List[WorkflowStatistic]:
"""
Get the number of started workflow per day.
Get the number of started workflow per day.\n
Permission "workflow:read" required.
\f
Parameters
----------
......@@ -255,16 +318,17 @@ async def get_workflow_statistics(
authorization : Callable[[str], Awaitable[Any]]
Async function to ask the auth service for authorization. Dependency Injection.
response : fastapi.Response
Temporal Response object. Dependency Injection.
Temporary Response object. Dependency Injection.
Returns
-------
statistics : List[app.schema.Workflow.WorkflowStatistic]
statistics : List[app.schema.workflow.WorkflowStatistic]
List of datapoints aggregated by day.
"""
trace.get_current_span().set_attribute("workflow_id", str(workflow.workflow_id))
await authorization("read")
# Instruct client to cache response for 1 hour
response.headers["Cache-Control"] = "max-age=3600"
# Instruct client to cache response for 24 hour
response.headers["Cache-Control"] = "max-age=86400"
return await CRUDWorkflow.statistics(db, workflow.workflow_id)
......
from typing import List, Optional, Union
from datetime import date, datetime
from hashlib import sha256
from os import urandom
from typing import Dict, List, Optional, Union
from uuid import UUID
from clowmdb.models import Workflow, WorkflowExecution, WorkflowVersion
......@@ -10,6 +13,7 @@ from sqlalchemy.orm import joinedload
from app.crud.crud_workflow_mode import CRUDWorkflowMode
from app.crud.crud_workflow_version import CRUDWorkflowVersion
from app.schemas.workflow import WorkflowIn, WorkflowStatistic
from app.schemas.workflow_execution import AnonymizedWorkflowExecution
tracer = trace.get_tracer_provider().get_tracer(__name__)
......@@ -101,6 +105,90 @@ class CRUDWorkflow:
await db.execute(stmt)
await db.commit()
@staticmethod
async def developer_statistics(
db: AsyncSession,
developer_id: Optional[str] = None,
workflow_ids: Optional[List[UUID]] = None,
start: Optional[date] = None,
end: Optional[date] = None,
) -> List[AnonymizedWorkflowExecution]:
"""
Get all workflow executions for a set of workflows with anonymized user ID.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
developer_id : str | None, default None
Filter workflow by developer ID.
workflow_ids : uuid.UUID | None, default None
Filter workflows by ID.
start : datetime.date | None, default None
Filter workflow execution that started after this date.
end : datetime.date | None, default None
Filter workflow execution that started before this date
Returns
-------
stats : List[app.schemas.Workflow.AnonymizedWorkflowExecution]
List of datapoints
"""
with tracer.start_as_current_span("db_get_workflow_developer_statistics") as span:
stmt = (
select(
cast(func.FROM_UNIXTIME(WorkflowExecution.start_time), Date).label("started_at"),
WorkflowExecution._execution_id,
WorkflowExecution.user_id,
WorkflowExecution._workflow_mode_id,
WorkflowVersion.git_commit_hash,
Workflow._workflow_id,
Workflow.developer_id,
)
.select_from(WorkflowExecution)
.join(WorkflowVersion)
.join(Workflow)
.order_by("started_at")
)
if developer_id:
span.set_attribute("developer_id", developer_id)
stmt = stmt.where(Workflow.developer_id == developer_id)
if workflow_ids:
span.set_attribute("workflow_ids", [str(wid) for wid in workflow_ids])
stmt = stmt.where(*[Workflow._workflow_id == wid.bytes for wid in workflow_ids])
if start:
span.set_attribute("start_date", start.isoformat())
timestamp = round(datetime(year=start.year, month=start.month, day=start.day).timestamp())
stmt = stmt.where(WorkflowExecution.start_time > timestamp)
if end:
span.set_attribute("end_date", end.isoformat())
timestamp = round(datetime(year=end.year, month=end.month, day=end.day).timestamp())
stmt = stmt.where(WorkflowExecution.start_time < timestamp)
user_hashes: Dict[str, str] = {}
def hash_user_id(uid: str) -> str:
if uid not in user_hashes.keys():
hash_obj = sha256(usedforsecurity=True)
hash_obj.update(bytes.fromhex(uid if len(uid) % 2 == 0 else uid + "0"))
hash_obj.update(urandom(32))
user_hashes[uid] = hash_obj.hexdigest()
return user_hashes[uid]
span.set_attribute("sql_query", str(stmt))
rows = await db.execute(stmt)
return [
AnonymizedWorkflowExecution(
workflow_execution_id=row._execution_id,
pseudo_uid=hash_user_id(row.user_id),
workflow_mode_id=row._workflow_mode_id,
started_at=row.started_at,
workflow_id=row._workflow_id,
developer_id=row.developer_id,
git_commit_hash=row.git_commit_hash,
)
for row in rows
]
@staticmethod
async def statistics(db: AsyncSession, workflow_id: Union[bytes, UUID]) -> List[WorkflowStatistic]:
"""
......@@ -115,7 +203,7 @@ class CRUDWorkflow:
Returns
-------
stat : List[app.schemas.Workflow.WorkflowStatistic]
stats : List[app.schemas.Workflow.WorkflowStatistic]
List of datapoints
"""
with tracer.start_as_current_span("db_get_workflow_statistics") as span:
......
......@@ -64,10 +64,10 @@ class WorkflowIn(_BaseWorkflow):
class WorkflowOut(_BaseWorkflow):
workflow_id: UUID = Field(..., description="Id of the workflow", examples=["0cc78936-381b-4bdd-999d-736c40591078"])
workflow_id: UUID = Field(..., description="ID of the workflow")
versions: List[WorkflowVersion] = Field(..., description="Versions of the workflow")
developer_id: str = Field(
..., description="Id of developer of the workflow", examples=["28c5353b8bb34984a8bd4169ba94c606"]
..., description="ID of developer of the workflow", examples=["28c5353b8bb34984a8bd4169ba94c606"]
)
private: bool = Field(default=False, description="Flag if the workflow is hosted in a private git repository")
......
from datetime import datetime
from datetime import date, datetime
from typing import Any, Dict, Optional
from uuid import UUID
......@@ -38,9 +38,7 @@ class WorkflowExecutionIn(_BaseWorkflowExecution):
class WorkflowExecutionOut(_BaseWorkflowExecution):
execution_id: UUID = Field(
..., description="ID of the workflow execution", examples=["0cc78936-381b-4bdd-999d-736c40591078"]
)
execution_id: UUID = Field(..., description="ID of the workflow execution")
user_id: str = Field(
..., description="UID of user who started the workflow", examples=["28c5353b8bb34984a8bd4169ba94c606"]
)
......@@ -62,9 +60,7 @@ class WorkflowExecutionOut(_BaseWorkflowExecution):
workflow_version_id: Optional[str] = Field( # type: ignore[assignment]
None, description="Workflow version git commit hash", examples=["ba8bcd9294c2c96aedefa1763a84a18077c50c0f"]
)
workflow_id: Optional[UUID] = Field(
None, description="Id of the workflow", examples=["0cc78936-381b-4bdd-999d-736c40591078"]
)
workflow_id: Optional[UUID] = Field(None, description="Id of the workflow")
@staticmethod
def from_db_model(
......@@ -118,3 +114,23 @@ class DevWorkflowExecutionIn(BaseModel):
@field_serializer("repository_url")
def serialize_dt(self, url: AnyHttpUrl, _info: FieldSerializationInfo) -> str:
return str(url)
class AnonymizedWorkflowExecution(BaseModel):
workflow_execution_id: UUID = Field(..., description="ID of the workflow execution")
pseudo_uid: str = Field(
...,
description="Anonymized user ID of the user who ran the workflow execution",
examples=["7ed4249857b656e96f456449796e461e6001d3fb2481a44701f70ca437bd53a2"],
)
workflow_mode_id: Optional[UUID] = Field(None, description="ID of the workflow mode this workflow execution ran in")
git_commit_hash: str = Field(
..., description="Hash of the git commit", examples=["ba8bcd9294c2c96aedefa1763a84a18077c50c0f"]
)
started_at: date = Field(
..., description="Day of the workflow execution", examples=[date(day=1, month=1, year=2023)]
)
workflow_id: UUID = Field(..., description="ID of the workflow")
developer_id: str = Field(
..., description="ID of developer of the workflow", examples=["28c5353b8bb34984a8bd4169ba94c606"]
)
......@@ -507,6 +507,40 @@ class TestWorkflowRoutesList(_TestWorkflowRoutes):
assert len(workflows) == 1
assert workflows[0]["workflow_id"] == str(random_workflow.workflow_id)
@pytest.mark.asyncio
async def test_list_workflow_statistics(
self,
client: AsyncClient,
random_user: UserWithAuthHeader,
random_workflow: WorkflowOut,
random_workflow_execution: WorkflowExecution,
) -> None:
"""
Test for getting all workflow executions as workflow statistics.
Parameters
----------
client : httpx.AsyncClient
HTTP Client to perform the request on.
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing.
random_workflow : app.schemas.workflow.WorkflowOut
Random workflow for testing.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing
"""
response = await client.get(
f"{self.base_path}/developer_statistics",
headers=random_user.auth_headers,
)
assert response.status_code == status.HTTP_200_OK
executions = response.json()
assert len(executions) == 1
execution = executions[0]
assert execution["workflow_id"] == str(random_workflow.workflow_id)
assert execution["workflow_execution_id"] == str(random_workflow_execution.execution_id)
assert execution["git_commit_hash"] == random_workflow_execution.workflow_version_id
class TestWorkflowRoutesGet(_TestWorkflowRoutes):
@pytest.mark.asyncio
......@@ -571,6 +605,10 @@ class TestWorkflowRoutesGet(_TestWorkflowRoutes):
HTTP Client to perform the request on.
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing.
random_workflow : app.schemas.workflow.WorkflowOut
Random workflow for testing.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing
"""
response = await client.get(
"/".join([self.base_path, str(random_workflow.workflow_id), "statistics"]),
......
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.crud import CRUDUser
from app.tests.utils.user import UserWithAuthHeader
from app.tests.utils.utils import random_hex_string
class TestUserCRUD:
@pytest.mark.asyncio
async def test_get_user(self, db: AsyncSession, random_user: UserWithAuthHeader) -> None:
"""
Test for getting a user from the database
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing.
"""
user = await CRUDUser.get(db, random_user.user.uid)
assert user is not None
assert user == random_user.user
@pytest.mark.asyncio
async def test_get_non_existing_user(self, db: AsyncSession) -> None:
"""
Test for getting a non-existing user from the database
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
"""
user = await CRUDUser.get(db, random_hex_string())
assert user is None
import random
from datetime import date
from datetime import date, timedelta
from uuid import uuid4
import pytest
from clowmdb.models import Workflow, WorkflowExecution, WorkflowVersion
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from app.crud import CRUDWorkflow
from app.schemas.workflow import WorkflowIn, WorkflowOut
from app.schemas.workflow_mode import WorkflowModeIn
from app.tests.utils.user import UserWithAuthHeader
from app.tests.utils.utils import random_hex_string, random_lower_string
......@@ -148,12 +151,215 @@ class TestWorkflowCRUDGet:
Async database session to perform query on.
random_workflow : app.schemas.workflow.WorkflowOut
Random bucket for testing.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing.
"""
statistics = await CRUDWorkflow.statistics(db, random_workflow.workflow_id)
assert len(statistics) == 1
assert statistics[0].day == date.today()
assert statistics[0].count == 1
@pytest.mark.asyncio
async def test_get_workflow_developer_statistics(
self, db: AsyncSession, random_workflow: WorkflowOut, random_workflow_execution: WorkflowExecution
) -> None:
"""
Test for getting workflow statistics for developer.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow : app.schemas.workflow.WorkflowOut
Random bucket for testing.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing.
"""
statistics = await CRUDWorkflow.developer_statistics(db)
assert len(statistics) == 1
assert statistics[0].started_at == date.today()
assert statistics[0].workflow_id == random_workflow.workflow_id
assert statistics[0].workflow_execution_id == random_workflow_execution.execution_id
assert statistics[0].git_commit_hash == random_workflow_execution.workflow_version_id
assert statistics[0].pseudo_uid != random_workflow_execution.user_id
assert statistics[0].developer_id == random_workflow.developer_id
@pytest.mark.asyncio
async def test_get_workflow_developer_statistics_with_developer_id(
self, db: AsyncSession, random_workflow: WorkflowOut, random_workflow_execution: WorkflowExecution
) -> None:
"""
Test for getting workflow statistics for developer with a developer ID.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow : app.schemas.workflow.WorkflowOut
Random bucket for testing.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing.
"""
statistics = await CRUDWorkflow.developer_statistics(db, developer_id=random_workflow.developer_id)
assert len(statistics) == 1
assert statistics[0].started_at == date.today()
assert statistics[0].workflow_id == random_workflow.workflow_id
assert statistics[0].workflow_execution_id == random_workflow_execution.execution_id
assert statistics[0].git_commit_hash == random_workflow_execution.workflow_version_id
assert statistics[0].pseudo_uid != random_workflow_execution.user_id
assert statistics[0].developer_id == random_workflow.developer_id
@pytest.mark.asyncio
async def test_get_workflow_developer_statistics_with_non_existent_developer_id(
self, db: AsyncSession, random_workflow: WorkflowOut, random_workflow_execution: WorkflowExecution
) -> None:
"""
Test for getting workflow statistics for developer with a non-existing developer ID.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow : app.schemas.workflow.WorkflowOut
Random bucket for testing.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing.
"""
statistics = await CRUDWorkflow.developer_statistics(db, developer_id=random_lower_string(40))
assert len(statistics) == 0
@pytest.mark.asyncio
async def test_get_workflow_developer_statistics_with_workflow_id(
self, db: AsyncSession, random_workflow: WorkflowOut, random_workflow_execution: WorkflowExecution
) -> None:
"""
Test for getting workflow statistics for developer with a workflow ID.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow : app.schemas.workflow.WorkflowOut
Random bucket for testing.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing.
"""
statistics = await CRUDWorkflow.developer_statistics(db, workflow_ids=[random_workflow.workflow_id])
assert len(statistics) == 1
assert statistics[0].started_at == date.today()
assert statistics[0].workflow_id == random_workflow.workflow_id
assert statistics[0].workflow_execution_id == random_workflow_execution.execution_id
assert statistics[0].git_commit_hash == random_workflow_execution.workflow_version_id
assert statistics[0].pseudo_uid != random_workflow_execution.user_id
assert statistics[0].developer_id == random_workflow.developer_id
@pytest.mark.asyncio
async def test_get_workflow_developer_statistics_with_non_existent_workflow_id(
self, db: AsyncSession, random_workflow: WorkflowOut, random_workflow_execution: WorkflowExecution
) -> None:
"""
Test for getting workflow statistics for developer with a non-existing workflow ID.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow : app.schemas.workflow.WorkflowOut
Random bucket for testing.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing.
"""
statistics = await CRUDWorkflow.developer_statistics(db, workflow_ids=[uuid4()])
assert len(statistics) == 0
@pytest.mark.asyncio
async def test_get_workflow_developer_statistics_with_start_date(
self, db: AsyncSession, random_workflow: WorkflowOut, random_workflow_execution: WorkflowExecution
) -> None:
"""
Test for getting workflow statistics for developer with a start date in the past.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow : app.schemas.workflow.WorkflowOut
Random bucket for testing.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing.
"""
statistics = await CRUDWorkflow.developer_statistics(db, start=date.today() - timedelta(days=7))
assert len(statistics) == 1
assert statistics[0].started_at == date.today()
assert statistics[0].workflow_id == random_workflow.workflow_id
assert statistics[0].workflow_execution_id == random_workflow_execution.execution_id
assert statistics[0].git_commit_hash == random_workflow_execution.workflow_version_id
assert statistics[0].pseudo_uid != random_workflow_execution.user_id
assert statistics[0].developer_id == random_workflow.developer_id
@pytest.mark.asyncio
async def test_get_workflow_developer_statistics_with_bad_start_day(
self, db: AsyncSession, random_workflow: WorkflowOut, random_workflow_execution: WorkflowExecution
) -> None:
"""
Test for getting workflow statistics for developer with a start date in the future.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow : app.schemas.workflow.WorkflowOut
Random bucket for testing.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing.
"""
statistics = await CRUDWorkflow.developer_statistics(db, start=date.today() + timedelta(days=7))
assert len(statistics) == 0
@pytest.mark.asyncio
async def test_get_workflow_developer_statistics_with_end_date(
self, db: AsyncSession, random_workflow: WorkflowOut, random_workflow_execution: WorkflowExecution
) -> None:
"""
Test for getting workflow statistics for developer with a end date in the future.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow : app.schemas.workflow.WorkflowOut
Random bucket for testing.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing.
"""
statistics = await CRUDWorkflow.developer_statistics(db, end=date.today() + timedelta(days=7))
assert len(statistics) == 1
assert statistics[0].started_at == date.today()
assert statistics[0].workflow_id == random_workflow.workflow_id
assert statistics[0].workflow_execution_id == random_workflow_execution.execution_id
assert statistics[0].git_commit_hash == random_workflow_execution.workflow_version_id
assert statistics[0].pseudo_uid != random_workflow_execution.user_id
assert statistics[0].developer_id == random_workflow.developer_id
@pytest.mark.asyncio
async def test_get_workflow_developer_statistics_with_bad_end_day(
self, db: AsyncSession, random_workflow: WorkflowOut, random_workflow_execution: WorkflowExecution
) -> None:
"""
Test for getting workflow statistics for developer with a end date in the past.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow : app.schemas.workflow.WorkflowOut
Random bucket for testing.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing.
"""
statistics = await CRUDWorkflow.developer_statistics(db, end=date.today() - timedelta(days=7))
assert len(statistics) == 0
class TestWorkflowCRUDCreate:
@pytest.mark.asyncio
......@@ -185,6 +391,47 @@ class TestWorkflowCRUDCreate:
await db.execute(delete(Workflow).where(Workflow._workflow_id == workflow.workflow_id.bytes))
await db.commit()
@pytest.mark.asyncio
async def test_create_workflow_with_mode(self, db: AsyncSession, random_user: UserWithAuthHeader) -> None:
"""
Test for creating a workflow with a mode in CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing.
"""
workflow_in = WorkflowIn(
git_commit_hash=random_hex_string(),
name=random_lower_string(10),
short_description=random_lower_string(65),
repository_url="https://github.com/example/example",
modes=[
WorkflowModeIn(
schema_path="example/schema.json", name=random_lower_string(), entrypoint=random_lower_string()
)
],
)
workflow = await CRUDWorkflow.create(db, workflow=workflow_in, developer=random_user.user.uid)
assert workflow is not None
stmt = (
select(Workflow)
.where(Workflow._workflow_id == workflow.workflow_id.bytes)
.options(joinedload(Workflow.versions).selectinload(WorkflowVersion.workflow_modes))
)
created_workflow = await db.scalar(stmt)
assert created_workflow is not None
assert created_workflow == workflow
assert len(created_workflow.versions) == 1
assert len(created_workflow.versions[0].workflow_modes) == 1
await db.execute(delete(Workflow).where(Workflow._workflow_id == workflow.workflow_id.bytes))
await db.commit()
@pytest.mark.asyncio
async def test_create_workflow_credentials(self, db: AsyncSession, random_workflow: WorkflowOut) -> None:
"""
......
......@@ -254,6 +254,29 @@ class TestWorkflowExecutionCRUDLUpdate:
assert execution is not None
assert execution.status == WorkflowExecution.WorkflowExecutionStatus.CANCELED
@pytest.mark.asyncio
async def test_update_workflow_execution_slurm_job(
self, db: AsyncSession, random_workflow_execution: WorkflowExecution
) -> None:
"""
Test for updating the slurm job id of a workflow execution.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow_execution : clowmdb.models.WorkflowExecution
Random workflow execution for testing.
"""
await CRUDWorkflowExecution.update_slurm_job_id(db, random_workflow_execution.execution_id, 250)
stmt = select(WorkflowExecution).where(
WorkflowExecution._execution_id == random_workflow_execution.execution_id.bytes
)
execution = await db.scalar(stmt)
assert execution is not None
assert execution.slurm_job_id == 250
class TestWorkflowExecutionCRUDDelete:
@pytest.mark.asyncio
......
from uuid import uuid4
import pytest
from clowmdb.models import WorkflowVersion
from clowmdb.models import WorkflowMode, WorkflowVersion
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.crud import CRUDWorkflowVersion
from app.schemas.workflow import WorkflowOut
......@@ -28,6 +31,44 @@ class TestWorkflowVersionCRUDGet:
assert version.workflow_id == random_workflow_version.workflow_id
assert version.git_commit_hash == random_workflow_version.git_commit_hash
@pytest.mark.asyncio
async def test_get_specific_workflow_version_with_workflow_id(
self, db: AsyncSession, random_workflow_version: WorkflowVersion
) -> None:
"""
Test for getting a workflow version and constraint it by a workflow ID from CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow_version : clowmdb.model.WorkflowVersion
Random workflow version for testing.
"""
version = await CRUDWorkflowVersion.get(
db, random_workflow_version.git_commit_hash, workflow_id=random_workflow_version.workflow_id
)
assert version is not None
assert version.workflow_id == random_workflow_version.workflow_id
assert version.git_commit_hash == random_workflow_version.git_commit_hash
@pytest.mark.asyncio
async def test_get_specific_workflow_version_with_false_workflow_id(
self, db: AsyncSession, random_workflow_version: WorkflowVersion
) -> None:
"""
Test for getting a workflow version and constraint it with a non-existing workflow ID CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow_version : clowmdb.model.WorkflowVersion
Random workflow version for testing.
"""
version = await CRUDWorkflowVersion.get(db, random_workflow_version.git_commit_hash, workflow_id=uuid4())
assert version is None
@pytest.mark.asyncio
async def test_get_specific_workflow_version_with_populated_workflow(
self, db: AsyncSession, random_workflow_version: WorkflowVersion
......@@ -145,10 +186,52 @@ class TestWorkflowVersionCRUDCreate:
)
assert workflow_version is not None
stmt = select(WorkflowVersion).where(WorkflowVersion.git_commit_hash == workflow_version.git_commit_hash)
stmt = (
select(WorkflowVersion)
.where(WorkflowVersion.git_commit_hash == workflow_version.git_commit_hash)
.options(selectinload(WorkflowVersion.workflow_modes))
)
created_workflow_version = await db.scalar(stmt)
assert created_workflow_version is not None
assert created_workflow_version == workflow_version
assert len(created_workflow_version.workflow_modes) == 0
@pytest.mark.asyncio
async def test_create_workflow_version_with_mode(
self, db: AsyncSession, random_workflow: WorkflowOut, random_workflow_mode: WorkflowMode
) -> None:
"""
Test for creating a workflow version in CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_workflow : app.schemas.workflow.WorkflowOut
Random bucket for testing.
random_workflow_mode : clowmdb.models.WorkflowMode
Random workflow mode for testing.
"""
workflow_version = await CRUDWorkflowVersion.create(
db,
git_commit_hash=random_hex_string(),
version="v2.0.0",
wid=random_workflow.workflow_id,
previous_version=random_workflow.versions[-1].git_commit_hash,
modes=[random_workflow_mode.mode_id],
)
assert workflow_version is not None
stmt = (
select(WorkflowVersion)
.where(WorkflowVersion.git_commit_hash == workflow_version.git_commit_hash)
.options(selectinload(WorkflowVersion.workflow_modes))
)
created_workflow_version = await db.scalar(stmt)
assert created_workflow_version is not None
assert created_workflow_version == workflow_version
assert len(created_workflow_version.workflow_modes) == 1
assert created_workflow_version.workflow_modes[0].mode_id == random_workflow_mode.mode_id
class TestWorkflowVersionCRUDUpdate:
......
......@@ -7,7 +7,7 @@ from authlib.jose import JsonWebToken
from clowmdb.models import User
from sqlalchemy.ext.asyncio import AsyncSession
from .utils import random_lower_string
from .utils import random_hex_string, random_lower_string
_jwt = JsonWebToken(["HS256"])
......@@ -84,7 +84,7 @@ async def create_random_user(db: AsyncSession) -> User:
Newly created user.
"""
user = User(
uid=random_lower_string(),
uid=random_hex_string(),
display_name=random_lower_string(),
)
db.add(user)
......
......@@ -42,6 +42,11 @@ class BackoffStrategy(ABC, Generator):
"""
Compute the next value in the infinite series for this backoff strategy.
Parameters
----------
iteration : int
The number of the current interation
Returns
-------
val : int
......
......@@ -4,7 +4,7 @@ pytest-asyncio>=0.21.0,<0.22.0
pytest-cov>=4.1.0,<4.2.0
coverage[toml]>=7.3.0,<7.4.0
# Linters
ruff>=0.1.2,<0.1.3
ruff>=0.1.3,<0.1.4
black>=23.10.0,<23.11.0
isort>=5.12.0,<5.13.0
mypy>=1.6.0,<1.7.0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment