Skip to content
Snippets Groups Projects
Verified Commit c29d793c authored by Daniel Göbel's avatar Daniel Göbel
Browse files

Enable developers to delete their workflow

#32
parent 814e17ce
No related branches found
No related tags found
2 merge requests!69Delete dev branch,!29Resolve "Delete workflow as developer"
Pipeline #28027 passed
from typing import TYPE_CHECKING, AsyncGenerator, Awaitable, Callable
from typing import TYPE_CHECKING, Annotated, AsyncGenerator, Awaitable, Callable
from uuid import UUID
from authlib.jose.errors import BadSignatureError, DecodeError, ExpiredTokenError
......@@ -18,14 +18,14 @@ from app.schemas.security import JWT, AuthzRequest, AuthzResponse
from app.slurm.slurm_rest_client import SlurmClient
if TYPE_CHECKING:
from boto3.resources.base import ServiceResource
from mypy_boto3_s3.service_resource import S3ServiceResource
else:
ServiceResource = object
S3ServiceResource = object
bearer_token = HTTPBearer(description="JWT Header")
def get_s3_resource() -> ServiceResource:
def get_s3_resource() -> S3ServiceResource:
return s3_resource # pragma: no cover
......@@ -232,3 +232,10 @@ async def get_current_workflow_execution(
if execution is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Workflow Execution with id {eid} not found")
return execution
CurrentUser = Annotated[User, Depends(get_current_user)]
CurrentWorkflow = Annotated[Workflow, Depends(get_current_workflow)]
DBSession = Annotated[AsyncSession, Depends(get_db)]
HTTPClient = Annotated[AsyncClient, Depends(get_httpx_client)]
S3Service = Annotated[S3ServiceResource, Depends(get_s3_resource)]
from typing import TYPE_CHECKING, Any, Awaitable, Callable
from typing import Annotated, Any, Awaitable, Callable
from clowmdb.models import User, Workflow, WorkflowVersion
from clowmdb.models import Workflow, WorkflowVersion
from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, Query, UploadFile, status
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.dependencies import (
AuthorizationDependency,
get_current_user,
get_current_workflow,
get_db,
get_httpx_client,
get_s3_resource,
)
from app.api.dependencies import AuthorizationDependency, CurrentUser, CurrentWorkflow, DBSession, HTTPClient, S3Service
from app.api.utils import check_repo, upload_icon
from app.core.config import settings
from app.crud import CRUDWorkflow, CRUDWorkflowVersion
......@@ -20,17 +11,17 @@ from app.git_repository import build_repository
from app.schemas.workflow import WorkflowIn, WorkflowOut
from app.schemas.workflow_version import WorkflowVersionFull, WorkflowVersionUpdate
if TYPE_CHECKING:
from mypy_boto3_s3.service_resource import S3ServiceResource
else:
S3ServiceResource = object
router = APIRouter(prefix="/workflows", tags=["Workflow"])
workflow_authorization = AuthorizationDependency(resource="workflow")
Authorization = Annotated[Callable[[str], Awaitable[Any]], Depends(workflow_authorization)]
@router.get("", status_code=status.HTTP_200_OK, summary="List workflows")
async def list_workflows(
db: DBSession,
authorization: Authorization,
current_user: CurrentUser,
name_substring: str
| None = Query(
None,
......@@ -49,9 +40,6 @@ async def list_workflows(
description="Filter for workflow by developer. If current user is the same as developer ID, permission 'workflow:list' required, otherwise 'workflow:list_filter'.", # noqa: E501
example="28c5353b8bb34984a8bd4169ba94c606",
),
db: AsyncSession = Depends(get_db),
authorization: Callable[[str], Awaitable[Any]] = Depends(workflow_authorization),
current_user: User = Depends(get_current_user),
) -> list[WorkflowOut]:
"""
List all workflows.\n
......@@ -98,16 +86,16 @@ async def list_workflows(
@router.post("", status_code=status.HTTP_201_CREATED, summary="Create a new workflow")
async def create_workflow(
background_tasks: BackgroundTasks,
db: DBSession,
current_user: CurrentUser,
authorization: Authorization,
client: HTTPClient,
s3: S3Service,
workflow: WorkflowIn = Depends(WorkflowIn.as_form), # type: ignore[attr-defined]
icon: UploadFile | None = File(None, description="Optional Icon for the Workflow."),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
authorization: Callable[[str], Awaitable[Any]] = Depends(workflow_authorization),
client: AsyncClient = Depends(get_httpx_client),
s3: S3ServiceResource = Depends(get_s3_resource),
) -> WorkflowOut:
"""
Create a new workflow.\n
Create a new workflow.\nR
Permission "workflow:create" required.
\f
Parameters
......@@ -174,15 +162,15 @@ async def create_workflow(
@router.get("/{wid}", status_code=status.HTTP_200_OK, summary="Get a workflow")
async def get_workflow(
workflow: Workflow = Depends(get_current_workflow),
workflow: CurrentWorkflow,
db: DBSession,
current_user: CurrentUser,
authorization: Authorization,
version_status: list[WorkflowVersion.Status]
| None = Query(
None,
description=f"Which versions of the workflow to include in the response. Permission 'workflow:read_any' required if you are not the developer of this workflow. Default {WorkflowVersion.Status.PUBLISHED.name} and {WorkflowVersion.Status.DEPRECATED.name}", # noqa: E501
),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
authorization: Callable[[str], Awaitable[Any]] = Depends(workflow_authorization),
) -> WorkflowOut:
"""
Get a specific workflow.\n
......@@ -220,10 +208,11 @@ async def get_workflow(
@router.delete("/{wid}", status_code=status.HTTP_204_NO_CONTENT, summary="Delete a workflow")
async def delete_workflow(
background_tasks: BackgroundTasks,
workflow: Workflow = Depends(get_current_workflow),
db: AsyncSession = Depends(get_db),
authorization: Callable[[str], Awaitable[Any]] = Depends(workflow_authorization),
s3: S3ServiceResource = Depends(get_s3_resource),
workflow: CurrentWorkflow,
db: DBSession,
authorization: Authorization,
s3: S3Service,
current_user: CurrentUser,
) -> None:
"""
Delete a workflow.\n
......@@ -241,8 +230,11 @@ async def delete_workflow(
Async function to ask the auth service for authorization. Dependency Injection.
s3 : boto3_type_annotations.s3.ServiceResource
S3 Service to perform operations on buckets in Ceph. Dependency Injection.
current_user : clowmdb.models.User
Current user. Dependency Injection.
"""
await authorization("delete")
rbac_operation = "delete" if workflow.developer_id == current_user.uid else "delete_any"
await authorization(rbac_operation)
versions = await CRUDWorkflowVersion.list(db, workflow.workflow_id)
deleted_icons: set[str] = set()
# delete files in buckets
......@@ -259,17 +251,17 @@ async def delete_workflow(
@router.post("/{wid}/update", status_code=status.HTTP_201_CREATED, summary="Update a workflow")
async def update_workflow(
background_tasks: BackgroundTasks,
workflow: Workflow = Depends(get_current_workflow),
workflow: CurrentWorkflow,
client: HTTPClient,
db: DBSession,
current_user: CurrentUser,
s3: S3Service,
authorization: Authorization,
version_update: WorkflowVersionUpdate = Depends(WorkflowVersionUpdate.as_form), # type: ignore[attr-defined]
icon: UploadFile
| None = File(
None, description="Optional Icon for the workflow version. If None, then the previous one will be reused."
),
client: AsyncClient = Depends(get_httpx_client),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
s3: S3ServiceResource = Depends(get_s3_resource),
authorization: Callable[[str], Awaitable[Any]] = Depends(workflow_authorization),
) -> WorkflowVersionFull:
"""
Create a new workflow version.\n
......
import json
from tempfile import SpooledTemporaryFile
from typing import TYPE_CHECKING, Any, Awaitable, Callable
from typing import Annotated, Any, Awaitable, Callable
import jsonschema
from clowmdb.models import User, WorkflowExecution, WorkflowVersion
from clowmdb.models import WorkflowExecution, WorkflowVersion
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.dependencies import (
AuthorizationDependency,
get_current_user,
CurrentUser,
DBSession,
HTTPClient,
S3Service,
get_current_workflow_execution,
get_db,
get_httpx_client,
get_s3_resource,
get_slurm_client,
)
from app.api.utils import (
......@@ -29,23 +27,21 @@ from app.git_repository import build_repository
from app.schemas.workflow_execution import DevWorkflowExecutionIn, WorkflowExecutionIn, WorkflowExecutionOut
from app.slurm.slurm_rest_client import SlurmClient
if TYPE_CHECKING:
from mypy_boto3_s3.service_resource import S3ServiceResource
else:
S3ServiceResource = object
router = APIRouter(prefix="/workflow_executions", tags=["WorkflowExecution"])
workflow_authorization = AuthorizationDependency(resource="workflow_execution")
Authorization = Annotated[Callable[[str], Awaitable[Any]], Depends(workflow_authorization)]
CurrentWorkflowExecution = Annotated[WorkflowExecution, Depends(get_current_workflow_execution)]
@router.post("", status_code=status.HTTP_201_CREATED, summary="Start a new workflow execution")
async def start_workflow(
background_tasks: BackgroundTasks,
workflow_execution_in: WorkflowExecutionIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
authorization: Callable[[str], Awaitable[Any]] = Depends(workflow_authorization),
s3: S3ServiceResource = Depends(get_s3_resource),
db: DBSession,
current_user: CurrentUser,
authorization: Authorization,
s3: S3Service,
slurm_client: SlurmClient = Depends(get_slurm_client),
) -> WorkflowExecutionOut:
"""
......@@ -143,12 +139,12 @@ async def start_workflow(
async def start_arbitrary_workflow(
background_tasks: BackgroundTasks,
workflow_execution_in: DevWorkflowExecutionIn,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
db: DBSession,
current_user: CurrentUser,
client: HTTPClient,
s3: S3Service,
authorization: Callable[[str], Awaitable[Any]] = Depends(AuthorizationDependency(resource="workflow")),
s3: S3ServiceResource = Depends(get_s3_resource),
slurm_client: SlurmClient = Depends(get_slurm_client),
client: AsyncClient = Depends(get_httpx_client),
) -> WorkflowExecutionOut:
"""
Start a new workflow execution from an arbitrary git repository.\n
......@@ -234,6 +230,9 @@ async def start_arbitrary_workflow(
@router.get("", status_code=status.HTTP_200_OK, summary="Get all workflow executions")
async def list_workflow_executions(
db: DBSession,
current_user: CurrentUser,
authorization: Authorization,
user_id: str
| None = Query(
None,
......@@ -249,9 +248,6 @@ async def list_workflow_executions(
example="ba8bcd9294c2c96aedefa1763a84a18077c50c0f",
regex=r"^[0-9a-f]{40}$",
),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
authorization: Callable[[str], Awaitable[Any]] = Depends(workflow_authorization),
) -> list[WorkflowExecutionOut]:
"""
Get all workflow executions.\n
......@@ -293,9 +289,9 @@ async def list_workflow_executions(
@router.get("/{eid}", status_code=status.HTTP_200_OK, summary="Get a workflow execution")
async def get_workflow_execution(
workflow_execution: WorkflowExecution = Depends(get_current_workflow_execution),
current_user: User = Depends(get_current_user),
authorization: Callable[[str], Awaitable[Any]] = Depends(workflow_authorization),
workflow_execution: CurrentWorkflowExecution,
current_user: CurrentUser,
authorization: Authorization,
) -> WorkflowExecutionOut:
"""
Get a specific workflow execution.\n
......@@ -327,11 +323,11 @@ async def get_workflow_execution(
@router.delete("/{eid}", status_code=status.HTTP_204_NO_CONTENT, summary="Delete a workflow execution")
async def delete_workflow_execution(
background_tasks: BackgroundTasks,
workflow_execution: WorkflowExecution = Depends(get_current_workflow_execution),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
authorization: Callable[[str], Awaitable[Any]] = Depends(workflow_authorization),
s3: S3ServiceResource = Depends(get_s3_resource),
db: DBSession,
current_user: CurrentUser,
authorization: Authorization,
s3: S3Service,
workflow_execution: CurrentWorkflowExecution,
) -> None:
"""
Delete a specific workflow execution.\n
......@@ -372,10 +368,10 @@ async def delete_workflow_execution(
@router.post("/{eid}/cancel", status_code=status.HTTP_204_NO_CONTENT, summary="Cancel a workflow execution")
async def cancel_workflow_execution(
background_tasks: BackgroundTasks,
workflow_execution: WorkflowExecution = Depends(get_current_workflow_execution),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
authorization: Callable[[str], Awaitable[Any]] = Depends(workflow_authorization),
db: DBSession,
current_user: CurrentUser,
authorization: Authorization,
workflow_execution: CurrentWorkflowExecution,
slurm_client: SlurmClient = Depends(get_slurm_client),
) -> None:
"""
......
from typing import Annotated, Any, Awaitable, Callable
from clowmdb.models import User, Workflow, WorkflowVersion
from clowmdb.models import WorkflowVersion
from fastapi import APIRouter, Depends, HTTPException, Path, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.dependencies import AuthorizationDependency, get_current_user, get_current_workflow, get_db
from app.api.dependencies import AuthorizationDependency, CurrentUser, CurrentWorkflow, DBSession
from app.crud import CRUDWorkflowVersion
from app.git_repository import build_repository
from app.schemas.workflow_version import WorkflowVersionFull, WorkflowVersionStatus
......@@ -12,9 +11,6 @@ from app.schemas.workflow_version import WorkflowVersionFull, WorkflowVersionSta
router = APIRouter(prefix="/{wid}/versions", tags=["WorkflowVersion"])
workflow_authorization = AuthorizationDependency(resource="workflow")
CurrentUser = Annotated[User, Depends(get_current_user)]
CurrentWorkflow = Annotated[Workflow, Depends(get_current_workflow)]
DBSession = Annotated[AsyncSession, Depends(get_db)]
Authorization = Annotated[Callable[[str], Awaitable[Any]], Depends(workflow_authorization)]
GitCommitHash = Annotated[
str,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment