Skip to content
Snippets Groups Projects
Verified Commit 1bc14213 authored by Daniel Göbel's avatar Daniel Göbel
Browse files

Update clowmdb to 3

#8
parent f3c2bede
No related branches found
No related tags found
1 merge request!8Resolve "Update clowmdb to 3"
Pipeline #45046 passed
Showing
with 246 additions and 125 deletions
......@@ -46,7 +46,7 @@ integration-test-job: # Runs integration tests with the database
MYSQL_DATABASE: "$DB_DATABASE"
MYSQL_USER: "$DB_USER"
MYSQL_PASSWORD: "$DB_PASSWORD"
- name: $CI_REGISTRY/cmg/clowm/clowm-database:v2.3
- name: $CI_REGISTRY/cmg/clowm/clowm-database:v3.0
alias: upgrade-db
script:
- python app/check_database_connection.py
......@@ -74,7 +74,7 @@ e2e-test-job: # Runs e2e tests on the API endpoints
MYSQL_DATABASE: "$DB_DATABASE"
MYSQL_USER: "$DB_USER"
MYSQL_PASSWORD: "$DB_PASSWORD"
- name: $CI_REGISTRY/cmg/clowm/clowm-database:v2.3
- name: $CI_REGISTRY/cmg/clowm/clowm-database:v3.0
alias: upgrade-db
script:
- python app/check_database_connection.py
......@@ -132,7 +132,7 @@ lint-test-job: # Runs linters checks on code
publish-dev-docker-container-job:
stage: deploy
image:
name: gcr.io/kaniko-project/executor:v1.17.0-debug
name: gcr.io/kaniko-project/executor:v1.20.0-debug
entrypoint: [""]
dependencies: []
only:
......@@ -155,7 +155,7 @@ publish-dev-docker-container-job:
publish-docker-container-job:
stage: deploy
image:
name: gcr.io/kaniko-project/executor:v1.17.0-debug
name: gcr.io/kaniko-project/executor:v1.20.0-debug
entrypoint: [""]
dependencies: []
only:
......
......@@ -15,13 +15,13 @@ repos:
- id: check-merge-conflict
- id: check-ast
- repo: https://github.com/psf/black
rev: 23.12.1
rev: 24.1.1
hooks:
- id: black
files: app
args: [--check]
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: 'v0.1.11'
rev: 'v0.1.15'
hooks:
- id: ruff
- repo: https://github.com/PyCQA/isort
......@@ -39,5 +39,5 @@ repos:
additional_dependencies:
- types-aiobotocore-lite[s3]>=2.8.0,<2.9.0
- sqlalchemy>=2.0.0.<2.1.0
- pydantic<2.6.0
- pydantic<2.7.0
- types-requests
FROM python:3.11-slim
EXPOSE 8000
ENV PORT=8000
EXPOSE $PORT
# dumb-init forwards the kill signal to the python process
RUN apt-get update && apt-get -y install dumb-init curl
RUN apt-get update && apt-get -y install dumb-init && apt-get clean
ENTRYPOINT ["/usr/bin/dumb-init", "--"]
STOPSIGNAL SIGINT
RUN pip install --no-cache-dir httpx[cli] "uvicorn<0.28.0"
HEALTHCHECK --interval=30s --timeout=4s CMD curl -f http://localhost:8000/health || exit 1
HEALTHCHECK --interval=30s --timeout=2s CMD httpx http://localhost:$PORT/health || exit 1
RUN useradd -m worker
USER worker
......@@ -13,10 +17,13 @@ WORKDIR /home/worker/code
ENV PYTHONPATH=/home/worker/code
ENV PATH="/home/worker/.local/bin:${PATH}"
COPY ./start_service_uvicorn.sh /home/worker/code/start.sh
COPY ./scripts/prestart.sh /home/worker/code/prestart.sh
COPY --chown=worker:worker requirements.txt ./requirements.txt
RUN pip install --user --no-cache-dir --upgrade -r requirements.txt
COPY --chown=worker:worker . .
COPY --chown=worker:worker ./app /home/worker/code/app
CMD ["./start_service.sh"]
CMD ["./start.sh"]
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.11-slim
EXPOSE 8000
FROM python:3.11-slim
ENV PORT=8000
EXPOSE $PORT
WORKDIR /app/
ENV PYTHONPATH=/app
RUN pip install --no-cache-dir httpx[cli]
RUN pip install --no-cache-dir httpx[cli] "gunicorn<21.3.0" "uvicorn<0.28.0"
COPY ./gunicorn_conf.py ./gunicorn_conf.py
COPY ./start_service_gunicorn.sh ./start.sh
HEALTHCHECK --interval=30s --timeout=4s CMD httpx http://localhost:$PORT/health || exit 1
HEALTHCHECK --interval=30s --timeout=2s CMD httpx http://localhost:$PORT/health || exit 1
COPY ./scripts/prestart.sh /app/prestart.sh
COPY ./requirements.txt /app/requirements.txt
COPY ./scripts/prestart.sh ./prestart.sh
COPY ./requirements.txt ./requirements.txt
RUN pip install --no-cache-dir --upgrade -r requirements.txt
COPY ./app /app/app
COPY ./app ./app
CMD ["./start.sh"]
......@@ -17,24 +17,27 @@ This is the Resource service of the CloWM service.
| `DB_PASSWORD` | unset | \<db password> | Password of the database user |
| `DB_DATABASE` | unset | \<db name> | Name of the database |
| `OBJECT_GATEWAY_URI` | unset | HTTP URL | HTTP URL of the Ceph Object Gateway |
| `BUCKET_CEPH_ACCESS_KEY` | unset | \<access key> | Ceph access key with admin privileges |
| `BUCKET_CEPH_SECRET_KEY` | unset | \<secret key> | Ceph secret key with admin privileges |
| `BUCKET_CEPH_ACCESS_KEY` | unset | \<access key> | S3 access key with admin privileges |
| `BUCKET_CEPH_SECRET_KEY` | unset | \<secret key> | S3 secret key with admin privileges |
| `OPA_URI` | unset | HTTP URL | HTTP URL of the OPA service |
| `SLURM_ENDPOINT` | unset | HTTP URL | HTTP URL to communicate with the Slurm cluster |
| `SLURM_TOKEN` | unset | \<JWT> | JWT for communication with the Slurm REST API. Should belong to the user `SLURM_USER` |
### Optional Variables
| Variable | Default | Value | Description |
|-----------------------------------|-------------------------|--------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------|
| `API_PREFIX` | `/api/workflow-service` | URL path | Prefix before every URL path |
| `SQLALCHEMY_VERBOSE_LOGGER` | `False` | `<"True"&#x7c;"False">` | Enables verbose SQL output.<br>Should be `false` in production |
| `RESOURCE_BUCKET` | `clowm-resources` | Bucket name | Bucket where to save workflow icons. Should be publicly available. |
| `SLURM_USER` | `slurm` | string | User on the slurm cluster who should run the job. Should be the user of the `SLURM_TOKEN` |
| `SLURM_WORKING_DIRECTORY` | `/tmp` | Path on slurm cluster | Working directory for the slurm job with the nextflow command |
| `OPA_POLICY_PATH` | `/clowm/authz/allow` | URL path | Path to the OPA Policy for Authorization |
| `OTLP_GRPC_ENDPOINT` | unset | <hostname / IP> | OTLP compatible endpoint to send traces via gRPC, e.g. Jaeger |
| Variable | Default | Value | Description |
|-----------------------------|-------------------------|-------------------------|-------------------------------------------------------------------------------------------|
| `API_PREFIX` | `/api/workflow-service` | URL path | Prefix before every URL path |
| `SQLALCHEMY_VERBOSE_LOGGER` | `False` | `<"True"&#x7c;"False">` | Enables verbose SQL output.<br>Should be `false` in production |
| `RESOURCE_BUCKET` | `clowm-resources` | Bucket name | Bucket where to save the resources. |
| `SLURM_USER` | `slurm` | string | User on the slurm cluster who should run the job. Should be the user of the `SLURM_TOKEN` |
| `SLURM_WORKING_DIRECTORY` | `/tmp` | Path on slurm cluster | Working directory for the slurm job with the nextflow command |
| `RESOURCE_CLUSTER_PATH` | `/vol/data/databases` | Path on slurm cluster | Directory on the cluster where the resources should be downloaded to |
| `RESOURCE_CONTAINER_PATH` | `/vol/resources` | Path on slurm cluster | Directory in the container where al resources are readonly available |
| `OPA_POLICY_PATH` | `/clowm/authz/allow` | URL path | Path to the OPA Policy for Authorization |
| `OTLP_GRPC_ENDPOINT` | unset | <hostname / IP> | OTLP compatible endpoint to send traces via gRPC, e.g. Jaeger |
## License
The API is licensed under the [Apache 2.0](https://www.apache.org/licenses/LICENSE-2.0) license. See the [License](LICENSE) file for more information
The API is licensed under the [Apache 2.0](https://www.apache.org/licenses/LICENSE-2.0) license. See
the [License](LICENSE) file for more information.
......@@ -159,6 +159,37 @@ async def decode_bearer_token(
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Malformed JWT")
async def get_current_user(token: Annotated[JWT, Depends(decode_bearer_token)], db: DBSession) -> User:
"""
Get the current user from the database based on the JWT.
FastAPI Dependency Injection Function.
Parameters
----------
token : app.schemas.security.JWT
The verified and decoded JWT.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
Returns
-------
user : clowmdb.models.User
User associated with the JWT sent with the HTTP request.
"""
try:
uid = UUID(token.sub)
except ValueError: # pragma: no cover
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Malformed JWT")
user = await CRUDUser.get(db, uid)
if user:
return user
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
CurrentUser = Annotated[User, Depends(get_current_user)]
class AuthorizationDependency:
"""
Class to parameterize the authorization request with the resource to perform an operation on.
......@@ -175,7 +206,7 @@ class AuthorizationDependency:
def __call__(
self,
token: Annotated[JWT, Depends(decode_bearer_token)],
user: CurrentUser,
client: HTTPClient,
) -> Callable[[str], Awaitable[AuthzResponse]]:
"""
......@@ -183,8 +214,8 @@ class AuthorizationDependency:
Parameters
----------
token : app.schemas.security.JWT
The verified and decoded JWT. Dependency Injection.
user : clowmdb.models.User
The current user based on the JWT. Dependency Injection.
client : httpx.AsyncClient
HTTP Client with an open connection. Dependency Injection.
......@@ -195,39 +226,12 @@ class AuthorizationDependency:
"""
async def authorization_wrapper(operation: str) -> AuthzResponse:
params = AuthzRequest(operation=operation, resource=self.resource, uid=token.sub)
params = AuthzRequest(operation=operation, resource=self.resource, uid=user.lifescience_id)
return await request_authorization(request_params=params, client=client)
return authorization_wrapper
async def get_current_user(token: Annotated[JWT, Depends(decode_bearer_token)], db: DBSession) -> User:
"""
Get the current user from the database based on the JWT.
FastAPI Dependency Injection Function.
Parameters
----------
token : app.schemas.security.JWT
The verified and decoded JWT.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
Returns
-------
user : clowmdb.models.User
User associated with the JWT sent with the HTTP request.
"""
user = await CRUDUser.get(db, token.sub)
if user:
return user
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
CurrentUser = Annotated[User, Depends(get_current_user)]
async def get_current_resource(rid: Annotated[UUID, Path()], db: DBSession) -> Resource:
"""
Get the current resource from the database based on the ID in the path.
......
......@@ -12,7 +12,6 @@ from app.api.dependencies import (
CurrentUser,
DBSession,
S3Resource,
SlurmClient,
)
from app.api.resource_cluster_utils import (
delete_cluster_resource_version,
......@@ -46,7 +45,7 @@ async def list_resource_versions(
version_status: Annotated[
Union[List[ResourceVersion.Status], SkipJsonSchema[None]],
Query(
description=f"Which versions to include in the response. Permission `resource:read_any` required, current user is the maintainer, then only permission `resource:read` required. Default `{ResourceVersion.Status.LATEST.name}` and `{ResourceVersion.Status.SYNCHRONIZED.name}`.",
description=f"Which versions to include in the response. Permission `resource:read_any` required, current user is the maintainer, then only permission `resource:read` required. Default `{ResourceVersion.Status.LATEST.name}`, `{ResourceVersion.Status.SYNCHRONIZED.name}` and `{ResourceVersion.Status.SETTING_LATEST.name}`.",
# noqa: E501
),
] = None,
......@@ -76,7 +75,9 @@ async def list_resource_versions(
)
await authorization(rbac_operation)
requested_versions = (
version_status if version_status else [ResourceVersion.Status.LATEST, ResourceVersion.Status.SYNCHRONIZED]
version_status
if version_status
else [ResourceVersion.Status.LATEST, ResourceVersion.Status.SYNCHRONIZED, ResourceVersion.Status.SETTING_LATEST]
)
return [
ResourceVersionOut.from_db_resource_version(version)
......@@ -114,8 +115,6 @@ async def request_resource_version(
Async database session to perform query on. Dependency Injection.
background_tasks : fastapi.BackgroundTasks
Entrypoint for new BackgroundTasks. Provided by FastAPI.
s3 : types_aiobotocore_s3.service_resource import S3ServiceResource
S3 Service to perform operations on buckets. Dependency Injection.
"""
current_span = trace.get_current_span()
current_span.set_attributes(
......@@ -256,7 +255,6 @@ async def resource_version_sync(
db: DBSession,
s3: S3Resource,
background_tasks: BackgroundTasks,
slurm_client: SlurmClient,
) -> ResourceVersionOut:
"""
Synchronize the resource version to the cluster.
......@@ -273,8 +271,6 @@ async def resource_version_sync(
Resource Version associated with the ID in the path. Dependency Injection.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
slurm_client : app.slurm.rest_client.SlurmClient
Slurm client with an open connection. Dependency Injection
background_tasks : fastapi.BackgroundTasks
Entrypoint for new BackgroundTasks. Provided by FastAPI.
s3 : types_aiobotocore_s3.service_resource import S3ServiceResource
......@@ -283,8 +279,9 @@ async def resource_version_sync(
trace.get_current_span().set_attributes(
{"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)}
)
await authorization("sync")
await authorization("sync_denied" if resource_version.status == ResourceVersion.Status.DENIED else "sync")
if resource_version.status not in [
ResourceVersion.Status.DENIED,
ResourceVersion.Status.SYNC_REQUESTED,
ResourceVersion.Status.CLUSTER_DELETED,
]:
......@@ -315,7 +312,6 @@ async def resource_version_latest(
resource: CurrentResource,
resource_version: CurrentResourceVersion,
background_tasks: BackgroundTasks,
slurm_client: SlurmClient,
) -> ResourceVersionOut:
"""
Set the resource version as the latest version.
......@@ -332,8 +328,6 @@ async def resource_version_latest(
Resource associated with the ID in the path. Dependency Injection.
resource_version : clowmdb.models.ResourceVersion
Resource Version associated with the ID in the path. Dependency Injection.
slurm_client : app.slurm.rest_client.SlurmClient
Slurm client with an open connection. Dependency Injection
background_tasks : fastapi.BackgroundTasks
Entrypoint for new BackgroundTasks. Provided by FastAPI.
"""
......@@ -346,7 +340,11 @@ async def resource_version_latest(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Can't set resource version to {ResourceVersion.Status.LATEST.name} with status {resource_version.status.name}",
)
await CRUDResourceVersion.update_status(
db, resource_version_id=resource_version.resource_version_id, status=ResourceVersion.Status.SETTING_LATEST
)
resource_version_out = ResourceVersionOut.from_db_resource_version(resource_version)
resource_version_out.status = ResourceVersion.Status.SETTING_LATEST
background_tasks.add_task(set_cluster_resource_version_latest, resource_version=resource_version_out)
return resource_version_out
......@@ -359,7 +357,6 @@ async def delete_resource_version_cluster(
resource_version: CurrentResourceVersion,
db: DBSession,
background_tasks: BackgroundTasks,
slurm_client: SlurmClient,
) -> ResourceVersionOut:
"""
Delete the resource version on the cluster.
......@@ -385,10 +382,19 @@ async def delete_resource_version_cluster(
{"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)}
)
await authorization("delete_cluster")
if resource_version.status not in [
ResourceVersion.Status.SYNCHRONIZED,
ResourceVersion.Status.LATEST,
ResourceVersion.Status.CLUSTER_DELETE_ERROR,
]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Can't delete resource version on cluster with status {resource_version.status.name}",
)
await CRUDResourceVersion.update_status(
db, resource_version_id=resource_version.resource_version_id, status=ResourceVersion.Status.CLUSTER_DELETED
db, resource_version_id=resource_version.resource_version_id, status=ResourceVersion.Status.CLUSTER_DELETING
)
resource_version.status = ResourceVersion.Status.CLUSTER_DELETED
resource_version.status = ResourceVersion.Status.CLUSTER_DELETING
resource_version_out = ResourceVersionOut.from_db_resource_version(resource_version)
background_tasks.add_task(delete_cluster_resource_version, resource_version=resource_version_out)
return resource_version_out
......@@ -401,7 +407,6 @@ async def delete_resource_version_s3(
resource: CurrentResource,
resource_version: CurrentResourceVersion,
db: DBSession,
s3: S3Resource,
background_tasks: BackgroundTasks,
) -> ResourceVersionOut:
"""
......@@ -427,11 +432,20 @@ async def delete_resource_version_s3(
trace.get_current_span().set_attributes(
{"resource_id": str(resource.resource_id), "resource_version_id": str(resource_version.resource_version_id)}
)
await authorization("delete_s3")
if resource_version.status not in [
ResourceVersion.Status.DENIED,
ResourceVersion.Status.CLUSTER_DELETED,
ResourceVersion.Status.S3_DELETE_ERROR,
]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Can't delete S3 resource version with status {resource_version.status.name}",
)
await CRUDResourceVersion.update_status(
db, resource_version_id=resource_version.resource_version_id, status=ResourceVersion.Status.S3_DELETED
db, resource_version_id=resource_version.resource_version_id, status=ResourceVersion.Status.S3_DELETING
)
await authorization("delete_s3")
resource_version.status = ResourceVersion.Status.S3_DELETED
resource_version.status = ResourceVersion.Status.S3_DELETING
background_tasks.add_task(
delete_s3_resource_version,
resource_id=resource.resource_id,
......
from typing import Annotated, Any, Awaitable, Callable, List, Union
from uuid import UUID
from clowmdb.models import ResourceVersion
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status
......@@ -31,9 +32,8 @@ async def list_resources(
current_user: CurrentUser,
db: DBSession,
maintainer_id: Annotated[
Union[str, SkipJsonSchema[None]],
Union[UUID, SkipJsonSchema[None]],
Query(
max_length=64,
description="Filter for resource by maintainer. If current user is the same as maintainer ID, permission `resource:list` required, otherwise `resource:list_filter`.",
# noqa: E501
),
......@@ -41,7 +41,7 @@ async def list_resources(
version_status: Annotated[
Union[List[ResourceVersion.Status], SkipJsonSchema[None]],
Query(
description=f"Which versions of the resource to include in the response. Permission `resource:list_filter` required, unless `maintainer_id` is provided and current user is maintainer, then only permission `resource:list` required. Default `{ResourceVersion.Status.LATEST.name}` and `{ResourceVersion.Status.SYNCHRONIZED.name}`.",
description=f"Which versions of the resource to include in the response. Permission `resource:list_filter` required, unless `maintainer_id` is provided and current user is maintainer, then only permission `resource:list` required. Default `{ResourceVersion.Status.LATEST.name}`, `{ResourceVersion.Status.SYNCHRONIZED.name}` and `{ResourceVersion.Status.SETTING_LATEST.name}`.",
# noqa: E501
),
] = None,
......@@ -60,7 +60,7 @@ async def list_resources(
Current user. Dependency injection.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
maintainer_id : str | None, default None
maintainer_id : uuid.UUID | None, default None
Filter resources by a maintainer. Query Parameter.
version_status : List[clowmdb.models.ResourceVersion.Status] | None, default None
Filter resource version by their status. Query Parameter.
......@@ -69,7 +69,7 @@ async def list_resources(
"""
current_span = trace.get_current_span()
if maintainer_id: # pragma: no cover
current_span.set_attribute("maintainer_id", maintainer_id)
current_span.set_attribute("maintainer_id", str(maintainer_id))
if version_status: # pragma: no cover
current_span.set_attribute("version_status", [state.name for state in version_status])
if name_substring: # pragma: no cover
......@@ -85,9 +85,15 @@ async def list_resources(
db,
name_substring=name_substring,
maintainer_id=maintainer_id,
version_status=version_status
if version_status
else [ResourceVersion.Status.LATEST, ResourceVersion.Status.SYNCHRONIZED],
version_status=(
version_status
if version_status
else [
ResourceVersion.Status.LATEST,
ResourceVersion.Status.SYNCHRONIZED,
ResourceVersion.Status.SETTING_LATEST,
]
),
)
return [ResourceOut.from_db_resource(resource) for resource in resources]
......
from hashlib import md5
from typing import Mapping, Optional
from typing import Awaitable, Callable, Mapping, Optional
from fastapi import status
from fastapi import Request, Response, status
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
class HashJSONResponse(JSONResponse):
......@@ -11,3 +12,18 @@ class HashJSONResponse(JSONResponse):
# Add the ETag header (MD5 hash of content) to the response
if self.status_code == status.HTTP_200_OK:
self.headers["ETag"] = md5(self.body).hexdigest()
class ETagMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
response = await call_next(request)
if request.method == "GET":
# Client can ask if the cached data is stale or not
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match
# This saves network bandwidth but the database is still queried
if (
response.headers.get("ETag") is not None
and request.headers.get("If-None-Match") == response.headers["ETag"]
):
return Response(status_code=status.HTTP_304_NOT_MODIFIED)
return response
......@@ -51,7 +51,7 @@ async def synchronize_cluster_resource(resource_version: ResourceVersionOut) ->
failed_job = AsyncJob(
update_db_resource_wrapper,
resource_version_id=resource_version.resource_version_id,
status=ResourceVersion.Status.SYNC_REQUESTED,
status=ResourceVersion.Status.SYNC_ERROR,
resource_id=None,
)
try:
......@@ -185,6 +185,12 @@ async def set_cluster_resource_version_latest(resource_version: ResourceVersionO
resource_version_id=resource_version.resource_version_id,
resource_id=resource_version.resource_id,
),
failed_job=AsyncJob(
update_db_resource_wrapper,
status=ResourceVersion.Status.SYNCHRONIZED,
resource_version_id=resource_version.resource_version_id,
resource_id=resource_version.resource_id,
),
)
except (HTTPError, KeyError): # pragma: no cover
pass
......@@ -237,7 +243,22 @@ async def delete_cluster_resource_version(
slurm_client = get_slurm_client(client)
# Try to start the job on the slurm cluster
slurm_job_id = await slurm_client.submit_job(job_submission=job_submission)
await _monitor_proper_job_execution(slurm_client=slurm_client, slurm_job_id=slurm_job_id)
await _monitor_proper_job_execution(
slurm_client=slurm_client,
slurm_job_id=slurm_job_id,
success_job=AsyncJob(
update_db_resource_wrapper,
status=ResourceVersion.Status.CLUSTER_DELETED,
resource_version_id=resource_version.resource_version_id,
resource_id=resource_version.resource_id,
),
failed_job=AsyncJob(
update_db_resource_wrapper,
status=ResourceVersion.Status.CLUSTER_DELETE_ERROR,
resource_version_id=resource_version.resource_version_id,
resource_id=resource_version.resource_id,
),
)
except (HTTPError, KeyError): # pragma: no cover
pass
......
......@@ -2,9 +2,13 @@ from io import BytesIO
from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, Optional
from uuid import UUID
from botocore.exceptions import ClientError
from clowmdb.models import ResourceVersion
from opentelemetry import trace
from app.api.dependencies import get_db
from app.core.config import settings
from app.crud import CRUDResourceVersion
from app.s3.s3_resource import (
add_s3_bucket_policy_stmt,
boto_session,
......@@ -52,7 +56,7 @@ async def give_permission_to_s3_resource(resource: ResourceOut) -> None:
policy: Dict[str, Any] = {
"Sid": str(resource.resource_id),
"Effect": "Allow",
"Principal": {"AWS": f"arn:aws:iam:::user/{resource.maintainer_id}"},
"Principal": {"AWS": f"arn:aws:iam:::user/{str(resource.maintainer_id)}"},
"Resource": f"arn:aws:s3:::{settings.RESOURCE_BUCKET}",
"Action": ["s3:ListBucket"],
"Condition": {"StringLike": {"s3:prefix": resource_dir_name(resource.resource_id) + "/*"}},
......@@ -61,7 +65,7 @@ async def give_permission_to_s3_resource(resource: ResourceOut) -> None:
await add_s3_bucket_policy_stmt(policy, s3=s3, bucket_name=settings.RESOURCE_BUCKET)
async def give_permission_to_s3_resource_version(resource_version: ResourceVersionOut, maintainer_id: str) -> None:
async def give_permission_to_s3_resource_version(resource_version: ResourceVersionOut, maintainer_id: UUID) -> None:
"""
Give the maintainer permissions to upload the resource to the appropriate S3 key.
......@@ -75,7 +79,7 @@ async def give_permission_to_s3_resource_version(resource_version: ResourceVersi
policy: Dict[str, Any] = {
"Sid": str(resource_version.resource_version_id),
"Effect": "Allow",
"Principal": {"AWS": f"arn:aws:iam:::user/{maintainer_id}"},
"Principal": {"AWS": f"arn:aws:iam:::user/{str(maintainer_id)}"},
"Resource": f"arn:aws:s3:::{resource_version.s3_path[5:]}",
"Action": ["s3:DeleteObject", "s3:PutObject", "s3:GetObject"],
}
......@@ -144,10 +148,22 @@ async def delete_s3_resource_version(resource_id: UUID, resource_version_id: UUI
resource_version_id : uuid.UUID
ID of the resource version
"""
async for s3 in get_s3_resource():
await delete_s3_objects(
s3, bucket_name=settings.RESOURCE_BUCKET, prefix=resource_version_dir_name(resource_id, resource_version_id)
)
try:
async for s3 in get_s3_resource():
await delete_s3_objects(
s3,
bucket_name=settings.RESOURCE_BUCKET,
prefix=resource_version_dir_name(resource_id, resource_version_id),
)
async for db in get_db():
await CRUDResourceVersion.update_status(
db, status=ResourceVersion.Status.S3_DELETED, resource_version_id=resource_version_id
)
except ClientError:
async for db in get_db():
await CRUDResourceVersion.update_status(
db, status=ResourceVersion.Status.S3_DELETE_ERROR, resource_version_id=resource_version_id
)
async def get_s3_resource_version_obj(
......
......@@ -64,7 +64,7 @@ class CRUDResource:
async def list_resources(
db: AsyncSession,
name_substring: Optional[str] = None,
maintainer_id: Optional[str] = None,
maintainer_id: Optional[UUID] = None,
version_status: Optional[List[ResourceVersion.Status]] = None,
) -> List[Resource]:
"""
......@@ -76,7 +76,7 @@ class CRUDResource:
Async database session to perform query on.
name_substring : str | None, default None
Substring to filter for in the name of a resource.
maintainer_id : str | None, default None
maintainer_id : uuid.UUID | None, default None
Filter resources by maintainer.
version_status : List[clowmdb.models.ResourceVersion.Status] | None, default None
Filter versions of a resource based on the status. Removes resources that have no version after this filter.
......@@ -92,8 +92,8 @@ class CRUDResource:
span.set_attribute("name_substring", name_substring)
stmt = stmt.where(Resource.name.contains(name_substring))
if maintainer_id is not None:
span.set_attribute("maintainer_id", maintainer_id)
stmt = stmt.where(Resource.maintainer_id == maintainer_id)
span.set_attribute("maintainer_id", str(maintainer_id))
stmt = stmt.where(Resource._maintainer_id == maintainer_id.bytes)
if version_status is not None and len(version_status) > 0:
span.set_attribute("status", [status.name for status in version_status])
stmt = stmt.options(
......@@ -124,7 +124,7 @@ class CRUDResource:
await db.commit()
@staticmethod
async def create(db: AsyncSession, resource_in: ResourceIn, maintainer_id: str) -> Resource:
async def create(db: AsyncSession, resource_in: ResourceIn, maintainer_id: UUID) -> Resource:
"""
Create a new resource.
......@@ -144,13 +144,13 @@ class CRUDResource:
"""
with tracer.start_as_current_span(
"db_create_resource",
attributes={"maintainer_id": maintainer_id, "resource_in": resource_in.model_dump_json(indent=2)},
attributes={"maintainer_id": str(maintainer_id), "resource_in": resource_in.model_dump_json(indent=2)},
) as span:
resource_db = Resource(
name=resource_in.name,
short_description=resource_in.description,
source=resource_in.source,
maintainer_id=maintainer_id,
_maintainer_id=maintainer_id.bytes,
)
db.add(resource_db)
await db.commit()
......
from typing import Optional
from uuid import UUID
from clowmdb.models import User
from opentelemetry import trace
......@@ -10,7 +11,7 @@ tracer = trace.get_tracer_provider().get_tracer(__name__)
class CRUDUser:
@staticmethod
async def get(db: AsyncSession, uid: str) -> Optional[User]:
async def get(db: AsyncSession, uid: UUID) -> Optional[User]:
"""
Get a user by its UID.
......@@ -18,7 +19,7 @@ class CRUDUser:
----------
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on.
uid : str
uid : uuid.UUID
UID of a user.
Returns
......@@ -26,7 +27,7 @@ class CRUDUser:
user : clowmdb.models.User | None
The user for the given UID if he exists, None otherwise
"""
with tracer.start_as_current_span("db_get_user", attributes={"uid": uid}) as span:
stmt = select(User).where(User.uid == uid)
with tracer.start_as_current_span("db_get_user", attributes={"uid": str(uid)}) as span:
stmt = select(User).where(User._uid == uid.bytes)
span.set_attribute("sql_query", str(stmt))
return await db.scalar(stmt)
......@@ -18,7 +18,7 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode
from app.api.api import api_router
from app.api.middleware.etagmiddleware import HashJSONResponse
from app.api.middleware.etagmiddleware import ETagMiddleware, HashJSONResponse
from app.api.miscellaneous_endpoints import miscellaneous_router
from app.core.config import settings
from app.s3.s3_resource import boto_session
......@@ -55,7 +55,7 @@ app = FastAPI(
"email": "dgoebel@techfak.uni-bielefeld.de",
},
generate_unique_id_function=custom_generate_unique_id,
# license_info={"name": "MIT", "url": "https://mit-license.org/"},
license_info={"name": "Apache 2.0", "url": "https://www.apache.org/licenses/LICENSE-2.0"},
root_path=settings.API_PREFIX,
openapi_url=None, # create it manually to enable caching on client side
default_response_class=HashJSONResponse, # Add ETag header based on MD5 hash of content
......@@ -93,7 +93,7 @@ FastAPIInstrumentor.instrument_app(
# Enable caching based on ETag
# app.add_middleware(ETagMiddleware)
app.add_middleware(ETagMiddleware)
# Enable br compression for large responses, fallback gzip
app.add_middleware(BrotliMiddleware)
......
from typing import Annotated
from uuid import UUID as NativeUUID
from pydantic import PlainSerializer
UUID = Annotated[NativeUUID, PlainSerializer(lambda uuid: str(uuid), return_type=str, when_used="unless-none")]
from typing import List, Optional, Sequence
from uuid import UUID
from clowmdb.models import Resource, ResourceVersion, User
from pydantic import BaseModel, Field
from app.schemas import UUID
from app.schemas.resource_version import ResourceVersionIn, ResourceVersionOut, resource_version_dir_name
......@@ -31,7 +31,7 @@ class ResourceIn(BaseResource, ResourceVersionIn):
class ResourceOut(BaseResource):
resource_id: UUID = Field(..., description="ID of the resource", examples=["4c072e39-2bd9-4fa3-b564-4d890e240ccd"])
maintainer_id: str = Field(..., description="ID of the maintainer", examples=["28c5353b8bb34984a8bd4169ba94c606"])
maintainer_id: UUID = Field(..., description="ID of the maintainer", examples=["28c5353b8bb34984a8bd4169ba94c606"])
versions: List[ResourceVersionOut] = Field(..., description="Versions of the resource")
@staticmethod
......@@ -75,7 +75,7 @@ class S3ResourceVersionInfo(BaseResource):
resource_version_id: UUID = Field(
..., description="ID of the resource version", examples=["fb4cee12-1e91-49f3-905f-808845c7c1f4"]
)
maintainer_id: str = Field(..., description="ID of the maintainer", examples=["28c5353b8bb34984a8bd4169ba94c606"])
maintainer_id: UUID = Field(..., description="ID of the maintainer", examples=["28c5353b8bb34984a8bd4169ba94c606"])
maintainer: str = Field(..., description="Name of the maintainer", examples=["Bilbo Baggins"])
def s3_path(self) -> str:
......
from typing import Optional
from uuid import UUID
from clowmdb.models import ResourceVersion
from pydantic import BaseModel, Field, computed_field
from app.core.config import settings
from app.schemas import UUID
class BaseResourceVersion(BaseModel):
......
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, Field
from app.schemas import UUID
class AuthzResponse(BaseModel):
"""Schema for a response from OPA"""
......
......@@ -35,8 +35,8 @@ class TestOpenAPIRoute:
assert response1.status_code == status.HTTP_200_OK
assert response1.headers.get("ETag") is not None
# response2 = await client.get("/openapi.json", headers={"If-None-Match": response1.headers.get("ETag")})
# assert response2.status_code == status.HTTP_304_NOT_MODIFIED
response2 = await client.get("/openapi.json", headers={"If-None-Match": response1.headers.get("ETag")})
assert response2.status_code == status.HTTP_304_NOT_MODIFIED
@pytest.mark.asyncio
async def test_swagger_ui_route(self, client: AsyncClient) -> None:
......
import json
from copy import copy
from io import BytesIO
from uuid import uuid4
......@@ -183,6 +184,7 @@ class TestResourceVersionRouteDelete(_TestResourceVersionRoutes):
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing.
"""
previous_state = copy(random_resource_version_states.status)
response = await client.delete(
"/".join(
[
......@@ -195,7 +197,14 @@ class TestResourceVersionRouteDelete(_TestResourceVersionRoutes):
),
headers=random_user.auth_headers,
)
assert response.status_code == status.HTTP_200_OK
if previous_state in [
ResourceVersion.Status.SYNCHRONIZED,
ResourceVersion.Status.LATEST,
ResourceVersion.Status.CLUSTER_DELETE_ERROR,
]:
assert response.status_code == status.HTTP_200_OK
else:
assert response.status_code == status.HTTP_400_BAD_REQUEST
@pytest.mark.asyncio
async def test_delete_resource_version_s3_route(
......@@ -222,6 +231,7 @@ class TestResourceVersionRouteDelete(_TestResourceVersionRoutes):
mock_s3_service : app.tests.mocks.mock_s3_resource.MockS3ServiceResource
Mock S3 Service to manipulate objects.
"""
previous_state = copy(random_resource_version_states.status)
response = await client.delete(
"/".join(
[
......@@ -234,7 +244,15 @@ class TestResourceVersionRouteDelete(_TestResourceVersionRoutes):
),
headers=random_user.auth_headers,
)
assert response.status_code == status.HTTP_200_OK
if previous_state in [
ResourceVersion.Status.CLUSTER_DELETED,
ResourceVersion.Status.DENIED,
ResourceVersion.Status.S3_DELETE_ERROR,
]:
assert response.status_code == status.HTTP_200_OK
else:
assert response.status_code == status.HTTP_400_BAD_REQUEST
return
# test that the resource is deleted in S3
with pytest.raises(ClientError):
......@@ -439,7 +457,7 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes):
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing.
"""
previous_status = random_resource_version_states.status
previous_status = copy(random_resource_version_states.status)
response = await client.put(
"/".join(
[
......@@ -455,6 +473,7 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes):
if previous_status in [
ResourceVersion.Status.SYNC_REQUESTED,
ResourceVersion.Status.CLUSTER_DELETED,
ResourceVersion.Status.DENIED,
]:
assert response.status_code == status.HTTP_200_OK
else:
......@@ -484,7 +503,7 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes):
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing.
"""
previous_status = random_resource_version_states.status
previous_status = copy(random_resource_version_states.status)
mock_slurm_cluster.fail_jobs = True
def reset_slurm_config() -> None:
......@@ -506,6 +525,7 @@ class TestResourceVersionRoutePut(_TestResourceVersionRoutes):
if previous_status in [
ResourceVersion.Status.SYNC_REQUESTED,
ResourceVersion.Status.CLUSTER_DELETED,
ResourceVersion.Status.DENIED,
]:
assert response.status_code == status.HTTP_200_OK
else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment