Skip to content
Snippets Groups Projects
Verified Commit 59ac82a1 authored by Daniel Göbel's avatar Daniel Göbel
Browse files

Add CRUD repositories and tests for resource(version)

#2
parent 007f379b
No related branches found
No related tags found
1 merge request!3Resolve "Add CRUD Repository for resources"
Pipeline #41755 failed
Showing
with 922 additions and 135 deletions
...@@ -137,7 +137,7 @@ publish-dev-docker-container-job: ...@@ -137,7 +137,7 @@ publish-dev-docker-container-job:
dependencies: [] dependencies: []
only: only:
refs: refs:
- development - main
before_script: before_script:
- echo "{\"auths\":{\"${CI_REGISTRY}\":{\"auth\":\"$(printf "%s:%s" "${CI_REGISTRY_USER}" "${CI_REGISTRY_PASSWORD}" | base64 | tr -d '\n')\"},\"$CI_DEPENDENCY_PROXY_SERVER\":{\"auth\":\"$(printf "%s:%s" ${CI_DEPENDENCY_PROXY_USER} "${CI_DEPENDENCY_PROXY_PASSWORD}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json - echo "{\"auths\":{\"${CI_REGISTRY}\":{\"auth\":\"$(printf "%s:%s" "${CI_REGISTRY_USER}" "${CI_REGISTRY_PASSWORD}" | base64 | tr -d '\n')\"},\"$CI_DEPENDENCY_PROXY_SERVER\":{\"auth\":\"$(printf "%s:%s" ${CI_DEPENDENCY_PROXY_USER} "${CI_DEPENDENCY_PROXY_PASSWORD}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json
script: script:
......
...@@ -15,7 +15,7 @@ repos: ...@@ -15,7 +15,7 @@ repos:
- id: check-merge-conflict - id: check-merge-conflict
- id: check-ast - id: check-ast
- repo: https://github.com/psf/black - repo: https://github.com/psf/black
rev: 23.11.0 rev: 23.12.0
hooks: hooks:
- id: black - id: black
files: app files: app
...@@ -25,7 +25,7 @@ repos: ...@@ -25,7 +25,7 @@ repos:
hooks: hooks:
- id: ruff - id: ruff
- repo: https://github.com/PyCQA/isort - repo: https://github.com/PyCQA/isort
rev: 5.12.0 rev: 5.13.1
hooks: hooks:
- id: isort - id: isort
files: app files: app
......
from typing import TYPE_CHECKING, Annotated, AsyncIterator, Awaitable, Callable, Dict from typing import TYPE_CHECKING, Annotated, AsyncIterator, Awaitable, Callable, Dict, List
from uuid import UUID from uuid import UUID
from authlib.jose.errors import BadSignatureError, DecodeError, ExpiredTokenError from authlib.jose.errors import BadSignatureError, DecodeError, ExpiredTokenError
...@@ -13,7 +13,7 @@ from sqlalchemy.ext.asyncio import AsyncSession ...@@ -13,7 +13,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings from app.core.config import settings
from app.core.security import decode_token, request_authorization from app.core.security import decode_token, request_authorization
from app.crud import CRUDUser from app.crud import CRUDResource, CRUDUser
from app.schemas.security import JWT, AuthzRequest, AuthzResponse from app.schemas.security import JWT, AuthzRequest, AuthzResponse
from app.slurm.rest_client import SlurmClient from app.slurm.rest_client import SlurmClient
from app.utils.otlp import start_as_current_span_async from app.utils.otlp import start_as_current_span_async
...@@ -28,7 +28,7 @@ bearer_token = HTTPBearer(description="JWT Header") ...@@ -28,7 +28,7 @@ bearer_token = HTTPBearer(description="JWT Header")
tracer = trace.get_tracer_provider().get_tracer(__name__) tracer = trace.get_tracer_provider().get_tracer(__name__)
async def get_s3_resource(request: Request) -> S3ServiceResource: async def get_s3_resource(request: Request) -> S3ServiceResource: # pragma: no cover
return request.app.s3_resource return request.app.s3_resource
...@@ -205,17 +205,16 @@ async def get_current_resource(rid: Annotated[UUID, Path()], db: DBSession) -> R ...@@ -205,17 +205,16 @@ async def get_current_resource(rid: Annotated[UUID, Path()], db: DBSession) -> R
resource : clowmdb.models.Resource resource : clowmdb.models.Resource
Resource associated with the ID in the path. Resource associated with the ID in the path.
""" """
return Resource( resource = await CRUDResource.get(db, resource_id=rid)
name="name", _resource_id=rid.bytes, short_description="a" * 32, source="a" * 8, maintainer_id="abc" if resource:
) return resource
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Resource not found")
CurrentResource = Annotated[Resource, Depends(get_current_resource)] CurrentResource = Annotated[Resource, Depends(get_current_resource)]
async def get_current_resource_version( async def get_current_resource_version(rvid: Annotated[UUID, Path()], resource: CurrentResource) -> ResourceVersion:
rvid: Annotated[UUID, Path()], resource: CurrentResource, db: DBSession
) -> ResourceVersion:
""" """
Get the current resource version from the database based on the ID in the path. Get the current resource version from the database based on the ID in the path.
...@@ -227,20 +226,18 @@ async def get_current_resource_version( ...@@ -227,20 +226,18 @@ async def get_current_resource_version(
ID of a resource version. Path parameter. ID of a resource version. Path parameter.
resource : clowmdb.models.Resource resource : clowmdb.models.Resource
Resource associated with the ID in the path. Resource associated with the ID in the path.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
Returns Returns
------- -------
resource_version : clowmdb.models.ResourceVersion resource_version : clowmdb.models.ResourceVersion
Resource Version associated with the ID in the path. Resource Version associated with the ID in the path.
""" """
return ResourceVersion( resource_version: List[ResourceVersion] = [
_resource_version_id=rvid.bytes, version for version in resource.versions if version.resource_version_id == rvid
_resource_id=resource.resource_id.bytes, ]
release="1.0.0", if len(resource_version) > 0:
status=ResourceVersion.Status.RESOURCE_REQUESTED, return resource_version[0]
) raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Resource version not found")
CurrentResourceVersion = Annotated[ResourceVersion, Depends(get_current_resource_version)] CurrentResourceVersion = Annotated[ResourceVersion, Depends(get_current_resource_version)]
from typing import Annotated, Any, Awaitable, Callable, List, Optional from typing import Annotated, Any, Awaitable, Callable, List, Optional
from uuid import uuid4
from clowmdb.models import ResourceVersion from clowmdb.models import ResourceVersion
from fastapi import APIRouter, Depends, Query, status from fastapi import APIRouter, Depends, Query, status
from opentelemetry import trace from opentelemetry import trace
from app.api.dependencies import AuthorizationDependency, CurrentResource, CurrentResourceVersion, CurrentUser from app.api.dependencies import (
AuthorizationDependency,
CurrentResource,
CurrentResourceVersion,
CurrentUser,
DBSession,
)
from app.crud import CRUDResourceVersion
from app.schemas.resource_version import ResourceVersionIn, ResourceVersionOut from app.schemas.resource_version import ResourceVersionIn, ResourceVersionOut
from app.utils.otlp import start_as_current_span_async from app.utils.otlp import start_as_current_span_async
...@@ -53,8 +59,14 @@ async def list_resource_versions( ...@@ -53,8 +59,14 @@ async def list_resource_versions(
"list_filter" if resource.maintainer_id != current_user.uid and version_status is not None else "list" "list_filter" if resource.maintainer_id != current_user.uid and version_status is not None else "list"
) )
await authorization(rbac_operation) await authorization(rbac_operation)
requested_versions = (
return [] version_status if version_status else [ResourceVersion.Status.LATEST, ResourceVersion.Status.SYNCHRONIZED]
)
return [
ResourceVersionOut.from_db_resource_version(version)
for version in resource.versions
if version.status in requested_versions
]
@router.post("", summary="Request new version of a resource", status_code=status.HTTP_201_CREATED) @router.post("", summary="Request new version of a resource", status_code=status.HTTP_201_CREATED)
...@@ -62,8 +74,9 @@ async def list_resource_versions( ...@@ -62,8 +74,9 @@ async def list_resource_versions(
async def request_resource_version( async def request_resource_version(
authorization: Authorization, authorization: Authorization,
resource: CurrentResource, resource: CurrentResource,
resource_version: ResourceVersionIn, resource_version_in: ResourceVersionIn,
current_user: CurrentUser, current_user: CurrentUser,
db: DBSession,
) -> ResourceVersionOut: ) -> ResourceVersionOut:
""" """
Request a new resource version. Request a new resource version.
...@@ -76,26 +89,25 @@ async def request_resource_version( ...@@ -76,26 +89,25 @@ async def request_resource_version(
Async function to ask the auth service for authorization. Dependency Injection. Async function to ask the auth service for authorization. Dependency Injection.
resource : clowmdb.models.Resource resource : clowmdb.models.Resource
Resource associated with the ID in the path. Dependency Injection. Resource associated with the ID in the path. Dependency Injection.
resource_version : app.schemas.resource_version.ResourceVersionIn resource_version_in : app.schemas.resource_version.ResourceVersionIn
Data about the new resource version. HTTP Body. Data about the new resource version. HTTP Body.
current_user : clowmdb.models.User current_user : clowmdb.models.User
Current user. Dependency injection. Current user. Dependency injection.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
""" """
current_span = trace.get_current_span() current_span = trace.get_current_span()
current_span.set_attributes( current_span.set_attributes(
{"resource_id": str(resource.resource_id), "resource_version_in": resource_version.model_dump_json(indent=2)} {"resource_id": str(resource.resource_id), "resource_version_in": resource_version_in.model_dump_json(indent=2)}
) )
rbac_operation = "update" rbac_operation = "update"
if current_user.uid != resource.maintainer_id: if current_user.uid != resource.maintainer_id:
rbac_operation = "update_any" rbac_operation = "update_any"
await authorization(rbac_operation) await authorization(rbac_operation)
return ResourceVersionOut( resource_version = await CRUDResourceVersion.create(
release="1.0.0", db, resource_id=resource.resource_id, release=resource_version_in.release
status=ResourceVersion.Status.RESOURCE_REQUESTED,
resource_version_id=uuid4(),
resource_id=resource.resource_id,
created_at=0,
) )
return ResourceVersionOut.from_db_resource_version(resource_version)
@router.get("/{rvid}", summary="Get version of a resource") @router.get("/{rvid}", summary="Get version of a resource")
...@@ -134,13 +146,7 @@ async def get_resource_version( ...@@ -134,13 +146,7 @@ async def get_resource_version(
]: ]:
rbac_operation = "read_any" rbac_operation = "read_any"
await authorization(rbac_operation) await authorization(rbac_operation)
return ResourceVersionOut( return ResourceVersionOut.from_db_resource_version(resource_version)
release="1.0.0",
status=ResourceVersion.Status.RESOURCE_REQUESTED,
resource_version_id=resource_version.resource_version_id,
resource_id=resource.resource_id,
created_at=0,
)
@router.put("/{rvid}/request_sync", summary="Request resource version synchronization") @router.put("/{rvid}/request_sync", summary="Request resource version synchronization")
...@@ -150,6 +156,7 @@ async def request_resource_version_sync( ...@@ -150,6 +156,7 @@ async def request_resource_version_sync(
resource: CurrentResource, resource: CurrentResource,
resource_version: CurrentResourceVersion, resource_version: CurrentResourceVersion,
current_user: CurrentUser, current_user: CurrentUser,
db: DBSession,
) -> ResourceVersionOut: ) -> ResourceVersionOut:
""" """
Request the synchronization of a resource version to the cluster. Request the synchronization of a resource version to the cluster.
...@@ -166,6 +173,8 @@ async def request_resource_version_sync( ...@@ -166,6 +173,8 @@ async def request_resource_version_sync(
Current user. Dependency injection. Current user. Dependency injection.
resource_version : clowmdb.models.ResourceVersion resource_version : clowmdb.models.ResourceVersion
Resource Version associated with the ID in the path. Dependency Injection. Resource Version associated with the ID in the path. Dependency Injection.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
""" """
trace.get_current_span().set_attributes( trace.get_current_span().set_attributes(
{"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)} {"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)}
...@@ -174,19 +183,20 @@ async def request_resource_version_sync( ...@@ -174,19 +183,20 @@ async def request_resource_version_sync(
if current_user.uid != resource.maintainer_id: if current_user.uid != resource.maintainer_id:
rbac_operation = "request_sync_any" rbac_operation = "request_sync_any"
await authorization(rbac_operation) await authorization(rbac_operation)
return ResourceVersionOut( await CRUDResourceVersion.update_status(
release="1.0.0", db, resource_version_id=resource_version.resource_version_id, status=ResourceVersion.Status.SYNC_REQUESTED
status=ResourceVersion.Status.SYNC_REQUESTED,
resource_version_id=resource_version.resource_version_id,
resource_id=resource.resource_id,
created_at=0,
) )
resource_version.status = ResourceVersion.Status.SYNC_REQUESTED
return ResourceVersionOut.from_db_resource_version(resource_version)
@router.put("/{rvid}/sync", summary="Synchronize resource version with cluster") @router.put("/{rvid}/sync", summary="Synchronize resource version with cluster")
@start_as_current_span_async("api_resource_version_sync", tracer=tracer) @start_as_current_span_async("api_resource_version_sync", tracer=tracer)
async def resource_version_sync( async def resource_version_sync(
authorization: Authorization, resource: CurrentResource, resource_version: CurrentResourceVersion authorization: Authorization,
resource: CurrentResource,
resource_version: CurrentResourceVersion,
db: DBSession,
) -> ResourceVersionOut: ) -> ResourceVersionOut:
""" """
Synchronize the resource version to the cluster. Synchronize the resource version to the cluster.
...@@ -201,18 +211,18 @@ async def resource_version_sync( ...@@ -201,18 +211,18 @@ async def resource_version_sync(
Resource associated with the ID in the path. Dependency Injection. Resource associated with the ID in the path. Dependency Injection.
resource_version : clowmdb.models.ResourceVersion resource_version : clowmdb.models.ResourceVersion
Resource Version associated with the ID in the path. Dependency Injection. Resource Version associated with the ID in the path. Dependency Injection.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
""" """
trace.get_current_span().set_attributes( trace.get_current_span().set_attributes(
{"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)} {"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)}
) )
await authorization("sync") await authorization("sync")
return ResourceVersionOut( await CRUDResourceVersion.update_status(
release="1.0.0", db, resource_version_id=resource_version.resource_version_id, status=ResourceVersion.Status.SYNCHRONIZING
status=ResourceVersion.Status.SYNCHRONIZING,
resource_version_id=resource_version.resource_version_id,
resource_id=resource.resource_id,
created_at=0,
) )
resource_version.status = ResourceVersion.Status.SYNCHRONIZING
return ResourceVersionOut.from_db_resource_version(resource_version)
@router.put("/{rvid}/latest", summary="Set resource version to latest") @router.put("/{rvid}/latest", summary="Set resource version to latest")
...@@ -238,19 +248,16 @@ async def resource_version_latest( ...@@ -238,19 +248,16 @@ async def resource_version_latest(
{"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)} {"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)}
) )
await authorization("set_latest") await authorization("set_latest")
return ResourceVersionOut( return resource_version
release="1.0.0",
status=ResourceVersion.Status.LATEST,
resource_version_id=resource_version.resource_version_id,
resource_id=resource.resource_id,
created_at=0,
)
@router.delete("/{rvid}/cluster", summary="Delete resource version on cluster") @router.delete("/{rvid}/cluster", summary="Delete resource version on cluster")
@start_as_current_span_async("api_resource_version_delete_cluster", tracer=tracer) @start_as_current_span_async("api_resource_version_delete_cluster", tracer=tracer)
async def delete_resource_version_cluster( async def delete_resource_version_cluster(
authorization: Authorization, resource: CurrentResource, resource_version: CurrentResourceVersion authorization: Authorization,
resource: CurrentResource,
resource_version: CurrentResourceVersion,
db: DBSession,
) -> ResourceVersionOut: ) -> ResourceVersionOut:
""" """
Delete the resource version on the cluster. Delete the resource version on the cluster.
...@@ -265,24 +272,27 @@ async def delete_resource_version_cluster( ...@@ -265,24 +272,27 @@ async def delete_resource_version_cluster(
Resource associated with the ID in the path. Dependency Injection. Resource associated with the ID in the path. Dependency Injection.
resource_version : clowmdb.models.ResourceVersion resource_version : clowmdb.models.ResourceVersion
Resource Version associated with the ID in the path. Dependency Injection. Resource Version associated with the ID in the path. Dependency Injection.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
""" """
trace.get_current_span().set_attributes( trace.get_current_span().set_attributes(
{"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)} {"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)}
) )
await authorization("delete_cluster") await authorization("delete_cluster")
return ResourceVersionOut( await CRUDResourceVersion.update_status(
release="1.0.0", db, resource_version_id=resource_version.resource_version_id, status=ResourceVersion.Status.CLUSTER_DELETED
status=ResourceVersion.Status.CLUSTER_DELETED,
resource_version_id=resource_version.resource_version_id,
resource_id=resource.resource_id,
created_at=0,
) )
resource_version.status = ResourceVersion.Status.CLUSTER_DELETED
return ResourceVersionOut.from_db_resource_version(resource_version)
@router.delete("/{rvid}/s3", summary="Delete resource version in S3") @router.delete("/{rvid}/s3", summary="Delete resource version in S3")
@start_as_current_span_async("api_resource_version_delete_cluster", tracer=tracer) @start_as_current_span_async("api_resource_version_delete_cluster", tracer=tracer)
async def delete_resource_version_s3( async def delete_resource_version_s3(
authorization: Authorization, resource: CurrentResource, resource_version: CurrentResourceVersion authorization: Authorization,
resource: CurrentResource,
resource_version: CurrentResourceVersion,
db: DBSession,
) -> ResourceVersionOut: ) -> ResourceVersionOut:
""" """
Delete the resource version in the S3 bucket. Delete the resource version in the S3 bucket.
...@@ -297,15 +307,15 @@ async def delete_resource_version_s3( ...@@ -297,15 +307,15 @@ async def delete_resource_version_s3(
Resource associated with the ID in the path. Dependency Injection. Resource associated with the ID in the path. Dependency Injection.
resource_version : clowmdb.models.ResourceVersion resource_version : clowmdb.models.ResourceVersion
Resource Version associated with the ID in the path. Dependency Injection. Resource Version associated with the ID in the path. Dependency Injection.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
""" """
trace.get_current_span().set_attributes( trace.get_current_span().set_attributes(
{"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)} {"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)}
) )
await authorization("delete_s3") await CRUDResourceVersion.update_status(
return ResourceVersionOut( db, resource_version_id=resource_version.resource_version_id, status=ResourceVersion.Status.S3_DELETED
release="1.0.0",
status=ResourceVersion.Status.S3_DELETED,
resource_version_id=resource_version.resource_version_id,
resource_id=resource.resource_id,
created_at=0,
) )
await authorization("delete_s3")
resource_version.status = ResourceVersion.Status.S3_DELETED
return ResourceVersionOut.from_db_resource_version(resource_version)
from typing import Annotated, Any, Awaitable, Callable, List, Optional from typing import Annotated, Any, Awaitable, Callable, List, Optional
from uuid import uuid4
from clowmdb.models import ResourceVersion from clowmdb.models import ResourceVersion
from fastapi import APIRouter, Depends, Query, status from fastapi import APIRouter, Depends, Query, status
from opentelemetry import trace from opentelemetry import trace
from app.api.dependencies import AuthorizationDependency, CurrentResource, CurrentUser from app.api.dependencies import AuthorizationDependency, CurrentResource, CurrentUser, DBSession
from app.crud import CRUDResource
from app.schemas.resource import ResourceIn, ResourceOut from app.schemas.resource import ResourceIn, ResourceOut
from app.schemas.resource_version import ResourceVersionOut
from app.utils.otlp import start_as_current_span_async from app.utils.otlp import start_as_current_span_async
router = APIRouter(prefix="/resources", tags=["Resource"]) router = APIRouter(prefix="/resources", tags=["Resource"])
...@@ -21,6 +20,7 @@ tracer = trace.get_tracer_provider().get_tracer(__name__) ...@@ -21,6 +20,7 @@ tracer = trace.get_tracer_provider().get_tracer(__name__)
async def list_resources( async def list_resources(
authorization: Authorization, authorization: Authorization,
current_user: CurrentUser, current_user: CurrentUser,
db: DBSession,
maintainer_id: Annotated[ maintainer_id: Annotated[
Optional[str], Optional[str],
Query( Query(
...@@ -49,6 +49,8 @@ async def list_resources( ...@@ -49,6 +49,8 @@ async def list_resources(
Async function to ask the auth service for authorization. Dependency Injection. Async function to ask the auth service for authorization. Dependency Injection.
current_user : clowmdb.models.User current_user : clowmdb.models.User
Current user. Dependency injection. Current user. Dependency injection.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
maintainer_id : str | None, default None maintainer_id : str | None, default None
Filter resources by a maintainer. Query Parameter. Filter resources by a maintainer. Query Parameter.
version_status : List[clowmdb.models.ResourceVersion.Status] | None, default None version_status : List[clowmdb.models.ResourceVersion.Status] | None, default None
...@@ -70,8 +72,15 @@ async def list_resources( ...@@ -70,8 +72,15 @@ async def list_resources(
elif version_status is not None and maintainer_id is None: elif version_status is not None and maintainer_id is None:
rbac_operation = "list_filter" rbac_operation = "list_filter"
await authorization(rbac_operation) await authorization(rbac_operation)
resources = await CRUDResource.list_resources(
return [] db,
name_substring=name_substring,
maintainer_id=maintainer_id,
version_status=version_status
if version_status
else [ResourceVersion.Status.LATEST, ResourceVersion.Status.SYNCHRONIZED],
)
return [ResourceOut.from_db_resource(resource) for resource in resources]
@router.post("", summary="Request a new resource", status_code=status.HTTP_201_CREATED) @router.post("", summary="Request a new resource", status_code=status.HTTP_201_CREATED)
...@@ -80,6 +89,7 @@ async def request_resource( ...@@ -80,6 +89,7 @@ async def request_resource(
authorization: Authorization, authorization: Authorization,
current_user: CurrentUser, current_user: CurrentUser,
resource: ResourceIn, resource: ResourceIn,
db: DBSession,
) -> ResourceOut: ) -> ResourceOut:
""" """
Request a new resources. Request a new resources.
...@@ -94,27 +104,14 @@ async def request_resource( ...@@ -94,27 +104,14 @@ async def request_resource(
Current user. Dependency injection. Current user. Dependency injection.
resource : app.schemas.resource.ResourceIn resource : app.schemas.resource.ResourceIn
Data about the new resource. HTTP Body. Data about the new resource. HTTP Body.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
""" """
current_span = trace.get_current_span() current_span = trace.get_current_span()
current_span.set_attribute("resource_in", resource.model_dump_json(indent=2)) current_span.set_attribute("resource_in", resource.model_dump_json(indent=2))
await authorization("create") await authorization("create")
rid = uuid4() resource = await CRUDResource.create(db, resource_in=resource, maintainer_id=current_user.uid)
return ResourceOut( return ResourceOut.from_db_resource(db_resource=resource)
name=resource.name,
description=resource.description,
source=resource.source,
resource_id=rid,
maintainer_id=current_user.uid,
versions=[
ResourceVersionOut(
status=ResourceVersion.Status.RESOURCE_REQUESTED,
resource_version_id=uuid4(),
created_at=0,
resource_id=rid,
release="1.0.0",
)
],
)
@router.get("/{rid}", summary="Get a resource") @router.get("/{rid}", summary="Get a resource")
...@@ -153,13 +150,11 @@ async def get_resource( ...@@ -153,13 +150,11 @@ async def get_resource(
current_span.set_attribute("resource_id", str(resource.resource_id)) current_span.set_attribute("resource_id", str(resource.resource_id))
rbac_operation = "read_any" if resource.maintainer_id != current_user.uid and version_status is not None else "read" rbac_operation = "read_any" if resource.maintainer_id != current_user.uid and version_status is not None else "read"
await authorization(rbac_operation) await authorization(rbac_operation)
return ResourceOut( requested_versions = (
name="name", version_status if version_status else [ResourceVersion.Status.LATEST, ResourceVersion.Status.SYNCHRONIZED]
description="a" * 16, )
source="a" * 8, return ResourceOut.from_db_resource(
resource_id=resource.resource_id, resource, versions=[version for version in resource.versions if version.status in requested_versions]
maintainer_id=current_user.uid,
versions=[],
) )
...@@ -168,6 +163,7 @@ async def get_resource( ...@@ -168,6 +163,7 @@ async def get_resource(
async def delete_resource( async def delete_resource(
authorization: Authorization, authorization: Authorization,
resource: CurrentResource, resource: CurrentResource,
db: DBSession,
) -> None: ) -> None:
""" """
Delete a resources. Delete a resources.
...@@ -180,6 +176,9 @@ async def delete_resource( ...@@ -180,6 +176,9 @@ async def delete_resource(
Async function to ask the auth service for authorization. Dependency Injection. Async function to ask the auth service for authorization. Dependency Injection.
resource : clowmdb.models.Resource resource : clowmdb.models.Resource
Resource associated with the ID in the path. Dependency Injection. Resource associated with the ID in the path. Dependency Injection.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
""" """
trace.get_current_span().set_attribute("resource_id", str(resource.resource_id)) trace.get_current_span().set_attribute("resource_id", str(resource.resource_id))
await authorization("delete") await authorization("delete")
await CRUDResource.delete(db, resource_id=resource.resource_id)
from .crud_resource import CRUDResource # noqa: F401
from .crud_resource_version import CRUDResourceVersion # noqa: F401
from .crud_user import CRUDUser # noqa: F401 from .crud_user import CRUDUser # noqa: F401
from typing import List, Optional
from uuid import UUID
from clowmdb.models import Resource, ResourceVersion
from opentelemetry import trace
from sqlalchemy import delete, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from app.crud.crud_resource_version import CRUDResourceVersion
from app.schemas.resource import ResourceIn
tracer = trace.get_tracer_provider().get_tracer(__name__)
class CRUDResource:
@staticmethod
async def get(db: AsyncSession, resource_id: UUID) -> Optional[Resource]:
"""
Get a resource by its ID.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
resource_id : uuid.UUID
ID of a resource.
Returns
-------
resource : clowmdb.models.Resource | None
The resource with the given ID if it exists, None otherwise
"""
stmt = select(Resource).where(Resource._resource_id == resource_id.bytes).options(joinedload(Resource.versions))
with tracer.start_as_current_span(
"db_get_resource", attributes={"resource_id": str(resource_id), "sql_query": str(stmt)}
):
return await db.scalar(stmt)
@staticmethod
async def list_resources(
db: AsyncSession,
name_substring: Optional[str] = None,
maintainer_id: Optional[str] = None,
version_status: Optional[List[ResourceVersion.Status]] = None,
) -> List[Resource]:
"""
List all resources. Populates the version attribute of the resources.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
name_substring : str | None, default None
Substring to filter for in the name of a resource.
maintainer_id : str | None, default None
Filter resources by maintainer.
version_status : List[clowmdb.models.ResourceVersion.Status] | None, default None
Filter versions of a resource based on the status. Removes resources that have no version after this filter.
Returns
-------
workflows : List[clowmdb.models.Resource]
List of resources.
"""
with tracer.start_as_current_span("db_list_resources") as span:
stmt = select(Resource).options(joinedload(Resource.versions))
if name_substring is not None:
span.set_attribute("name_substring", name_substring)
stmt = stmt.where(Resource.name.contains(name_substring))
if maintainer_id is not None:
span.set_attribute("maintainer_id", maintainer_id)
stmt = stmt.where(Resource.maintainer_id == maintainer_id)
if version_status is not None and len(version_status) > 0:
span.set_attribute("status", [status.name for status in version_status])
stmt = stmt.options(
joinedload(
Resource.versions.and_(or_(*[ResourceVersion.status == status for status in version_status]))
)
)
span.set_attribute("sql_query", str(stmt))
return [w for w in (await db.scalars(stmt)).unique().all() if len(w.versions) > 0]
@staticmethod
async def delete(db: AsyncSession, resource_id: UUID) -> None:
"""
Delete a resource.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
resource_id : uuid.UUID
ID of a resource
"""
stmt = delete(Resource).where(Resource._resource_id == resource_id.bytes)
with tracer.start_as_current_span(
"db_delete_resource", attributes={"resource_id": str(resource_id), "sql_query": str(stmt)}
):
await db.execute(stmt)
await db.commit()
@staticmethod
async def create(db: AsyncSession, resource_in: ResourceIn, maintainer_id: str) -> Resource:
"""
Create a new resource.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession
Async database session to perform query on.
resource_in : app.schemas.resource.ResourceIn
Data about the new resource.
maintainer_id : str
UID of the maintainer.
Returns
-------
resource : clowmdb.models.Resource
The newly created resource
"""
with tracer.start_as_current_span(
"db_create_resource",
attributes={"maintainer_id": maintainer_id, "resource_in": resource_in.model_dump_json(indent=2)},
) as span:
resource_db = Resource(
name=resource_in.name,
short_description=resource_in.description,
source=resource_in.source,
maintainer_id=maintainer_id,
)
db.add(resource_db)
await db.commit()
span.set_attribute("resource_id", str(resource_db.resource_id))
await CRUDResourceVersion.create(db, resource_id=resource_db.resource_id, release=resource_in.release)
return await CRUDResource.get(db, resource_db.resource_id)
from typing import Optional
from uuid import UUID
from clowmdb.models import ResourceVersion
from opentelemetry import trace
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
tracer = trace.get_tracer_provider().get_tracer(__name__)
class CRUDResourceVersion:
@staticmethod
async def create(db: AsyncSession, resource_id: UUID, release: str) -> ResourceVersion:
"""
Create a new resource version.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
resource_id : uuid.UUID
ID of the resource.
release : str
Versions of the resource.
Returns
-------
resource_version : clowmdb.models.ResourceVersion
The newly created resource version
"""
with tracer.start_as_current_span(
"db_create_resource_version", attributes={"resource_id": str(resource_id), "release": release}
) as span:
resource_version_db = ResourceVersion(release=release, _resource_id=resource_id.bytes)
db.add(resource_version_db)
await db.commit()
span.set_attribute("resource_version_id", str(resource_version_db.resource_version_id))
return resource_version_db
@staticmethod
async def update_status(db: AsyncSession, resource_version_id: UUID, status: ResourceVersion.Status) -> None:
"""
Update the status of a resource version.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
resource_version_id : uuid.UUID
Git commit git_commit_hash of the version.
status : clowmdb.models.ResourceVersion.Status
New status of the resource version
"""
stmt = (
update(ResourceVersion)
.where(ResourceVersion._resource_version_id == resource_version_id.bytes)
.values(status=status.name)
)
with tracer.start_as_current_span(
"db_update_resource_version_status",
attributes={"status": status.name, "resource_version_id": str(resource_version_id), "sql_query": str(stmt)},
):
await db.execute(stmt)
await db.commit()
@staticmethod
async def get(
db: AsyncSession, resource_version_id: UUID, resource_id: Optional[UUID] = None
) -> Optional[ResourceVersion]:
"""
Get a resource version by its ID.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
resource_version_id : uuid.UUID
ID of a resource version.
resource_id : uuid.UUID | None
Returns
-------
resource_version : clowmdb.models.ResourceVersion | None
The resource version with the given ID if it exists, None otherwise
"""
stmt = select(ResourceVersion).where(ResourceVersion._resource_version_id == resource_version_id.bytes)
with tracer.start_as_current_span(
"db_get_resource_version",
attributes={
"resource_version_id": str(resource_version_id),
},
) as span:
if resource_id:
span.set_attribute("resource_id", str(resource_id))
stmt = stmt.where(ResourceVersion._resource_id == resource_id.bytes)
span.set_attribute("sql_query", str(stmt))
return await db.scalar(stmt)
from typing import TYPE_CHECKING, List from typing import TYPE_CHECKING, Optional
import aioboto3 import aioboto3
from botocore.exceptions import ClientError
from opentelemetry import trace from opentelemetry import trace
from app.core.config import settings from app.core.config import settings
...@@ -21,20 +22,33 @@ botosession = aioboto3.Session( ...@@ -21,20 +22,33 @@ botosession = aioboto3.Session(
) )
async def get_s3_bucket_policy(s3: S3ServiceResource, bucket_name: str) -> BucketPolicy: async def get_s3_bucket_policy(s3: S3ServiceResource, bucket_name: str) -> Optional[str]:
with tracer.start_as_current_span("s3_get_bucket_policy", attributes={"bucket_name": bucket_name}): with tracer.start_as_current_span("s3_get_bucket_policy", attributes={"bucket_name": bucket_name}) as span:
s3_policy = await (await s3.Bucket(bucket_name)).Policy() s3_policy = await s3.BucketPolicy(bucket_name=bucket_name)
await s3_policy.load() try:
return s3_policy policy = await s3_policy.policy
except ClientError:
return None
span.set_attribute("policy", policy)
return policy
async def put_s3_bucket_policy(s3: S3ServiceResource, bucket_name: str, policy: str) -> None: async def put_s3_bucket_policy(s3: S3ServiceResource, bucket_name: str, policy: str) -> None:
with tracer.start_as_current_span("s3_put_bucket_policy", attributes={"bucket_name": bucket_name}): with tracer.start_as_current_span(
"s3_put_bucket_policy", attributes={"bucket_name": bucket_name, "policy": policy}
):
bucket_policy = await s3.BucketPolicy(bucket_name=bucket_name) bucket_policy = await s3.BucketPolicy(bucket_name=bucket_name)
await bucket_policy.put(Policy=policy) await bucket_policy.put(Policy=policy)
async def get_s3_bucket_objects(s3: S3ServiceResource, bucket_name: str) -> List[ObjectSummary]: async def get_s3_bucket_object(s3: S3ServiceResource, bucket_name: str, key: str) -> Optional[ObjectSummary]:
with tracer.start_as_current_span("s3_get_object_meta_data", attributes={"bucket_name": bucket_name}): with tracer.start_as_current_span(
bucket = await s3.Bucket(name=bucket_name) "s3_get_object_meta_data", attributes={"bucket_name": bucket_name, "key": key}
return [obj async for obj in bucket.objects.all()] ) as span:
obj = await s3.ObjectSummary(bucket_name=bucket_name, key=key)
try:
await obj.load()
except ClientError:
return None
span.set_attributes({"size": await obj.size, "last_modified": await obj.last_modified, "etag": await obj.e_tag})
return obj
from typing import List from typing import List, Optional, Sequence
from uuid import UUID from uuid import UUID
from clowmdb.models import Resource, ResourceVersion
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from app.schemas.resource_version import ResourceVersionIn, ResourceVersionOut from app.schemas.resource_version import ResourceVersionIn, ResourceVersionOut
...@@ -32,3 +33,33 @@ class ResourceOut(BaseResource): ...@@ -32,3 +33,33 @@ class ResourceOut(BaseResource):
resource_id: UUID = Field(..., description="ID of the resource", examples=["4c072e39-2bd9-4fa3-b564-4d890e240ccd"]) resource_id: UUID = Field(..., description="ID of the resource", examples=["4c072e39-2bd9-4fa3-b564-4d890e240ccd"])
maintainer_id: str = Field(..., description="ID of the maintainer", examples=["28c5353b8bb34984a8bd4169ba94c606"]) maintainer_id: str = Field(..., description="ID of the maintainer", examples=["28c5353b8bb34984a8bd4169ba94c606"])
versions: List[ResourceVersionOut] = Field(..., description="Versions of the resource") versions: List[ResourceVersionOut] = Field(..., description="Versions of the resource")
@staticmethod
def from_db_resource(db_resource: Resource, versions: Optional[Sequence[ResourceVersion]] = None) -> "ResourceOut":
"""
Create a ResourceOut schema from the resource database model.
Parameters
----------
db_resource: clowmdb.models.Resource
Database model of a resource.
versions : List[clowmdb.models.ResourceVersion] | None, default None
List of versions to attach to the schema. If None, they will be loaded from the DB model.
Returns
-------
resource : ResourceOut
Schema from the database model
"""
if versions is not None:
temp_versions = versions
else:
temp_versions = db_resource.versions
return ResourceOut(
resource_id=db_resource.resource_id,
name=db_resource.name,
description=db_resource.short_description,
source=db_resource.source,
maintainer_id=db_resource.maintainer_id,
versions=[ResourceVersionOut.from_db_resource_version(v) for v in temp_versions],
)
...@@ -54,3 +54,13 @@ class ResourceVersionOut(BaseResourceVersion): ...@@ -54,3 +54,13 @@ class ResourceVersionOut(BaseResourceVersion):
@property @property
def s3_path(self) -> str: def s3_path(self) -> str:
return f"s3://{settings.RESOURCE_BUCKET}/CLDB-{self.resource_id.hex[:8]}/{self.resource_version_id.hex}/resource.tar.gz" return f"s3://{settings.RESOURCE_BUCKET}/CLDB-{self.resource_id.hex[:8]}/{self.resource_version_id.hex}/resource.tar.gz"
@staticmethod
def from_db_resource_version(db_resource_version: ResourceVersion) -> "ResourceVersionOut":
return ResourceVersionOut(
release=db_resource_version.release,
status=db_resource_version.status,
resource_version_id=db_resource_version.resource_version_id,
resource_id=db_resource_version.resource_id,
created_at=db_resource_version.created_at,
)
from uuid import uuid4
import pytest import pytest
from clowmdb.models import Resource from clowmdb.models import Resource, ResourceVersion
from fastapi import status from fastapi import status
from httpx import AsyncClient from httpx import AsyncClient
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.schemas.resource import ResourceIn from app.schemas.resource import ResourceIn, ResourceOut
from app.tests.utils.user import UserWithAuthHeader from app.tests.utils.user import UserWithAuthHeader
from app.tests.utils.utils import random_lower_string from app.tests.utils.utils import CleanupList, random_lower_string
class _TestResourceRoutes: class _TestResourceRoutes:
...@@ -14,7 +18,9 @@ class _TestResourceRoutes: ...@@ -14,7 +18,9 @@ class _TestResourceRoutes:
class TestResourceRouteCreate(_TestResourceRoutes): class TestResourceRouteCreate(_TestResourceRoutes):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_create_resource_route(self, client: AsyncClient, random_user: UserWithAuthHeader) -> None: async def test_create_resource_route(
self, client: AsyncClient, random_user: UserWithAuthHeader, db: AsyncSession, cleanup: CleanupList
) -> None:
""" """
Test for creating a new resource. Test for creating a new resource.
...@@ -24,6 +30,10 @@ class TestResourceRouteCreate(_TestResourceRoutes): ...@@ -24,6 +30,10 @@ class TestResourceRouteCreate(_TestResourceRoutes):
HTTP Client to perform the request on. HTTP Client to perform the request on.
random_user : app.tests.utils.user.UserWithAuthHeader random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing. Random user for testing.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
cleanup : list[sqlalchemy.sql.dml.Delete]
List were to append sql deletes that gets executed after the test
""" """
resource = ResourceIn( resource = ResourceIn(
release=random_lower_string(6), release=random_lower_string(6),
...@@ -33,6 +43,19 @@ class TestResourceRouteCreate(_TestResourceRoutes): ...@@ -33,6 +43,19 @@ class TestResourceRouteCreate(_TestResourceRoutes):
) )
response = await client.post(self.base_path, json=resource.model_dump(), headers=random_user.auth_headers) response = await client.post(self.base_path, json=resource.model_dump(), headers=random_user.auth_headers)
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
resource_out = ResourceOut.model_validate_json(response.content)
cleanup.append(delete(Resource).where(Resource._resource_id == resource_out.resource_id.bytes))
assert resource_out.name == resource.name
assert len(resource_out.versions) == 1
resource_version = resource_out.versions[0]
assert resource_version.resource_id == resource_out.resource_id
assert resource_version.status == ResourceVersion.Status.RESOURCE_REQUESTED
assert resource_version.release == resource.release
resource_db = await db.scalar(select(Resource).where(Resource._resource_id == resource_out.resource_id.bytes))
assert resource_db is not None
class TestResourceRouteDelete(_TestResourceRoutes): class TestResourceRouteDelete(_TestResourceRoutes):
...@@ -98,3 +121,20 @@ class TestResourceRouteGet(_TestResourceRoutes): ...@@ -98,3 +121,20 @@ class TestResourceRouteGet(_TestResourceRoutes):
f"{self.base_path}/{str(random_resource.resource_id)}", headers=random_user.auth_headers f"{self.base_path}/{str(random_resource.resource_id)}", headers=random_user.auth_headers
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
@pytest.mark.asyncio
async def test_get_non_existing_resource_route(
self, client: AsyncClient, random_resource: Resource, random_user: UserWithAuthHeader
) -> None:
"""
Test for getting a non-existing resource.
Parameters
----------
client : httpx.AsyncClient
HTTP Client to perform the request on.
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing.
"""
response = await client.get(f"{self.base_path}/{str(uuid4())}", headers=random_user.auth_headers)
assert response.status_code == status.HTTP_404_NOT_FOUND
from uuid import uuid4
import pytest import pytest
from clowmdb.models import Resource, ResourceVersion from clowmdb.models import Resource, ResourceVersion
from fastapi import status from fastapi import status
...@@ -99,6 +101,26 @@ class TestResourceVersionRouteGet(_TestResourceVersionRoutes): ...@@ -99,6 +101,26 @@ class TestResourceVersionRouteGet(_TestResourceVersionRoutes):
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
@pytest.mark.asyncio
async def test_get_non_existing_resource_route(
self, client: AsyncClient, random_resource: Resource, random_user: UserWithAuthHeader
) -> None:
"""
Test for getting a non-existing resource version.
Parameters
----------
client : httpx.AsyncClient
HTTP Client to perform the request on.
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing.
"""
response = await client.get(
f"{self.base_path}/{str(random_resource.resource_id)}/versions/{str(uuid4())}",
headers=random_user.auth_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
class TestResourceVersionRouteDelete(_TestResourceVersionRoutes): class TestResourceVersionRouteDelete(_TestResourceVersionRoutes):
@pytest.mark.asyncio @pytest.mark.asyncio
......
...@@ -2,7 +2,6 @@ import asyncio ...@@ -2,7 +2,6 @@ import asyncio
from functools import partial from functools import partial
from secrets import token_urlsafe from secrets import token_urlsafe
from typing import AsyncGenerator, AsyncIterator, Dict, Iterator from typing import AsyncGenerator, AsyncIterator, Dict, Iterator
from uuid import uuid4
import httpx import httpx
import pytest import pytest
...@@ -10,6 +9,7 @@ import pytest_asyncio ...@@ -10,6 +9,7 @@ import pytest_asyncio
from clowmdb.db.session import get_async_session from clowmdb.db.session import get_async_session
from clowmdb.models import Resource, ResourceVersion from clowmdb.models import Resource, ResourceVersion
from pytrie import SortedStringTrie as Trie from pytrie import SortedStringTrie as Trie
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.api.dependencies import get_db, get_decode_jwt_function, get_httpx_client, get_s3_resource from app.api.dependencies import get_db, get_decode_jwt_function, get_httpx_client, get_s3_resource
...@@ -20,7 +20,7 @@ from app.tests.mocks.mock_opa_service import MockOpaService ...@@ -20,7 +20,7 @@ from app.tests.mocks.mock_opa_service import MockOpaService
from app.tests.mocks.mock_s3_resource import MockS3ServiceResource from app.tests.mocks.mock_s3_resource import MockS3ServiceResource
from app.tests.mocks.mock_slurm_cluster import MockSlurmCluster from app.tests.mocks.mock_slurm_cluster import MockSlurmCluster
from app.tests.utils.user import UserWithAuthHeader, create_random_user, decode_mock_token, get_authorization_headers from app.tests.utils.user import UserWithAuthHeader, create_random_user, decode_mock_token, get_authorization_headers
from app.tests.utils.utils import random_lower_string from app.tests.utils.utils import CleanupList, random_lower_string
jwt_secret = token_urlsafe(32) jwt_secret = token_urlsafe(32)
...@@ -172,23 +172,41 @@ async def random_resource(db: AsyncSession, random_user: UserWithAuthHeader) -> ...@@ -172,23 +172,41 @@ async def random_resource(db: AsyncSession, random_user: UserWithAuthHeader) ->
""" """
Create a random resource and deletes it afterward. Create a random resource and deletes it afterward.
""" """
yield Resource( resource_db = Resource(
_resource_id=uuid4().bytes,
name=random_lower_string(8), name=random_lower_string(8),
short_description=random_lower_string(32), short_description=random_lower_string(32),
source=random_lower_string(32), source=random_lower_string(32),
maintainer_id=random_user.user.uid, maintainer_id=random_user.user.uid,
) )
db.add(resource_db)
await db.commit()
resource_version_db = ResourceVersion(
release=random_lower_string(8), _resource_id=resource_db.resource_id.bytes, status=ResourceVersion.Status.LATEST
)
db.add(resource_version_db)
await db.commit()
await db.refresh(resource_db, attribute_names=["versions"])
yield resource_db
await db.execute(delete(Resource).where(Resource._resource_id == resource_db.resource_id.bytes))
await db.commit()
@pytest_asyncio.fixture(scope="function") @pytest_asyncio.fixture(scope="function")
async def random_resource_version(db: AsyncSession, random_resource: Resource) -> AsyncIterator[ResourceVersion]: async def random_resource_version(db: AsyncSession, random_resource: Resource) -> ResourceVersion:
""" """
Create a random resource version and deletes it afterward. Create a random resource version and deletes it afterward.
""" """
yield ResourceVersion( resource_version: ResourceVersion = random_resource.versions[0]
_resource_version_id=uuid4().bytes, return resource_version
_resource_id=random_resource.resource_id.bytes,
release=random_lower_string(8),
status=ResourceVersion.Status.RESOURCE_REQUESTED, @pytest_asyncio.fixture(scope="function")
) async def cleanup(db: AsyncSession) -> AsyncIterator[CleanupList]:
"""
Yield a list with sql delete statements that gets executed after a (failed) test
"""
to_delete: CleanupList = []
yield to_delete
for stmt in to_delete:
await db.execute(stmt)
await db.commit()
import random
from uuid import uuid4
import pytest
from clowmdb.models import Resource, ResourceVersion
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from app.crud import CRUDResource
from app.schemas.resource import ResourceIn
from app.tests.utils.user import UserWithAuthHeader
from app.tests.utils.utils import CleanupList, random_lower_string
class TestResourceCRUDGet:
@pytest.mark.asyncio
async def test_get_resource(self, db: AsyncSession, random_resource: Resource) -> None:
"""
Test for getting an existing resource from the database
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource : clowmdb.models.Resource
Random resource for testing.
"""
resource = await CRUDResource.get(db, resource_id=random_resource.resource_id)
assert resource is not None
assert resource == random_resource
@pytest.mark.asyncio
async def test_get_non_existing_resource(self, db: AsyncSession) -> None:
"""
Test for getting a non-existing resource from the database
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
"""
resource = await CRUDResource.get(db, resource_id=uuid4())
assert resource is None
class TestResourceCRUDList:
@pytest.mark.asyncio
async def test_get_all_resources(self, db: AsyncSession, random_resource: Resource) -> None:
"""
Test get all resources from the CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource : app.schemas.resource.ResourceOut
Random resource for testing.
"""
resources = await CRUDResource.list_resources(db)
assert len(resources) == 1
assert resources[0].resource_id == random_resource.resource_id
@pytest.mark.asyncio
async def test_get_resources_by_maintainer(
self, db: AsyncSession, random_resource: Resource, random_user: UserWithAuthHeader
) -> None:
"""
Test get only resources from the CRUD Repository by specific maintainer.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource : app.schemas.resource.ResourceOut
Random resource for testing.
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing.
"""
resources = await CRUDResource.list_resources(db, maintainer_id=random_user.user.uid)
assert len(resources) == 1
assert resources[0].resource_id == random_resource.resource_id
@pytest.mark.asyncio
async def test_get_resources_with_unpublished_version(self, db: AsyncSession, random_resource: Resource) -> None:
"""
Test get only resources from the CRUD Repository with an unpublished version.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource : app.schemas.resource.ResourceOut
Random resource for testing.
"""
resources = await CRUDResource.list_resources(db, version_status=[ResourceVersion.Status.LATEST])
assert len(resources) == 1
assert resources[0].resource_id == random_resource.resource_id
@pytest.mark.asyncio
async def test_get_resource_with_name_substring(self, db: AsyncSession, random_resource: Resource) -> None:
"""
Test get resources with a substring in their name from the CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource : app.schemas.resource.ResourceOut
Random resource for testing.
"""
substring_indices = [0, 0]
while substring_indices[0] == substring_indices[1]:
substring_indices = sorted(random.choices(range(len(random_resource.name)), k=2))
random_substring = random_resource.name[substring_indices[0] : substring_indices[1]]
resources = await CRUDResource.list_resources(db, name_substring=random_substring)
assert len(resources) > 0
assert random_resource.resource_id in map(lambda w: w.resource_id, resources)
@pytest.mark.asyncio
async def test_search_non_existing_resource_by_name(self, db: AsyncSession, random_resource: Resource) -> None:
"""
Test for getting a non-existing resource by its name from CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource : app.schemas.resource.ResourceOut
Random resource for testing.
"""
resources = await CRUDResource.list_resources(db, name_substring=2 * random_resource.name)
assert sum(1 for w in resources if w.resource_id == random_resource.resource_id) == 0
class TestResourceCRUDDelete:
@pytest.mark.asyncio
async def test_delete_resource(self, db: AsyncSession, random_resource: Resource) -> None:
"""
Test for deleting a resource from CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource : app.schemas.resource.ResourceOut
Random resource for testing.
"""
await CRUDResource.delete(db, resource_id=random_resource.resource_id)
deleted_resource = await db.scalar(
select(Resource).where(Resource._resource_id == random_resource.resource_id.bytes)
)
assert deleted_resource is None
deleted_resource_versions = (
await db.scalars(
select(ResourceVersion).where(ResourceVersion._resource_id == random_resource.resource_id.bytes)
)
).all()
assert len(deleted_resource_versions) == 0
class TestResourceCRUDCreate:
@pytest.mark.asyncio
async def test_create_resource(
self, db: AsyncSession, random_user: UserWithAuthHeader, cleanup: CleanupList
) -> None:
"""
Test for creating a new resource with the CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing.
cleanup : list[sqlalchemy.sql.dml.Delete]
List were to append sql deletes that gets executed after the test
"""
resource_in = ResourceIn(
name=random_lower_string(8),
description=random_lower_string(),
source=random_lower_string(),
release=random_lower_string(8),
)
resource = await CRUDResource.create(db, resource_in=resource_in, maintainer_id=random_user.user.uid)
assert resource is not None
cleanup.append(delete(Resource).where(Resource._resource_id == resource.resource_id.bytes))
created_resource = await db.scalar(
select(Resource)
.where(Resource._resource_id == resource.resource_id.bytes)
.options(joinedload(Resource.versions))
)
assert created_resource is not None
assert created_resource == resource
assert len(created_resource.versions) == 1
assert created_resource.versions[0].release == resource_in.release
from uuid import uuid4
import pytest
from clowmdb.models import Resource, ResourceVersion
from sqlalchemy import delete, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from app.crud import CRUDResourceVersion
from app.tests.utils.utils import CleanupList, random_lower_string
class TestResourceVersionCRUDGet:
@pytest.mark.asyncio
async def test_get_resource_version(self, db: AsyncSession, random_resource_version: ResourceVersion) -> None:
"""
Test for getting an existing resource version from the database
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource_version : clowmdb.models.ResourceVersion
Random resource for testing.
"""
resource = await CRUDResourceVersion.get(db, resource_version_id=random_resource_version.resource_version_id)
assert resource is not None
assert resource == random_resource_version
@pytest.mark.asyncio
async def test_get_resource_version_with_resource_id(
self, db: AsyncSession, random_resource_version: ResourceVersion
) -> None:
"""
Test for getting an existing resource version from the database and filter is with the correct resource id
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource_version : clowmdb.models.ResourceVersion
Random resource for testing.
"""
resource = await CRUDResourceVersion.get(
db,
resource_version_id=random_resource_version.resource_version_id,
resource_id=random_resource_version.resource_id,
)
assert resource is not None
assert resource == random_resource_version
@pytest.mark.asyncio
async def test_get_resource_version_with_wrong_resource_id(
self, db: AsyncSession, random_resource_version: ResourceVersion
) -> None:
"""
Test for getting an existing resource version from the database and filter it with a wrong resource id
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource_version : clowmdb.models.ResourceVersion
Random resource for testing.
"""
resource = await CRUDResourceVersion.get(
db, resource_version_id=random_resource_version.resource_id, resource_id=uuid4()
)
assert resource is None
@pytest.mark.asyncio
async def test_get_non_existing_resource_version(self, db: AsyncSession) -> None:
"""
Test for getting a non-existing resource version from the database
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
"""
resource = await CRUDResourceVersion.get(db, resource_version_id=uuid4())
assert resource is None
class TestResourceCRUDUpdate:
@pytest.mark.asyncio
async def test_update_resource_version(self, db: AsyncSession, random_resource_version: ResourceVersion) -> None:
"""
Test for updating the resource version status from the CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource_version : clowmdb.models.ResourceVersion
Random resource for testing.
"""
await CRUDResourceVersion.update_status(
db,
resource_version_id=random_resource_version.resource_version_id,
status=ResourceVersion.Status.S3_DELETED,
)
updated_resource_version = await db.scalar(
select(ResourceVersion).where(
ResourceVersion._resource_version_id == random_resource_version.resource_version_id.bytes
)
)
assert updated_resource_version is not None
assert updated_resource_version == random_resource_version
assert updated_resource_version.status == ResourceVersion.Status.S3_DELETED
@pytest.mark.asyncio
async def test_update_non_existing_resource_version(
self, db: AsyncSession, random_resource_version: ResourceVersion
) -> None:
"""
Test for updating a non-existing resource version status from the CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource_version : clowmdb.models.ResourceVersion
Random resource for testing.
"""
await CRUDResourceVersion.update_status(
db,
resource_version_id=uuid4(),
status=ResourceVersion.Status.S3_DELETED,
)
resource_version = await db.scalar(
select(ResourceVersion).where(
ResourceVersion._resource_version_id == random_resource_version.resource_version_id.bytes
)
)
assert resource_version is not None
assert resource_version == random_resource_version
assert resource_version.status == random_resource_version.status
class TestResourceCRUDCreate:
@pytest.mark.asyncio
async def test_create_resource_version(
self, db: AsyncSession, random_resource: Resource, cleanup: CleanupList
) -> None:
"""
Test for creating a new resource version with the CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
random_resource : clowmdb.models.Resource
Random resource for testing.
cleanup : list[sqlalchemy.sql.dml.Delete]
List were to append sql deletes that gets executed after the test
"""
release = random_lower_string(8)
resource_version = await CRUDResourceVersion.create(
db, resource_id=random_resource.resource_id, release=release
)
assert resource_version is not None
cleanup.append(
delete(ResourceVersion).where(
ResourceVersion._resource_version_id == resource_version.resource_version_id.bytes
)
)
created_resource_version = await db.scalar(
select(ResourceVersion).where(
ResourceVersion._resource_version_id == resource_version.resource_version_id.bytes
)
)
assert created_resource_version is not None
assert created_resource_version == resource_version
assert resource_version.status == ResourceVersion.Status.RESOURCE_REQUESTED
@pytest.mark.asyncio
async def test_create_resource_version_with_wrong_resource_id(self, db: AsyncSession) -> None:
"""
Test for creating a new resource version with a wrong resource id the CRUD Repository.
Parameters
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
"""
with pytest.raises(IntegrityError):
await CRUDResourceVersion.create(db, resource_id=uuid4(), release=random_lower_string(8))
...@@ -3,6 +3,8 @@ from typing import TYPE_CHECKING, Dict, List, Optional ...@@ -3,6 +3,8 @@ from typing import TYPE_CHECKING, Dict, List, Optional
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
from app.tests.utils.utils import random_hex_string
if TYPE_CHECKING: if TYPE_CHECKING:
from types_aiobotocore_s3.type_defs import CORSConfigurationTypeDef from types_aiobotocore_s3.type_defs import CORSConfigurationTypeDef
else: else:
...@@ -59,6 +61,8 @@ class MockS3ObjectSummary: ...@@ -59,6 +61,8 @@ class MockS3ObjectSummary:
Size of object in bytes. Always 100. Size of object in bytes. Always 100.
last_modified : datetime last_modified : datetime
Time and date of last modification of this object. Time and date of last modification of this object.
etag : str
Hash of the object content
""" """
def __init__(self, bucket_name: str, key: str) -> None: def __init__(self, bucket_name: str, key: str) -> None:
...@@ -76,6 +80,7 @@ class MockS3ObjectSummary: ...@@ -76,6 +80,7 @@ class MockS3ObjectSummary:
self.bucket_name = bucket_name self.bucket_name = bucket_name
self.size = 100 self.size = 100
self.last_modified = datetime.now() self.last_modified = datetime.now()
self.etag = random_hex_string(32)
def __repr__(self) -> str: def __repr__(self) -> str:
return f"MockS3ObjectSummary(key={self.key}, bucket={self.bucket_name})" return f"MockS3ObjectSummary(key={self.key}, bucket={self.bucket_name})"
......
import random import random
import string import string
from typing import List
from sqlalchemy.sql.dml import Delete
CleanupList = List[Delete]
def random_lower_string(length: int = 32) -> str: def random_lower_string(length: int = 32) -> str:
......
...@@ -24,7 +24,7 @@ concurrency = [ ...@@ -24,7 +24,7 @@ concurrency = [
omit = [ omit = [
"app/tests/**", "app/tests/**",
"app/check_database_connection.py", "app/check_database_connection.py",
"app/check_ceph_connection.py", "app/check_s3_connection.py",
"app/check_slurm_connection.py", "app/check_slurm_connection.py",
"app/core/config.py", "app/core/config.py",
] ]
......
...@@ -5,12 +5,12 @@ pytest-cov>=4.1.0,<4.2.0 ...@@ -5,12 +5,12 @@ pytest-cov>=4.1.0,<4.2.0
coverage[toml]>=7.3.0,<7.4.0 coverage[toml]>=7.3.0,<7.4.0
# Linters # Linters
ruff>=0.1.0,<0.2.0 ruff>=0.1.0,<0.2.0
black>=23.11.0,<23.12.0 black>=23.12.0,<24.0.0
isort>=5.12.0,<5.13.0 isort>=5.13.0,<5.14.0
mypy>=1.7.0,<1.8.0 mypy>=1.7.0,<1.8.0
# stubs for mypy # stubs for mypy
types-aiobotocore-lite[s3]>=2.8.0,<2.9.0 types-aiobotocore-lite[s3]>=2.8.0,<2.9.0
types-requests types-requests
# Miscellaneous # Miscellaneous
pre-commit>=3.5.0,<3.6.0 pre-commit>=3.6.0,<3.7.0
PyTrie>=0.4.0,<0.5.0 PyTrie>=0.4.0,<0.5.0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment