diff --git a/Dockerfile-Gunicorn b/Dockerfile-Gunicorn index 9c460ba123efe26687258bc06f8f76c5e36fa43d..79f82be9b4fa45d10d66ed9256a5d9f97337a34a 100644 --- a/Dockerfile-Gunicorn +++ b/Dockerfile-Gunicorn @@ -11,5 +11,4 @@ COPY ./requirements.txt /app/requirements.txt RUN pip install --no-cache-dir --upgrade -r requirements.txt -COPY ./mako_templates /app/mako_templates COPY ./app /app/app diff --git a/app/api/api.py b/app/api/api.py index 4528c52015b2b3affe0c8f59da9ce3148516041f..2208982220e4eca6c221d35f6cf1a5b23be4c3ba 100644 --- a/app/api/api.py +++ b/app/api/api.py @@ -3,7 +3,7 @@ from typing import Any, Dict, Union from fastapi import APIRouter, Depends, status from app.api.dependencies import decode_bearer_token -from app.api.endpoints import resources +from app.api.endpoints import resource_version, resources from app.schemas.security import ErrorDetail alternative_responses: Dict[Union[int, str], Dict[str, Any]] = { @@ -30,3 +30,6 @@ api_router.include_router( dependencies=[Depends(decode_bearer_token)], responses=alternative_responses, ) +api_router.include_router( + resource_version.router, dependencies=[Depends(decode_bearer_token)], responses=alternative_responses +) diff --git a/app/api/dependencies.py b/app/api/dependencies.py index 97a44fe05f9f2e048c68092a09c987b93b60b7e6..4bf42b800da560e6fb3219c2f3eae1e749f106f3 100644 --- a/app/api/dependencies.py +++ b/app/api/dependencies.py @@ -1,9 +1,10 @@ from typing import TYPE_CHECKING, Annotated, AsyncIterator, Awaitable, Callable, Dict +from uuid import UUID from authlib.jose.errors import BadSignatureError, DecodeError, ExpiredTokenError from clowmdb.db.session import get_async_session -from clowmdb.models import User -from fastapi import Depends, HTTPException, Request, status +from clowmdb.models import Resource, ResourceVersion, User +from fastapi import Depends, HTTPException, Path, Request, status from fastapi.security import HTTPBearer from fastapi.security.http import HTTPAuthorizationCredentials from httpx import AsyncClient @@ -62,7 +63,7 @@ async def get_httpx_client(request: Request) -> AsyncClient: # pragma: no cover HTTPClient = Annotated[AsyncClient, Depends(get_httpx_client)] -def get_slurm_client(client: AsyncClient = Depends(get_httpx_client)) -> SlurmClient: +def get_slurm_client(client: HTTPClient) -> SlurmClient: return SlurmClient(client=client) @@ -83,9 +84,9 @@ def get_decode_jwt_function() -> Callable[[str], Dict[str, str]]: # pragma: no @start_as_current_span_async("decode_jwt", tracer=tracer) async def decode_bearer_token( - token: HTTPAuthorizationCredentials = Depends(bearer_token), - decode: Callable[[str], Dict[str, str]] = Depends(get_decode_jwt_function), - db: AsyncSession = Depends(get_db), + token: Annotated[HTTPAuthorizationCredentials, Depends(bearer_token)], + decode: Annotated[Callable[[str], Dict[str, str]], Depends(get_decode_jwt_function)], + db: DBSession, ) -> JWT: """ Get the decoded JWT or reject request if it is not valid or the user doesn't exist. @@ -133,8 +134,8 @@ class AuthorizationDependency: def __call__( self, - token: JWT = Depends(decode_bearer_token), - client: AsyncClient = Depends(get_httpx_client), + token: Annotated[JWT, Depends(decode_bearer_token)], + client: HTTPClient, ) -> Callable[[str], Awaitable[AuthzResponse]]: """ Get the function to request the authorization service with the resource, JWT and HTTP Client already injected. @@ -159,7 +160,7 @@ class AuthorizationDependency: return authorization_wrapper -async def get_current_user(token: JWT = Depends(decode_bearer_token), db: AsyncSession = Depends(get_db)) -> User: +async def get_current_user(token: Annotated[JWT, Depends(decode_bearer_token)], db: DBSession) -> User: """ Get the current user from the database based on the JWT. @@ -184,3 +185,62 @@ async def get_current_user(token: JWT = Depends(decode_bearer_token), db: AsyncS CurrentUser = Annotated[User, Depends(get_current_user)] + + +async def get_current_resource(rid: Annotated[UUID, Path()], db: DBSession) -> Resource: + """ + Get the current resource from the database based on the ID in the path. + + FastAPI Dependency Injection Function. + + Parameters + ---------- + rid : uuid. UUID + ID of a resource. Path parameter. + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. Dependency Injection. + + Returns + ------- + resource : clowmdb.models.Resource + Resource associated with the ID in the path. + """ + return Resource( + name="name", _resource_id=rid.bytes, short_description="a" * 32, source="a" * 8, maintainer_id="abc" + ) + + +CurrentResource = Annotated[Resource, Depends(get_current_resource)] + + +async def get_current_resource_version( + rvid: Annotated[UUID, Path()], resource: CurrentResource, db: DBSession +) -> ResourceVersion: + """ + Get the current resource version from the database based on the ID in the path. + + FastAPI Dependency Injection Function. + + Parameters + ---------- + rvid : uuid. UUID + ID of a resource version. Path parameter. + resource : clowmdb.models.Resource + Resource associated with the ID in the path. + db : sqlalchemy.ext.asyncio.AsyncSession. + Async database session to perform query on. Dependency Injection. + + Returns + ------- + resource_version : clowmdb.models.ResourceVersion + Resource Version associated with the ID in the path. + """ + return ResourceVersion( + _resource_version_id=rvid.bytes, + _resource_id=resource.resource_id.bytes, + release="1.0.0", + status=ResourceVersion.Status.RESOURCE_REQUESTED, + ) + + +CurrentResourceVersion = Annotated[ResourceVersion, Depends(get_current_resource_version)] diff --git a/app/api/endpoints/resource_version.py b/app/api/endpoints/resource_version.py new file mode 100644 index 0000000000000000000000000000000000000000..8adc44253b034bbe593dfad2fd8b90a21e2d58f0 --- /dev/null +++ b/app/api/endpoints/resource_version.py @@ -0,0 +1,311 @@ +from typing import Annotated, Any, Awaitable, Callable, List, Optional +from uuid import uuid4 + +from clowmdb.models import ResourceVersion +from fastapi import APIRouter, Depends, Query, status +from opentelemetry import trace + +from app.api.dependencies import AuthorizationDependency, CurrentResource, CurrentResourceVersion, CurrentUser +from app.schemas.resource_version import ResourceVersionIn, ResourceVersionOut +from app.utils.otlp import start_as_current_span_async + +router = APIRouter(prefix="/resources/{rid}/versions", tags=["ResourceVersion"]) +resource_authorization = AuthorizationDependency(resource="resource") +Authorization = Annotated[Callable[[str], Awaitable[Any]], Depends(resource_authorization)] +tracer = trace.get_tracer_provider().get_tracer(__name__) + + +@router.get("", summary="List versions of a resource") +@start_as_current_span_async("api_list_resource_versions", tracer=tracer) +async def list_resource_versions( + authorization: Authorization, + resource: CurrentResource, + current_user: CurrentUser, + version_status: Annotated[ + Optional[List[ResourceVersion.Status]], + Query( + description=f"Which versions to include in the response. Permission `resource:read_any` required, current user is the maintainer, then only permission `resource:read` required. Default `{ResourceVersion.Status.LATEST.name}` and `{ResourceVersion.Status.SYNCHRONIZED.name}`.", + # noqa: E501 + ), + ] = None, +) -> List[ResourceVersionOut]: + """ + List all the resource version for a specific resource. + + Permission 'resource:read' required. + \f + Parameters + ---------- + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + resource : clowmdb.models.Resource + Resource associated with the ID in the path. Dependency Injection. + current_user : clowmdb.models.User + Current user. Dependency injection. + version_status : List[clowmdb.models.ResourceVersion.Status] | None, default None + Filter resource version by their status. Query Parameter. + """ + current_span = trace.get_current_span() + if version_status: # pragma: no cover + current_span.set_attribute("version_status", [state.name for state in version_status]) + current_span.set_attribute("resource_id", str(resource.resource_id)) + rbac_operation = ( + "list_filter" if resource.maintainer_id != current_user.uid and version_status is not None else "list" + ) + await authorization(rbac_operation) + + return [] + + +@router.post("", summary="Request new version of a resource", status_code=status.HTTP_201_CREATED) +@start_as_current_span_async("api_request_resource_version", tracer=tracer) +async def request_resource_version( + authorization: Authorization, + resource: CurrentResource, + resource_version: ResourceVersionIn, + current_user: CurrentUser, +) -> ResourceVersionOut: + """ + Request a new resource version. + + Permission `resource:update` required if the current user is the maintainer, `resource:update_any` otherwise. + \f + Parameters + ---------- + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + resource : clowmdb.models.Resource + Resource associated with the ID in the path. Dependency Injection. + resource_version : app.schemas.resource_version.ResourceVersionIn + Data about the new resource version. HTTP Body. + current_user : clowmdb.models.User + Current user. Dependency injection. + """ + current_span = trace.get_current_span() + current_span.set_attributes( + {"resource_id": str(resource.resource_id), "resource_version_in": resource_version.model_dump_json(indent=2)} + ) + rbac_operation = "update" + if current_user.uid != resource.maintainer_id: + rbac_operation = "update_any" + await authorization(rbac_operation) + return ResourceVersionOut( + release="1.0.0", + status=ResourceVersion.Status.RESOURCE_REQUESTED, + resource_version_id=uuid4(), + resource_id=resource.resource_id, + created_at=0, + ) + + +@router.get("/{rvid}", summary="Get version of a resource") +@start_as_current_span_async("api_get_resource_version", tracer=tracer) +async def get_resource_version( + authorization: Authorization, + resource: CurrentResource, + current_user: CurrentUser, + resource_version: CurrentResourceVersion, +) -> ResourceVersionOut: + """ + Get a specific resource version for a specific resource. + + Permission `resource:read` required. If the status of the resource version is not `LATEST` or `SYNCHRONIZED` and + the current user is not the maintainer, then the permission `resource:read_any` is required. + \f + Parameters + ---------- + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + resource : clowmdb.models.Resource + Resource associated with the ID in the path. Dependency Injection. + current_user : clowmdb.models.User + Current user. Dependency injection. + resource_version : clowmdb.models.ResourceVersion + Resource Version associated with the ID in the path. Dependency Injection. + """ + trace.get_current_span().set_attributes( + {"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)} + ) + rbac_operation = "read" + # Maintainer can read any version of his workflow, others only synchronized and latest ones + if current_user.uid != resource.maintainer_id and resource_version.status not in [ + ResourceVersion.Status.SYNCHRONIZED, + ResourceVersion.Status.LATEST, + ]: + rbac_operation = "read_any" + await authorization(rbac_operation) + return ResourceVersionOut( + release="1.0.0", + status=ResourceVersion.Status.RESOURCE_REQUESTED, + resource_version_id=resource_version.resource_version_id, + resource_id=resource.resource_id, + created_at=0, + ) + + +@router.put("/{rvid}/request_sync", summary="Request resource version synchronization") +@start_as_current_span_async("api_request_resource_version_sync", tracer=tracer) +async def request_resource_version_sync( + authorization: Authorization, + resource: CurrentResource, + resource_version: CurrentResourceVersion, + current_user: CurrentUser, +) -> ResourceVersionOut: + """ + Request the synchronization of a resource version to the cluster. + + Permission `resource:request_sync` required if current user is the maintainer, `resource:request_sync_any` otherwise. + \f + Parameters + ---------- + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + resource : clowmdb.models.Resource + Resource associated with the ID in the path. Dependency Injection. + current_user : clowmdb.models.User + Current user. Dependency injection. + resource_version : clowmdb.models.ResourceVersion + Resource Version associated with the ID in the path. Dependency Injection. + """ + trace.get_current_span().set_attributes( + {"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)} + ) + rbac_operation = "request_sync" + if current_user.uid != resource.maintainer_id: + rbac_operation = "request_sync_any" + await authorization(rbac_operation) + return ResourceVersionOut( + release="1.0.0", + status=ResourceVersion.Status.SYNC_REQUESTED, + resource_version_id=resource_version.resource_version_id, + resource_id=resource.resource_id, + created_at=0, + ) + + +@router.put("/{rvid}/sync", summary="Synchronize resource version with cluster") +@start_as_current_span_async("api_resource_version_sync", tracer=tracer) +async def resource_version_sync( + authorization: Authorization, resource: CurrentResource, resource_version: CurrentResourceVersion +) -> ResourceVersionOut: + """ + Synchronize the resource version to the cluster. + + Permission `resource:sync` required. + \f + Parameters + ---------- + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + resource : clowmdb.models.Resource + Resource associated with the ID in the path. Dependency Injection. + resource_version : clowmdb.models.ResourceVersion + Resource Version associated with the ID in the path. Dependency Injection. + """ + trace.get_current_span().set_attributes( + {"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)} + ) + await authorization("sync") + return ResourceVersionOut( + release="1.0.0", + status=ResourceVersion.Status.SYNCHRONIZING, + resource_version_id=resource_version.resource_version_id, + resource_id=resource.resource_id, + created_at=0, + ) + + +@router.put("/{rvid}/latest", summary="Set resource version to latest") +@start_as_current_span_async("api_resource_version_set_latest", tracer=tracer) +async def resource_version_latest( + authorization: Authorization, resource: CurrentResource, resource_version: CurrentResourceVersion +) -> ResourceVersionOut: + """ + Set the resource version as the latest version. + + Permission `resource:set_latest` required. + \f + Parameters + ---------- + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + resource : clowmdb.models.Resource + Resource associated with the ID in the path. Dependency Injection. + resource_version : clowmdb.models.ResourceVersion + Resource Version associated with the ID in the path. Dependency Injection. + """ + trace.get_current_span().set_attributes( + {"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)} + ) + await authorization("set_latest") + return ResourceVersionOut( + release="1.0.0", + status=ResourceVersion.Status.LATEST, + resource_version_id=resource_version.resource_version_id, + resource_id=resource.resource_id, + created_at=0, + ) + + +@router.delete("/{rvid}/cluster", summary="Delete resource version on cluster") +@start_as_current_span_async("api_resource_version_delete_cluster", tracer=tracer) +async def delete_resource_version_cluster( + authorization: Authorization, resource: CurrentResource, resource_version: CurrentResourceVersion +) -> ResourceVersionOut: + """ + Delete the resource version on the cluster. + + Permission `resource:delete_cluster` required. + \f + Parameters + ---------- + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + resource : clowmdb.models.Resource + Resource associated with the ID in the path. Dependency Injection. + resource_version : clowmdb.models.ResourceVersion + Resource Version associated with the ID in the path. Dependency Injection. + """ + trace.get_current_span().set_attributes( + {"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)} + ) + await authorization("delete_cluster") + return ResourceVersionOut( + release="1.0.0", + status=ResourceVersion.Status.CLUSTER_DELETED, + resource_version_id=resource_version.resource_version_id, + resource_id=resource.resource_id, + created_at=0, + ) + + +@router.delete("/{rvid}/s3", summary="Delete resource version in S3") +@start_as_current_span_async("api_resource_version_delete_cluster", tracer=tracer) +async def delete_resource_version_s3( + authorization: Authorization, resource: CurrentResource, resource_version: CurrentResourceVersion +) -> ResourceVersionOut: + """ + Delete the resource version in the S3 bucket. + + Permission `resource:delete_s3` required. + \f + Parameters + ---------- + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + resource : clowmdb.models.Resource + Resource associated with the ID in the path. Dependency Injection. + resource_version : clowmdb.models.ResourceVersion + Resource Version associated with the ID in the path. Dependency Injection. + """ + trace.get_current_span().set_attributes( + {"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)} + ) + await authorization("delete_s3") + return ResourceVersionOut( + release="1.0.0", + status=ResourceVersion.Status.S3_DELETED, + resource_version_id=resource_version.resource_version_id, + resource_id=resource.resource_id, + created_at=0, + ) diff --git a/app/api/endpoints/resources.py b/app/api/endpoints/resources.py index 3650f8785ac2b292b347740e48acb395074279da..c0d538001b36d87f552845c7042d5f9b05873272 100644 --- a/app/api/endpoints/resources.py +++ b/app/api/endpoints/resources.py @@ -1,30 +1,185 @@ -from typing import Annotated, Any, Awaitable, Callable +from typing import Annotated, Any, Awaitable, Callable, List, Optional +from uuid import uuid4 -from fastapi import APIRouter, Depends +from clowmdb.models import ResourceVersion +from fastapi import APIRouter, Depends, Query, status from opentelemetry import trace -from app.api.dependencies import AuthorizationDependency +from app.api.dependencies import AuthorizationDependency, CurrentResource, CurrentUser +from app.schemas.resource import ResourceIn, ResourceOut +from app.schemas.resource_version import ResourceVersionOut from app.utils.otlp import start_as_current_span_async -router = APIRouter(prefix="/resources", tags=["Bucket"]) +router = APIRouter(prefix="/resources", tags=["Resource"]) resource_authorization = AuthorizationDependency(resource="resource") Authorization = Annotated[Callable[[str], Awaitable[Any]], Depends(resource_authorization)] tracer = trace.get_tracer_provider().get_tracer(__name__) -@router.get("", summary="List buckets of user") +@router.get("", summary="List resources") @start_as_current_span_async("api_list_resources", tracer=tracer) -async def list_buckets( +async def list_resources( authorization: Authorization, + current_user: CurrentUser, + maintainer_id: Annotated[ + Optional[str], + Query( + max_length=64, + description="Filter for resource by maintainer. If current user is the same as maintainer ID, permission `resource:list` required, otherwise `resource:list_filter`.", + # noqa: E501 + ), + ] = None, + version_status: Annotated[ + Optional[List[ResourceVersion.Status]], + Query( + description=f"Which versions of the resource to include in the response. Permission `resource:list_filter` required, unless `maintainer_id` is provided and current user is maintainer, then only permission `resource:list` required. Default `{ResourceVersion.Status.LATEST.name}` and `{ResourceVersion.Status.SYNCHRONIZED.name}`.", + # noqa: E501 + ), + ] = None, + name_substring: Annotated[Optional[str], Query(max_length=32)] = None, +) -> List[ResourceOut]: + """ + List all resources. + + Permission `resource:list` required. + \f + Parameters + ---------- + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + current_user : clowmdb.models.User + Current user. Dependency injection. + maintainer_id : str | None, default None + Filter resources by a maintainer. Query Parameter. + version_status : List[clowmdb.models.ResourceVersion.Status] | None, default None + Filter resource version by their status. Query Parameter. + name_substring : str | None, default None + Filter resources by a substring in their name. Query Parameter. + """ + current_span = trace.get_current_span() + if maintainer_id: # pragma: no cover + current_span.set_attribute("maintainer_id", maintainer_id) + if version_status: # pragma: no cover + current_span.set_attribute("version_status", [state.name for state in version_status]) + if name_substring: # pragma: no cover + current_span.set_attribute("name_substring", name_substring) + + rbac_operation = "list" + if maintainer_id is not None and current_user.uid != maintainer_id: + rbac_operation = "list_filter" + elif version_status is not None and maintainer_id is None: + rbac_operation = "list_filter" + await authorization(rbac_operation) + + return [] + + +@router.post("", summary="Request a new resource", status_code=status.HTTP_201_CREATED) +@start_as_current_span_async("api_request_resource", tracer=tracer) +async def request_resource( + authorization: Authorization, + current_user: CurrentUser, + resource: ResourceIn, +) -> ResourceOut: + """ + Request a new resources. + + Permission `resource:create` required. + \f + Parameters + ---------- + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + current_user : clowmdb.models.User + Current user. Dependency injection. + resource : app.schemas.resource.ResourceIn + Data about the new resource. HTTP Body. + """ + current_span = trace.get_current_span() + current_span.set_attribute("resource_in", resource.model_dump_json(indent=2)) + await authorization("create") + rid = uuid4() + return ResourceOut( + name=resource.name, + description=resource.description, + source=resource.source, + resource_id=rid, + maintainer_id=current_user.uid, + versions=[ + ResourceVersionOut( + status=ResourceVersion.Status.RESOURCE_REQUESTED, + resource_version_id=uuid4(), + created_at=0, + resource_id=rid, + release="1.0.0", + ) + ], + ) + + +@router.get("/{rid}", summary="Get a resource") +@start_as_current_span_async("api_get_resource", tracer=tracer) +async def get_resource( + authorization: Authorization, + resource: CurrentResource, + current_user: CurrentUser, + version_status: Annotated[ + Optional[List[ResourceVersion.Status]], + Query( + description=f"Which versions of the resource to include in the response. Permission `resource:read_any` required, unless the current user is the maintainer, then only permission `resource:read` required. Default `{ResourceVersion.Status.LATEST.name}` and `{ResourceVersion.Status.SYNCHRONIZED.name}`.", + # noqa: E501 + ), + ] = None, +) -> ResourceOut: + """ + Get a specific resource. + + Permission `resource:read` required. + \f + Parameters + ---------- + authorization : Callable[[str], Awaitable[Any]] + Async function to ask the auth service for authorization. Dependency Injection. + current_user : clowmdb.models.User + Current user. Dependency injection. + resource : clowmdb.models.Resource + Resource associated with the ID in the path. Dependency Injection. + version_status : List[clowmdb.models.ResourceVersion.Status] | None, default None + Filter resource version by their status. Query Parameter. + """ + current_span = trace.get_current_span() + if version_status: # pragma: no cover + current_span.set_attribute("version_status", [state.name for state in version_status]) + current_span.set_attribute("resource_id", str(resource.resource_id)) + rbac_operation = "read_any" if resource.maintainer_id != current_user.uid and version_status is not None else "read" + await authorization(rbac_operation) + return ResourceOut( + name="name", + description="a" * 16, + source="a" * 8, + resource_id=resource.resource_id, + maintainer_id=current_user.uid, + versions=[], + ) + + +@router.delete("/{rid}", summary="Delete a resource", status_code=status.HTTP_204_NO_CONTENT) +@start_as_current_span_async("api_delete_resource", tracer=tracer) +async def delete_resource( + authorization: Authorization, + resource: CurrentResource, ) -> None: """ - List all the buckets in the system or of the desired user where the user has READ permissions for.\n - Permission "resource:read" required. + Delete a resources. + + Permission `resource:delete` required. \f Parameters ---------- authorization : Callable[[str], Awaitable[Any]] Async function to ask the auth service for authorization. Dependency Injection. + resource : clowmdb.models.Resource + Resource associated with the ID in the path. Dependency Injection. """ - await authorization("read") - pass + trace.get_current_span().set_attribute("resource_id", str(resource.resource_id)) + await authorization("delete") diff --git a/app/api/utils.py b/app/api/utils.py index 8ac760c16062c395767825b0925f38d5727315c8..f5ce50aca23b53cb83a0737365d05493f6541666 100644 --- a/app/api/utils.py +++ b/app/api/utils.py @@ -1,6 +1,4 @@ -import re from asyncio import sleep as async_sleep -from typing import TYPE_CHECKING from httpx import HTTPError from mako.template import Template @@ -11,27 +9,17 @@ from app.core.config import settings from app.slurm import SlurmClient, SlurmJobSubmission from app.utils.backoff_strategy import ExponentialBackoff -if TYPE_CHECKING: - from types_aiobotocore_s3.service_resource import S3ServiceResource -else: - S3ServiceResource = object - -nextflow_command_template = Template(filename="mako_templates/nextflow_command.tmpl") -# regex to find S3 files in parameters of workflow execution -s3_file_regex = re.compile( - r"s3://(?!(((2(5[0-5]|[0-4]\d)|[01]?\d{1,2})\.){3}(2(5[0-5]|[0-4]\d)|[01]?\d{1,2})$))[a-z\d][a-z\d.-]{1,61}[a-z\d][^\"]*" -) +synchronization_script_template = Template(filename="app/mako_templates/synchronize_resource_version.sh.tmpl") tracer = trace.get_tracer_provider().get_tracer(__name__) -async def start_workflow_execution( - s3: S3ServiceResource, +async def synchronize_resource( db: AsyncSession, slurm_client: SlurmClient, ) -> None: """ - Start a workflow on the Slurm cluster. + Synchronize a resource to the cluster Parameters ---------- @@ -41,12 +29,11 @@ async def start_workflow_execution( Slurm Rest Client to communicate with Slurm cluster. """ - nextflow_script = nextflow_command_template.render() + synchronization_script = synchronization_script_template.render() - # Setup env for the workflow execution try: job_submission = SlurmJobSubmission( - script=nextflow_script.strip(), + script=synchronization_script.strip(), job={ "current_working_directory": settings.SLURM_WORKING_DIRECTORY, "name": "somename", diff --git a/app/core/config.py b/app/core/config.py index 811a9eb71e84ffa61f72ab95e426873b32b43fe3..3cfd655948c46ca4033f3e2e07b7a993800ab5a7 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -99,6 +99,12 @@ class Settings(BaseSettings): SLURM_WORKING_DIRECTORY: str = Field( "/tmp", description="Working directory for the slurm job with the nextflow command" ) + RESOURCE_CLUSTER_PATH: str = Field( + "/vol/data/databases", description="Base path on the cluster where all resources are saved" + ) + RESOURCE_CONTAINER_PATH: str = Field( + "/vol/resources", description="Base path in the container where all resources are available" + ) OTLP_GRPC_ENDPOINT: Optional[str] = Field( None, description="OTLP compatible endpoint to send traces via gRPC, e.g. Jaeger" diff --git a/mako_templates/nextflow_command.tmpl b/app/mako_templates/delete_resource.sh.tmpl similarity index 100% rename from mako_templates/nextflow_command.tmpl rename to app/mako_templates/delete_resource.sh.tmpl diff --git a/app/mako_templates/delete_resource_version.sh.tmpl b/app/mako_templates/delete_resource_version.sh.tmpl new file mode 100644 index 0000000000000000000000000000000000000000..a9bf588e2f88457fdf73ac7361ef1d596fb81453 --- /dev/null +++ b/app/mako_templates/delete_resource_version.sh.tmpl @@ -0,0 +1 @@ +#!/bin/bash diff --git a/app/mako_templates/set_latest_resource_version.sh.tmpl b/app/mako_templates/set_latest_resource_version.sh.tmpl new file mode 100644 index 0000000000000000000000000000000000000000..a9bf588e2f88457fdf73ac7361ef1d596fb81453 --- /dev/null +++ b/app/mako_templates/set_latest_resource_version.sh.tmpl @@ -0,0 +1 @@ +#!/bin/bash diff --git a/app/mako_templates/synchronize_resource_version.sh.tmpl b/app/mako_templates/synchronize_resource_version.sh.tmpl new file mode 100644 index 0000000000000000000000000000000000000000..a9bf588e2f88457fdf73ac7361ef1d596fb81453 --- /dev/null +++ b/app/mako_templates/synchronize_resource_version.sh.tmpl @@ -0,0 +1 @@ +#!/bin/bash diff --git a/app/schemas/resource.py b/app/schemas/resource.py new file mode 100644 index 0000000000000000000000000000000000000000..7ed9c869085b46b42cc4e8b846368ad459aadf99 --- /dev/null +++ b/app/schemas/resource.py @@ -0,0 +1,34 @@ +from typing import List +from uuid import UUID + +from pydantic import BaseModel, Field + +from app.schemas.resource_version import ResourceVersionIn, ResourceVersionOut + + +class BaseResource(BaseModel): + name: str = Field(..., description="Short Name for the resource", min_length=3, max_length=32, examples=["blastdb"]) + description: str = Field( + ..., + description="Short description for this resource", + min_length=16, + max_length=264, + examples=["This is a short description for a resource"], + ) + source: str = Field( + ..., + description="A link or similar where the resource originates from", + min_length=8, + max_length=264, + examples=["https://example.com/db"], + ) + + +class ResourceIn(BaseResource, ResourceVersionIn): + pass + + +class ResourceOut(BaseResource): + resource_id: UUID = Field(..., description="ID of the resource", examples=["4c072e39-2bd9-4fa3-b564-4d890e240ccd"]) + maintainer_id: str = Field(..., description="ID of the maintainer", examples=["28c5353b8bb34984a8bd4169ba94c606"]) + versions: List[ResourceVersionOut] = Field(..., description="Versions of the resource") diff --git a/app/schemas/resource_version.py b/app/schemas/resource_version.py new file mode 100644 index 0000000000000000000000000000000000000000..57ac8095e54f4185899dd28d5c0461155654e0be --- /dev/null +++ b/app/schemas/resource_version.py @@ -0,0 +1,56 @@ +from typing import Optional +from uuid import UUID + +from clowmdb.models import ResourceVersion +from pydantic import BaseModel, Field, computed_field + +from app.core.config import settings + + +class BaseResourceVersion(BaseModel): + release: str = Field( + ..., + description="Short tag describing the version of the resource", + examples=["01-2023"], + min_length=3, + max_length=32, + ) + + +class ResourceVersionIn(BaseResourceVersion): + pass + + +class ResourceVersionOut(BaseResourceVersion): + status: ResourceVersion.Status = Field(..., description="Status of the resource version") + resource_version_id: UUID = Field( + ..., description="ID of the resource version", examples=["fb4cee12-1e91-49f3-905f-808845c7c1f4"] + ) + resource_id: UUID = Field(..., description="ID of the resource", examples=["4c072e39-2bd9-4fa3-b564-4d890e240ccd"]) + + created_at: int = Field( + ..., description="Timestamp when the version was created as UNIX timestamp", examples=[1672527600] + ) + + @computed_field( # type: ignore[misc] + description="Path to the resource on the cluster if the resource is synchronized", + examples=[ + "/vol/resources/CLDB-4c072e39/fb4cee121e9149f3905f808845c7c1f4", + "/vol/resources/CLDB-4c072e39/latest", + ], + ) + @property + def cluster_path(self) -> Optional[str]: + if self.status not in [ResourceVersion.Status.LATEST, ResourceVersion.Status.SYNCHRONIZED]: + return None + return f"{settings.RESOURCE_CONTAINER_PATH}/CLDB-{self.resource_id.hex[:8]}/{self.resource_version_id.hex if self.status != ResourceVersion.Status.LATEST else 'latest'}" + + @computed_field( # type: ignore[misc] + description="Path to the resource in the S3 Bucket. Not publicly available.", + examples=[ + "s3://clowm-resources/CLDB-4c072e39/fb4cee121e9149f3905f808845c7c1f4/resource.tar.gz", + ], + ) + @property + def s3_path(self) -> str: + return f"s3://{settings.RESOURCE_BUCKET}/CLDB-{self.resource_id.hex[:8]}/{self.resource_version_id.hex}/resource.tar.gz" diff --git a/app/tests/api/test_resource.py b/app/tests/api/test_resource.py new file mode 100644 index 0000000000000000000000000000000000000000..a0dc4c0973f85f4d1be788e888de038110bb8b18 --- /dev/null +++ b/app/tests/api/test_resource.py @@ -0,0 +1,100 @@ +import pytest +from clowmdb.models import Resource +from fastapi import status +from httpx import AsyncClient + +from app.schemas.resource import ResourceIn +from app.tests.utils.user import UserWithAuthHeader +from app.tests.utils.utils import random_lower_string + + +class _TestResourceRoutes: + base_path: str = "/resources" + + +class TestResourceRouteCreate(_TestResourceRoutes): + @pytest.mark.asyncio + async def test_create_resource_route(self, client: AsyncClient, random_user: UserWithAuthHeader) -> None: + """ + Test for creating a new resource. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + resource = ResourceIn( + release=random_lower_string(6), + name=random_lower_string(8), + description=random_lower_string(16), + source=random_lower_string(8), + ) + response = await client.post(self.base_path, json=resource.model_dump(), headers=random_user.auth_headers) + assert response.status_code == status.HTTP_201_CREATED + + +class TestResourceRouteDelete(_TestResourceRoutes): + @pytest.mark.asyncio + async def test_delete_resource_route( + self, client: AsyncClient, random_resource: Resource, random_user: UserWithAuthHeader + ) -> None: + """ + Test for deleting a resource. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_resource : clowmdb.models.Resource + Random resource for testing. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + response = await client.delete( + f"{self.base_path}/{str(random_resource.resource_id)}", headers=random_user.auth_headers + ) + assert response.status_code == status.HTTP_204_NO_CONTENT + + +class TestResourceRouteGet(_TestResourceRoutes): + @pytest.mark.asyncio + async def test_list_resources_route( + self, client: AsyncClient, random_resource: Resource, random_user: UserWithAuthHeader + ) -> None: + """ + Test for listing all resources. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_resource : clowmdb.models.Resource + Random resource for testing. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + response = await client.get(self.base_path, headers=random_user.auth_headers) + assert response.status_code == status.HTTP_200_OK + + @pytest.mark.asyncio + async def test_get_resource_route( + self, client: AsyncClient, random_resource: Resource, random_user: UserWithAuthHeader + ) -> None: + """ + Test for getting a specific resource. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_resource : clowmdb.models.Resource + Random resource for testing. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + response = await client.get( + f"{self.base_path}/{str(random_resource.resource_id)}", headers=random_user.auth_headers + ) + assert response.status_code == status.HTTP_200_OK diff --git a/app/tests/api/test_resource_version.py b/app/tests/api/test_resource_version.py new file mode 100644 index 0000000000000000000000000000000000000000..f2c87a18f126c76135a54477ddb0eea1c4c01f83 --- /dev/null +++ b/app/tests/api/test_resource_version.py @@ -0,0 +1,284 @@ +import pytest +from clowmdb.models import Resource, ResourceVersion +from fastapi import status +from httpx import AsyncClient + +from app.schemas.resource_version import ResourceVersionIn +from app.tests.utils.user import UserWithAuthHeader +from app.tests.utils.utils import random_lower_string + + +class _TestResourceVersionRoutes: + base_path: str = "/resources" + + +class TestResourceVersionRouteCreate(_TestResourceVersionRoutes): + @pytest.mark.asyncio + async def test_create_resource_route( + self, client: AsyncClient, random_user: UserWithAuthHeader, random_resource: Resource + ) -> None: + """ + Test for creating a new resource version. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + resource_version = ResourceVersionIn(release=random_lower_string(6)) + response = await client.post( + f"{self.base_path}/{str(random_resource.resource_id)}/versions", + json=resource_version.model_dump(), + headers=random_user.auth_headers, + ) + assert response.status_code == status.HTTP_201_CREATED + + +class TestResourceVersionRouteGet(_TestResourceVersionRoutes): + @pytest.mark.asyncio + async def test_list_resource_versions_route( + self, + client: AsyncClient, + random_resource: Resource, + random_resource_version: ResourceVersion, + random_user: UserWithAuthHeader, + ) -> None: + """ + Test for listing all resource versions of a resource. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_resource : clowmdb.models.Resource + Random resource for testing. + random_resource_version : clowmdb.models.Resource + Random resource version for testing. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + response = await client.get( + "/".join([self.base_path, str(random_resource.resource_id), "versions"]), headers=random_user.auth_headers + ) + assert response.status_code == status.HTTP_200_OK + + @pytest.mark.asyncio + async def test_get_resource_version_route( + self, + client: AsyncClient, + random_resource: Resource, + random_resource_version: ResourceVersion, + random_user: UserWithAuthHeader, + ) -> None: + """ + Test for getting a specif resource version of a resource. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_resource : clowmdb.models.Resource + Random resource for testing. + random_resource_version : clowmdb.models.Resource + Random resource version for testing. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + response = await client.get( + "/".join( + [ + self.base_path, + str(random_resource.resource_id), + "versions", + str(random_resource_version.resource_version_id), + ] + ), + headers=random_user.auth_headers, + ) + assert response.status_code == status.HTTP_200_OK + + +class TestResourceVersionRouteDelete(_TestResourceVersionRoutes): + @pytest.mark.asyncio + async def test_delete_resource_version_cluster_route( + self, + client: AsyncClient, + random_resource: Resource, + random_resource_version: ResourceVersion, + random_user: UserWithAuthHeader, + ) -> None: + """ + Test for deleting a resource version on the cluster. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_resource : clowmdb.models.Resource + Random resource for testing. + random_resource_version : clowmdb.models.Resource + Random resource version for testing. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + response = await client.delete( + "/".join( + [ + self.base_path, + str(random_resource.resource_id), + "versions", + str(random_resource_version.resource_version_id), + "cluster", + ] + ), + headers=random_user.auth_headers, + ) + assert response.status_code == status.HTTP_200_OK + + @pytest.mark.asyncio + async def test_delete_resource_version_s3_route( + self, + client: AsyncClient, + random_resource: Resource, + random_resource_version: ResourceVersion, + random_user: UserWithAuthHeader, + ) -> None: + """ + Test for deleting a resource version in the S3 Bucket + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_resource : clowmdb.models.Resource + Random resource for testing. + random_resource_version : clowmdb.models.Resource + Random resource version for testing. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + response = await client.delete( + "/".join( + [ + self.base_path, + str(random_resource.resource_id), + "versions", + str(random_resource_version.resource_version_id), + "s3", + ] + ), + headers=random_user.auth_headers, + ) + assert response.status_code == status.HTTP_200_OK + + +class TestResourceVersionRoutePut(_TestResourceVersionRoutes): + @pytest.mark.asyncio + async def test_request_sync_resource_version_route( + self, + client: AsyncClient, + random_resource: Resource, + random_resource_version: ResourceVersion, + random_user: UserWithAuthHeader, + ) -> None: + """ + Test for requesting a synchronization of the resource version to the cluster. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_resource : clowmdb.models.Resource + Random resource for testing. + random_resource_version : clowmdb.models.Resource + Random resource version for testing. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + response = await client.put( + "/".join( + [ + self.base_path, + str(random_resource.resource_id), + "versions", + str(random_resource_version.resource_version_id), + "request_sync", + ] + ), + headers=random_user.auth_headers, + ) + assert response.status_code == status.HTTP_200_OK + + @pytest.mark.asyncio + async def test_sync_resource_version_route( + self, + client: AsyncClient, + random_resource: Resource, + random_resource_version: ResourceVersion, + random_user: UserWithAuthHeader, + ) -> None: + """ + Test for synchronizing a resource version to the cluster. + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_resource : clowmdb.models.Resource + Random resource for testing. + random_resource_version : clowmdb.models.Resource + Random resource version for testing. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + response = await client.put( + "/".join( + [ + self.base_path, + str(random_resource.resource_id), + "versions", + str(random_resource_version.resource_version_id), + "sync", + ] + ), + headers=random_user.auth_headers, + ) + assert response.status_code == status.HTTP_200_OK + + @pytest.mark.asyncio + async def test_set_latest_resource_version_route( + self, + client: AsyncClient, + random_resource: Resource, + random_resource_version: ResourceVersion, + random_user: UserWithAuthHeader, + ) -> None: + """ + Test for setting a resource version as the latest version + + Parameters + ---------- + client : httpx.AsyncClient + HTTP Client to perform the request on. + random_resource : clowmdb.models.Resource + Random resource for testing. + random_resource_version : clowmdb.models.Resource + Random resource version for testing. + random_user : app.tests.utils.user.UserWithAuthHeader + Random user for testing. + """ + response = await client.put( + "/".join( + [ + self.base_path, + str(random_resource.resource_id), + "versions", + str(random_resource_version.resource_version_id), + "latest", + ] + ), + headers=random_user.auth_headers, + ) + assert response.status_code == status.HTTP_200_OK diff --git a/app/tests/conftest.py b/app/tests/conftest.py index c746718ce55fab51ce46a7b748ad70bb13f8ce58..c52da4a18d588860b5759983ba82821f9ef5a22f 100644 --- a/app/tests/conftest.py +++ b/app/tests/conftest.py @@ -2,11 +2,13 @@ import asyncio from functools import partial from secrets import token_urlsafe from typing import AsyncGenerator, AsyncIterator, Dict, Iterator +from uuid import uuid4 import httpx import pytest import pytest_asyncio from clowmdb.db.session import get_async_session +from clowmdb.models import Resource, ResourceVersion from pytrie import SortedStringTrie as Trie from sqlalchemy.ext.asyncio import AsyncSession @@ -18,6 +20,7 @@ from app.tests.mocks.mock_opa_service import MockOpaService from app.tests.mocks.mock_s3_resource import MockS3ServiceResource from app.tests.mocks.mock_slurm_cluster import MockSlurmCluster from app.tests.utils.user import UserWithAuthHeader, create_random_user, decode_mock_token, get_authorization_headers +from app.tests.utils.utils import random_lower_string jwt_secret = token_urlsafe(32) @@ -162,3 +165,30 @@ async def random_third_user(db: AsyncSession, mock_opa_service: MockOpaService) mock_opa_service.delete_user(user.uid) await db.delete(user) await db.commit() + + +@pytest_asyncio.fixture(scope="function") +async def random_resource(db: AsyncSession, random_user: UserWithAuthHeader) -> AsyncIterator[Resource]: + """ + Create a random resource and deletes it afterward. + """ + yield Resource( + _resource_id=uuid4().bytes, + name=random_lower_string(8), + short_description=random_lower_string(32), + source=random_lower_string(32), + maintainer_id=random_user.user.uid, + ) + + +@pytest_asyncio.fixture(scope="function") +async def random_resource_version(db: AsyncSession, random_resource: Resource) -> AsyncIterator[ResourceVersion]: + """ + Create a random resource version and deletes it afterward. + """ + yield ResourceVersion( + _resource_version_id=uuid4().bytes, + _resource_id=random_resource.resource_id.bytes, + release=random_lower_string(8), + status=ResourceVersion.Status.RESOURCE_REQUESTED, + )