import pytest from clowmdb.models import Bucket, BucketPermission from fastapi import status from httpx import AsyncClient from pydantic import TypeAdapter from sqlalchemy.ext.asyncio import AsyncSession from app.api.endpoints.buckets import ANONYMOUS_ACCESS_SID from app.core.config import settings from app.crud import CRUDBucket from app.schemas.bucket import BucketIn, BucketOut, BucketSizeLimits from app.tests.mocks.mock_s3_resource import MockS3ServiceResource from app.tests.utils.bucket import add_permission_for_bucket, delete_bucket, make_bucket_public from app.tests.utils.cleanup import CleanupList from app.tests.utils.user import UserWithAuthHeader from app.tests.utils.utils import random_lower_string class _TestBucketRoutes: base_path = "/buckets" class TestBucketRoutesGet(_TestBucketRoutes): @pytest.mark.asyncio async def test_get_all_buckets( self, client: AsyncClient, random_bucket: Bucket, random_second_user: UserWithAuthHeader ) -> None: """ Test for getting all buckets with "list_all" operation. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_second_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. """ response = await client.get(f"{self.base_path}", headers=random_second_user.auth_headers) assert response.status_code == status.HTTP_200_OK ta = TypeAdapter(list[BucketOut]) buckets = ta.validate_json(response.content) assert len(buckets) == 1 bucket = buckets[0] assert bucket.name == random_bucket.name assert bucket.owner_id == random_bucket.owner_id @pytest.mark.asyncio async def test_get_own_buckets( self, client: AsyncClient, random_bucket: Bucket, random_user: UserWithAuthHeader, ) -> None: """ Test for getting the buckets where the user is the owner. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. """ response = await client.get( f"{self.base_path}", params={"owner_id": str(random_bucket.owner_id)}, headers=random_user.auth_headers ) assert response.status_code == status.HTTP_200_OK ta = TypeAdapter(list[BucketOut]) buckets = ta.validate_json(response.content) assert len(buckets) == 1 bucket = buckets[0] assert bucket.name == random_bucket.name assert bucket.owner_id == random_bucket.owner_id @pytest.mark.asyncio async def test_get_bucket_by_name( self, client: AsyncClient, random_bucket: Bucket, random_user: UserWithAuthHeader, ) -> None: """ Test for getting a bucket by its name where the user is the owner of the bucket. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. """ response = await client.get(f"{self.base_path}/{random_bucket.name}", headers=random_user.auth_headers) assert response.status_code == status.HTTP_200_OK bucket = BucketOut.model_validate_json(response.content) assert bucket.name == random_bucket.name assert bucket.owner_id == random_bucket.owner.uid @pytest.mark.asyncio async def test_get_unknown_bucket(self, client: AsyncClient, random_user: UserWithAuthHeader) -> None: """ Test for getting an unknown bucket by its name. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. """ response = await client.get(f"{self.base_path}/impossible_bucket_name", headers=random_user.auth_headers) assert response.status_code == status.HTTP_404_NOT_FOUND @pytest.mark.asyncio async def test_get_foreign_bucket( self, client: AsyncClient, random_bucket: Bucket, random_second_user: UserWithAuthHeader ) -> None: """ Test for getting a foreign bucket without permission. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_second_user : app.tests.utils.user.UserWithAuthHeader Random user who is not the owner of the bucket. """ response = await client.get(f"{self.base_path}/{random_bucket.name}", headers=random_second_user.auth_headers) assert response.status_code == status.HTTP_403_FORBIDDEN @pytest.mark.asyncio async def test_get_foreign_public_bucket( self, client: AsyncClient, random_bucket: Bucket, random_second_user: UserWithAuthHeader, db: AsyncSession, mock_s3_service: MockS3ServiceResource, ) -> None: """ Test for getting a foreign public bucket. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_second_user : app.tests.utils.user.UserWithAuthHeader Random user who is not the owner of the bucket. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. mock_s3_service : app.tests.mocks.mock_s3_resource.MockS3ServiceResource Mock S3 Service to manipulate objects. """ await make_bucket_public(db=db, s3=mock_s3_service, bucket_name=random_bucket.name) response = await client.get(f"{self.base_path}/{random_bucket.name}", headers=random_second_user.auth_headers) assert response.status_code == status.HTTP_200_OK bucket = BucketOut.model_validate_json(response.content) assert bucket.name == random_bucket.name class TestBucketRoutesCreate(_TestBucketRoutes): @pytest.mark.asyncio async def test_create_bucket( self, db: AsyncSession, client: AsyncClient, random_user: UserWithAuthHeader, cleanup: CleanupList, ) -> None: """ Test for creating a bucket. Parameters ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. client : httpx.AsyncClient HTTP Client to perform the request on. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. cleanup : app.tests.utils.utils.CleanupList Cleanup object where (async) functions can be registered which get executed after a (failed) test. """ bucket_info = BucketIn(name=random_lower_string(), description=random_lower_string(127)) cleanup.add_task( delete_bucket, db=db, bucket_name=bucket_info.name, ) response = await client.post( self.base_path, headers=random_user.auth_headers, content=bucket_info.model_dump_json() ) assert response.status_code == status.HTTP_201_CREATED bucket = BucketOut.model_validate_json(response.content) assert bucket.name == bucket_info.name assert bucket.owner_id == random_user.user.uid assert bucket.size_limit == settings.s3.default_bucket_size_limit.to("KiB") assert bucket.object_limit == settings.s3.default_bucket_object_limit db_bucket = await CRUDBucket.get(bucket_info.name, db=db) assert db_bucket assert db_bucket.name == bucket_info.name assert db_bucket.owner_id == random_user.user.uid @pytest.mark.asyncio async def test_create_duplicated_bucket( self, client: AsyncClient, random_bucket: Bucket, random_user: UserWithAuthHeader, ) -> None: """ Test for creating a bucket where the name is already taken. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. """ bucket_info = BucketIn(name=random_bucket.name, description=random_lower_string(127)) response = await client.post( self.base_path, headers=random_user.auth_headers, content=bucket_info.model_dump_json() ) assert response.status_code == status.HTTP_400_BAD_REQUEST class TestBucketRoutesUpdate(_TestBucketRoutes): @pytest.mark.asyncio async def test_make_bucket_public( self, client: AsyncClient, random_bucket: Bucket, random_user: UserWithAuthHeader, mock_s3_service: MockS3ServiceResource, ) -> None: """ Test for getting a foreign public bucket. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_user : app.tests.utils.user.UserWithAuthHeader Random user who is owner of the bucket. mock_s3_service : app.tests.mocks.mock_s3_resource.MockS3ServiceResource Mock S3 Service to manipulate objects. """ response = await client.patch( f"{self.base_path}/{random_bucket.name}/public", headers=random_user.auth_headers, json={"public": True} ) assert response.status_code == status.HTTP_200_OK bucket = BucketOut.model_validate_json(response.content) assert bucket.name == random_bucket.name assert bucket.public assert ANONYMOUS_ACCESS_SID in mock_s3_service.Bucket(bucket.name).Policy().policy @pytest.mark.asyncio async def test_update_bucket_limits( self, client: AsyncClient, random_bucket: Bucket, random_user: UserWithAuthHeader, ) -> None: """ Test for getting a foreign public bucket. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_user : app.tests.utils.user.UserWithAuthHeader Random user who is owner of the bucket. """ response = await client.patch( f"{self.base_path}/{random_bucket.name}/limits", headers=random_user.auth_headers, content=BucketSizeLimits(size_limit=10240, object_limit=1000).model_dump_json(), ) assert response.status_code == status.HTTP_200_OK bucket = BucketOut.model_validate_json(response.content) assert bucket.name == random_bucket.name assert bucket.size_limit is not None assert bucket.size_limit == 10240 assert bucket.object_limit is not None assert bucket.object_limit == 1000 @pytest.mark.asyncio async def test_make_bucket_private( self, client: AsyncClient, random_bucket: Bucket, random_user: UserWithAuthHeader, mock_s3_service: MockS3ServiceResource, db: AsyncSession, ) -> None: """ Test for getting a foreign public bucket. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_user : app.tests.utils.user.UserWithAuthHeader Random user who is owner of the bucket. mock_s3_service : app.tests.mocks.mock_s3_resource.MockS3ServiceResource Mock S3 Service to manipulate objects. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. """ await make_bucket_public(db=db, s3=mock_s3_service, bucket_name=random_bucket.name) response = await client.patch( f"{self.base_path}/{random_bucket.name}/public", headers=random_user.auth_headers, json={"public": False} ) assert response.status_code == status.HTTP_200_OK bucket = BucketOut.model_validate_json(response.content) assert bucket.name == random_bucket.name assert not bucket.public assert ANONYMOUS_ACCESS_SID not in mock_s3_service.Bucket(bucket.name).Policy().policy @pytest.mark.asyncio async def test_make_private_bucket_private( self, client: AsyncClient, random_bucket: Bucket, random_user: UserWithAuthHeader, mock_s3_service: MockS3ServiceResource, db: AsyncSession, ) -> None: """ Test for getting a foreign public bucket. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_user : app.tests.utils.user.UserWithAuthHeader Random user who is owner of the bucket. mock_s3_service : app.tests.mocks.mock_s3_resource.MockS3ServiceResource Mock S3 Service to manipulate objects. db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. """ response = await client.patch( f"{self.base_path}/{random_bucket.name}/public", headers=random_user.auth_headers, json={"public": False} ) assert response.status_code == status.HTTP_200_OK bucket = BucketOut.model_validate_json(response.content) assert bucket.name == random_bucket.name assert not bucket.public assert ANONYMOUS_ACCESS_SID not in mock_s3_service.Bucket(bucket.name).Policy().policy class TestBucketRoutesDelete(_TestBucketRoutes): @pytest.mark.asyncio async def test_delete_empty_bucket( self, client: AsyncClient, random_bucket: Bucket, random_user: UserWithAuthHeader, ) -> None: """ Test for deleting an empty bucket. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. """ response = await client.delete( f"{self.base_path}/{random_bucket.name}", headers=random_user.auth_headers, params={"force_delete": False} ) assert response.status_code == status.HTTP_204_NO_CONTENT @pytest.mark.asyncio async def test_delete_foreign_bucket_with_permission( self, client: AsyncClient, db: AsyncSession, random_second_user: UserWithAuthHeader, random_third_user: UserWithAuthHeader, cleanup: CleanupList, ) -> None: """ Test for deleting a foreign bucket. Parameters ---------- db : sqlalchemy.ext.asyncio.AsyncSession. Async database session to perform query on. client : httpx.AsyncClient HTTP Client to perform the request on. random_third_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. random_second_user : app.tests.utils.user.UserWithAuthHeader Random user who is not the owner of the bucket. cleanup : app.tests.utils.utils.CleanupList Cleanup object where (async) functions can be registered which get executed after a (failed) test. """ bucket = Bucket( name=random_lower_string(), description=random_lower_string(127), owner_id_bytes=random_second_user.user.uid_bytes, ) db.add(bucket) await db.commit() cleanup.add_task( delete_bucket, db=db, bucket_name=bucket.name, ) await add_permission_for_bucket( db, bucket.name, random_third_user.user.uid, permission=BucketPermission.Permission.READWRITE ) response = await client.delete( f"{self.base_path}/{bucket.name}", headers=random_third_user.auth_headers, params={"force_delete": False} ) assert response.status_code == status.HTTP_403_FORBIDDEN @pytest.mark.asyncio async def test_delete_non_empty_bucket( self, client: AsyncClient, random_bucket: Bucket, random_user: UserWithAuthHeader, mock_s3_service: MockS3ServiceResource, ) -> None: """ Test for deleting a non-empty bucket. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. mock_s3_service : app.tests.mocks.mock_s3_resource.MockS3ServiceResource Mock S3 Service to manipulate objects. """ mock_s3_service.create_object_in_bucket(bucket_name=random_bucket.name, key=random_lower_string()) response = await client.delete( f"{self.base_path}/{random_bucket.name}", headers=random_user.auth_headers, params={"force_delete": False} ) assert response.status_code == status.HTTP_400_BAD_REQUEST @pytest.mark.asyncio async def test_force_delete_non_empty_bucket( self, client: AsyncClient, random_bucket: Bucket, random_user: UserWithAuthHeader, mock_s3_service: MockS3ServiceResource, ) -> None: """ Test for force deleting a non-empty bucket. Parameters ---------- client : httpx.AsyncClient HTTP Client to perform the request on. random_bucket : clowmdb.models.Bucket Random bucket for testing. random_user : app.tests.utils.user.UserWithAuthHeader Random user for testing. mock_s3_service : app.tests.mocks.mock_s3_resource.MockS3ServiceResource Mock S3 Service to manipulate objects. """ mock_s3_service.create_object_in_bucket(bucket_name=random_bucket.name, key=random_lower_string()) response = await client.delete( f"{self.base_path}/{random_bucket.name}", headers=random_user.auth_headers, params={"force_delete": True} ) assert response.status_code == status.HTTP_204_NO_CONTENT