Skip to content
Snippets Groups Projects

Resolve "Check for unique name of resources before creating a it"

7 files
+ 86
6
Compare changes
  • Side-by-side
  • Inline
Files
7
from typing import Annotated, Any, Awaitable, Callable, List, Union
from clowmdb.models import ResourceVersion
from fastapi import APIRouter, BackgroundTasks, Depends, Query, status
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status
from opentelemetry import trace
from pydantic.json_schema import SkipJsonSchema
@@ -94,7 +94,7 @@ async def list_resources(
@router.post("", summary="Request a new resource", status_code=status.HTTP_201_CREATED)
@start_as_current_span_async("api_request_resource", tracer=tracer)
async def request_resource(
async def create_resource(
authorization: Authorization,
current_user: CurrentUser,
resource_in: ResourceIn,
@@ -125,6 +125,10 @@ async def request_resource(
current_span = trace.get_current_span()
current_span.set_attribute("resource_in", resource_in.model_dump_json(indent=2))
await authorization("create")
if await CRUDResource.get_by_name(db=db, name=resource_in.name) is not None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=f"Resource with name '{resource_in.name}' already exists"
)
resource = await CRUDResource.create(db, resource_in=resource_in, maintainer_id=current_user.uid)
resource_out = ResourceOut.from_db_resource(db_resource=resource)
background_tasks.add_task(give_permission_to_s3_resource, resource=resource_out)
Loading