Skip to content
Snippets Groups Projects
Commit c93dfe39 authored by Daniel Göbel's avatar Daniel Göbel
Browse files

Merge branch 'feature/80-add-default-bucket-limits' into 'main'

Resolve "Add default bucket limits"

Closes #80

See merge request cmg/clowm/clowm-s3proxy-service!79
parents a0bbb88c e78ae2bb
No related branches found
No related tags found
No related merge requests found
...@@ -14,22 +14,17 @@ repos: ...@@ -14,22 +14,17 @@ repos:
- id: debug-statements - id: debug-statements
- id: check-merge-conflict - id: check-merge-conflict
- id: check-ast - id: check-ast
- repo: https://github.com/psf/black
rev: 24.4.0
hooks:
- id: black
files: app
args: [--check]
- repo: https://github.com/charliermarsh/ruff-pre-commit - repo: https://github.com/charliermarsh/ruff-pre-commit
rev: 'v0.3.7' rev: 'v0.4.1'
hooks: hooks:
- id: ruff - id: ruff
args: ["--fix", "--show-fixes"]
- id: ruff-format
- repo: https://github.com/PyCQA/isort - repo: https://github.com/PyCQA/isort
rev: 5.13.2 rev: 5.13.2
hooks: hooks:
- id: isort - id: isort
files: app files: app
args: [-c]
- repo: https://github.com/pre-commit/mirrors-mypy - repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.9.0 rev: v1.9.0
hooks: hooks:
......
...@@ -44,14 +44,16 @@ user-friendly manner. 👍 ...@@ -44,14 +44,16 @@ user-friendly manner. 👍
### S3 ### S3
| Env variable | Config file key | Default | Value | Example | Description | | Env variable | Config file key | Default | Value | Example | Description |
|--------------------------------|-----------------------|---------|----------|--------------------------|----------------------------------------------------------------------------------| |-----------------------------------------|----------------------------------|-----------|----------|----------------------------|----------------------------------------------------------------------------------|
| * `CLOWM_S3__URI` | `s3.uri` | unset | HTTP URL | `http://localhost` | URI of the S3 Object Storage | | * `CLOWM_S3__URI` | `s3.uri` | unset | HTTP URL | `http://localhost` | URI of the S3 Object Storage |
| * `CLOWM_S3__ACCESS_KEY` | `s3.acess_key` | unset | String | `ZR7U56KMK20VW` | Access key for the S3 that owns the buckets | | * `CLOWM_S3__ACCESS_KEY` | `s3.acess_key` | unset | String | `ZR7U56KMK20VW` | Access key for the S3 that owns the buckets |
| * `CLOWM_S3__SECRET_KEY` | `s3.secret_key` | unset | String | `9KRUU41EGSCB3H9ODECNHW` | Secret key for the S3 that owns the buckets | | * `CLOWM_S3__SECRET_KEY` | `s3.secret_key` | unset | String | `9KRUU41EGSCB3H9ODECNHW` | Secret key for the S3 that owns the buckets |
| * `CLOWM_S3__USERNAME` | `s3.username` | unset | String | `clowm-bucket-manager` | ID of the user in ceph who owns all the buckets. Owner of `CLOWM_S3__ACCESS_KEY` | | * `CLOWM_S3__USERNAME` | `s3.username` | unset | String | `clowm-bucket-manager` | ID of the user in ceph who owns all the buckets. Owner of `CLOWM_S3__ACCESS_KEY` |
| * `CLOWM_S3__ADMIN_ACCESS_KEY` | `s3.admin_acess_key` | unset | String | `ZR7U56KMK20VW` | Access key for the Ceph Object Gateway user with `user=*,bucket=*` capabilities. | | * `CLOWM_S3__ADMIN_ACCESS_KEY` | `s3.admin_acess_key` | unset | String | `ZR7U56KMK20VW` | Access key for the Ceph Object Gateway user with `user=*,bucket=*` capabilities. |
| * `CLOWM_S3__ADMIN_SECRET_KEY` | `s3.admin_secret_key` | unset | String | `9KRUU41EGSCB3H9ODECNHW` | Secret key for the Ceph Object Gateway user with `user=*,bucket=*` capabilities. | | * `CLOWM_S3__ADMIN_SECRET_KEY` | `s3.admin_secret_key` | unset | String | `9KRUU41EGSCB3H9ODECNHW` | Secret key for the Ceph Object Gateway user with `user=*,bucket=*` capabilities. |
| `CLOWM_S3__DEFAULT_BUCKET_SIZE_LIMIT` | `s3.default_bucket_size_limit` | `400 GiB` | ByteSize | `10 KB`, `10 KiB`, `10 MB` | Size limit of a new Bucket. Between `1 KiB` and `4.3 TB` |
| `CLOWM_S3__DEFAULT_BUCKET_OBJECT_LIMIT` | `s3.default_bucket_object_limit` | `40000` | Integer | `10000` | Maximum number of objects in a new bucket. Must be $<2^{32}$ |
### Security ### Security
......
from typing import Any
from fastapi import APIRouter, Depends, status
from app.schemas.security import ErrorDetail
from .dependencies import decode_bearer_token
from .endpoints import bucket_permission_router, bucket_router, miscellaneous_router, s3key_router
__all__ = ["api_router"]
alternative_responses: dict[int | str, dict[str, Any]] = {
status.HTTP_400_BAD_REQUEST: {
"model": ErrorDetail,
"description": "Error decoding JWT Token",
"content": {"application/json": {"example": {"detail": "Malformed JWT Token"}}},
},
status.HTTP_401_UNAUTHORIZED: {
"model": ErrorDetail,
"description": "Not authenticated",
"content": {"application/json": {"example": {"detail": "Not authenticated"}}},
},
status.HTTP_403_FORBIDDEN: {
"model": ErrorDetail,
"description": "Not authorized",
"content": {"application/json": {"example": {"detail": "Not authorized"}}},
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorDetail,
"description": "Entity not Found",
"content": {"application/json": {"example": {"detail": "Entity not found."}}},
},
}
api_router = APIRouter()
api_router.include_router(
bucket_router,
dependencies=[Depends(decode_bearer_token)],
responses=alternative_responses,
)
api_router.include_router(
s3key_router,
dependencies=[Depends(decode_bearer_token)],
responses=alternative_responses,
)
api_router.include_router(
bucket_permission_router,
dependencies=[Depends(decode_bearer_token)],
responses=alternative_responses,
)
api_router.include_router(miscellaneous_router)
from typing import Any
from fastapi import APIRouter, Depends, status
from app.api.dependencies import decode_bearer_token
from app.api.endpoints import bucket_permissions, buckets, s3key
from app.api.endpoints.miscellaneous_endpoints import router as miscellaneous_router
from app.schemas.security import ErrorDetail
alternative_responses: dict[int | str, dict[str, Any]] = {
status.HTTP_400_BAD_REQUEST: {
"model": ErrorDetail,
"description": "Error decoding JWT Token",
"content": {"application/json": {"example": {"detail": "Malformed JWT Token"}}},
},
status.HTTP_401_UNAUTHORIZED: {
"model": ErrorDetail,
"description": "Not authenticated",
"content": {"application/json": {"example": {"detail": "Not authenticated"}}},
},
status.HTTP_403_FORBIDDEN: {
"model": ErrorDetail,
"description": "Not authorized",
"content": {"application/json": {"example": {"detail": "Not authorized"}}},
},
status.HTTP_404_NOT_FOUND: {
"model": ErrorDetail,
"description": "Entity not Found",
"content": {"application/json": {"example": {"detail": "Entity not found."}}},
},
}
api_router = APIRouter()
api_router.include_router(
buckets.router,
dependencies=[Depends(decode_bearer_token)],
responses=alternative_responses,
)
api_router.include_router(
s3key.router,
dependencies=[Depends(decode_bearer_token)],
responses=alternative_responses,
)
api_router.include_router(
bucket_permissions.router,
dependencies=[Depends(decode_bearer_token)],
responses=alternative_responses,
)
api_router.include_router(miscellaneous_router)
from .bucket_permissions import router as bucket_permission_router
from .buckets import router as bucket_router
from .miscellaneous_endpoints import router as miscellaneous_router
from .s3key import router as s3key_router
__all__ = ["bucket_router", "bucket_permission_router", "s3key_router", "miscellaneous_router"]
...@@ -17,6 +17,7 @@ from app.api.dependencies import ( ...@@ -17,6 +17,7 @@ from app.api.dependencies import (
RGWAdminResource, RGWAdminResource,
S3Resource, S3Resource,
) )
from app.ceph.rgw import update_bucket_limits as rgw_update_bucket_limits
from app.ceph.s3 import get_s3_bucket_objects, get_s3_bucket_policy, put_s3_bucket_policy from app.ceph.s3 import get_s3_bucket_objects, get_s3_bucket_policy, put_s3_bucket_policy
from app.core.config import settings from app.core.config import settings
from app.crud import CRUDBucket, CRUDBucketPermission, DuplicateError from app.crud import CRUDBucket, CRUDBucketPermission, DuplicateError
...@@ -108,8 +109,6 @@ async def list_buckets( ...@@ -108,8 +109,6 @@ async def list_buckets(
Async database session to perform query on. Dependency Injection. Async database session to perform query on. Dependency Injection.
current_user : clowmdb.models.User current_user : clowmdb.models.User
Current user who will be the owner of the newly created bucket. Dependency Injection. Current user who will be the owner of the newly created bucket. Dependency Injection.
s3 : boto3_type_annotations.s3.ServiceResource
S3 Service to perform operations on buckets in Ceph. Dependency Injection.
authorization : Callable[[str], Awaitable[Any]] authorization : Callable[[str], Awaitable[Any]]
Async function to ask the auth service for authorization. Dependency Injection. Async function to ask the auth service for authorization. Dependency Injection.
Returns Returns
...@@ -143,6 +142,7 @@ async def create_bucket( ...@@ -143,6 +142,7 @@ async def create_bucket(
db: DBSession, db: DBSession,
s3: S3Resource, s3: S3Resource,
authorization: Authorization, authorization: Authorization,
rgw: RGWAdminResource,
) -> Bucket: ) -> Bucket:
""" """
Create a bucket for the current user.\n Create a bucket for the current user.\n
...@@ -163,6 +163,8 @@ async def create_bucket( ...@@ -163,6 +163,8 @@ async def create_bucket(
S3 Service to perform operations on buckets in Ceph. Dependency Injection. S3 Service to perform operations on buckets in Ceph. Dependency Injection.
authorization : Callable[[str], Awaitable[Any]] authorization : Callable[[str], Awaitable[Any]]
Async function to ask the auth service for authorization. Dependency Injection. Async function to ask the auth service for authorization. Dependency Injection.
rgw : rgwadmin.RGWAdmin
RGW admin interface to manage Ceph's object store. Dependency Injection.
Returns Returns
------- -------
...@@ -173,7 +175,13 @@ async def create_bucket( ...@@ -173,7 +175,13 @@ async def create_bucket(
current_span.set_attribute("bucket_name", bucket.name) current_span.set_attribute("bucket_name", bucket.name)
await authorization("create") await authorization("create")
try: try:
db_bucket = await CRUDBucket.create(bucket, current_user.uid, db=db) db_bucket = await CRUDBucket.create(
bucket,
current_user.uid,
db=db,
size_limit=int(settings.s3.default_bucket_size_limit.to("KiB")),
object_limit=settings.s3.default_bucket_object_limit,
)
except DuplicateError as e: except DuplicateError as e:
current_span.record_exception(e) current_span.record_exception(e)
raise HTTPException( raise HTTPException(
...@@ -200,7 +208,15 @@ async def create_bucket( ...@@ -200,7 +208,15 @@ async def create_bucket(
"Sid": "PseudoOwnerPerm", "Sid": "PseudoOwnerPerm",
"Effect": "Allow", "Effect": "Allow",
"Principal": {"AWS": [f"arn:aws:iam:::user/{current_user.uid}"]}, "Principal": {"AWS": [f"arn:aws:iam:::user/{current_user.uid}"]},
"Action": ["s3:GetObject", "s3:DeleteObject", "s3:PutObject", "s3:ListBucket"], "Action": [
"s3:GetObject",
"s3:DeleteObject",
"s3:PutObject",
"s3:ListBucket",
"s3:AbortMultipartUpload",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
],
"Resource": [f"arn:aws:s3:::{db_bucket.name}/*", f"arn:aws:s3:::{db_bucket.name}"], "Resource": [f"arn:aws:s3:::{db_bucket.name}/*", f"arn:aws:s3:::{db_bucket.name}"],
}, },
], ],
...@@ -210,6 +226,7 @@ async def create_bucket( ...@@ -210,6 +226,7 @@ async def create_bucket(
with tracer.start_as_current_span("s3_put_bucket_cors_rules") as span: with tracer.start_as_current_span("s3_put_bucket_cors_rules") as span:
span.set_attribute("bucket_name", db_bucket.name) span.set_attribute("bucket_name", db_bucket.name)
s3_bucket.Cors().put(CORSConfiguration=cors_rule) # type: ignore[arg-type] s3_bucket.Cors().put(CORSConfiguration=cors_rule) # type: ignore[arg-type]
rgw_update_bucket_limits(rgw=rgw, bucket=db_bucket)
return db_bucket return db_bucket
...@@ -342,27 +359,10 @@ async def update_bucket_limits( ...@@ -342,27 +359,10 @@ async def update_bucket_limits(
if limits.object_limit is not None: # pragma: no cover if limits.object_limit is not None: # pragma: no cover
current_span.set_attribute("object_limit", limits.object_limit) current_span.set_attribute("object_limit", limits.object_limit)
await authorization("update_any") await authorization("update_any")
with tracer.start_as_current_span(
"rgw_set_bucket_limits",
attributes={
"bucket_name": bucket.name,
"enabled": limits.object_limit is not None or limits.size_limit is not None,
},
) as span:
if limits.size_limit is not None: # pragma: no cover
span.set_attribute("size_limit", ByteSize(limits.size_limit * 1024).human_readable())
if limits.object_limit is not None: # pragma: no cover
span.set_attribute("object_limit", limits.object_limit)
rgw.set_bucket_quota(
uid=str(bucket.owner_id),
bucket=bucket.name,
max_size_kb=-1 if limits.size_limit is None else limits.size_limit,
max_objects=-1 if limits.object_limit is None else limits.object_limit,
enabled=limits.object_limit is not None or limits.size_limit is not None,
)
await CRUDBucket.update_bucket_limits( await CRUDBucket.update_bucket_limits(
db=db, bucket_name=bucket.name, object_limit=limits.object_limit, size_limit=limits.size_limit db=db, bucket_name=bucket.name, object_limit=limits.object_limit, size_limit=limits.size_limit
) )
rgw_update_bucket_limits(rgw=rgw, bucket=bucket)
return bucket return bucket
......
from uuid import UUID from uuid import UUID
from clowmdb.models import Bucket
from opentelemetry import trace from opentelemetry import trace
from pydantic import ByteSize
from rgwadmin import RGWAdmin from rgwadmin import RGWAdmin
from app.core.config import settings from app.core.config import settings
...@@ -8,6 +10,8 @@ from app.schemas.s3key import S3Key ...@@ -8,6 +10,8 @@ from app.schemas.s3key import S3Key
tracer = trace.get_tracer_provider().get_tracer(__name__) tracer = trace.get_tracer_provider().get_tracer(__name__)
__all__ = ["rgw", "get_s3_keys", "update_bucket_limits"]
rgw = RGWAdmin( rgw = RGWAdmin(
access_key=settings.s3.admin_access_key, access_key=settings.s3.admin_access_key,
secret_key=settings.s3.admin_secret_key.get_secret_value(), secret_key=settings.s3.admin_secret_key.get_secret_value(),
...@@ -19,3 +23,24 @@ rgw = RGWAdmin( ...@@ -19,3 +23,24 @@ rgw = RGWAdmin(
def get_s3_keys(rgw: RGWAdmin, uid: UUID) -> list[S3Key]: def get_s3_keys(rgw: RGWAdmin, uid: UUID) -> list[S3Key]:
with tracer.start_as_current_span("s3_get_user_keys", attributes={"uid": str(uid)}): with tracer.start_as_current_span("s3_get_user_keys", attributes={"uid": str(uid)}):
return [S3Key(uid=uid, **key) for key in rgw.get_user(uid=str(uid), stats=False)["keys"]] return [S3Key(uid=uid, **key) for key in rgw.get_user(uid=str(uid), stats=False)["keys"]]
def update_bucket_limits(rgw: RGWAdmin, bucket: Bucket) -> None:
with tracer.start_as_current_span(
"rgw_set_bucket_limits",
attributes={
"bucket_name": bucket.name,
"enabled": bucket.object_limit is not None or bucket.size_limit is not None,
},
) as span:
if bucket.size_limit is not None: # pragma: no cover
span.set_attribute("size_limit", ByteSize(bucket.size_limit * 1024).human_readable())
if bucket.object_limit is not None: # pragma: no cover
span.set_attribute("object_limit", bucket.object_limit)
rgw.set_bucket_quota(
uid=settings.s3.username,
bucket=bucket.name,
max_size_kb=-1 if bucket.size_limit is None else bucket.size_limit,
max_objects=-1 if bucket.object_limit is None else bucket.object_limit,
enabled=bucket.object_limit is not None or bucket.size_limit is not None,
)
...@@ -2,7 +2,7 @@ import os ...@@ -2,7 +2,7 @@ import os
from functools import cached_property from functools import cached_property
from typing import Literal, Type from typing import Literal, Type
from pydantic import AnyHttpUrl, BaseModel, Field, FilePath, MySQLDsn, NameEmail, SecretStr from pydantic import AnyHttpUrl, BaseModel, ByteSize, Field, FilePath, MySQLDsn, NameEmail, SecretStr, field_validator
from pydantic_settings import ( from pydantic_settings import (
BaseSettings, BaseSettings,
JsonConfigSettingsSource, JsonConfigSettingsSource,
...@@ -57,6 +57,20 @@ class S3Settings(BaseModel): ...@@ -57,6 +57,20 @@ class S3Settings(BaseModel):
admin_secret_key: SecretStr = Field( admin_secret_key: SecretStr = Field(
..., description="Secret key for the Ceph Object Gateway user with `user=*,bucket=*` capabilities." ..., description="Secret key for the Ceph Object Gateway user with `user=*,bucket=*` capabilities."
) )
# 25 * 2**32 = 400 GiB
default_bucket_size_limit: ByteSize = Field(ByteSize(25 * 2**34), description="Size limit of a new Bucket")
default_bucket_object_limit: int = Field(
40000, gt=0, lt=2**32, description="Maximum number of objects in a new bucket"
)
@field_validator("default_bucket_size_limit")
@classmethod
def default_bucket_size_limit_validator(cls, size: ByteSize) -> ByteSize:
if size.to("KiB") >= 2**32:
raise ValueError("size can be maximal 4.3TB")
elif size.to("KiB") < 1:
raise ValueError("size must be at least 1 KiB")
return size
class OPASettings(BaseModel): class OPASettings(BaseModel):
......
...@@ -157,7 +157,14 @@ class CRUDBucket: ...@@ -157,7 +157,14 @@ class CRUDBucket:
return (await db.scalars(stmt)).all() return (await db.scalars(stmt)).all()
@staticmethod @staticmethod
async def create(bucket_in: BucketInSchema, uid: UUID, *, db: AsyncSession) -> Bucket: async def create(
bucket_in: BucketInSchema,
uid: UUID,
size_limit: int | None = None,
object_limit: int | None = None,
*,
db: AsyncSession,
) -> Bucket:
""" """
Create a bucket for a given user. Create a bucket for a given user.
...@@ -175,7 +182,9 @@ class CRUDBucket: ...@@ -175,7 +182,9 @@ class CRUDBucket:
bucket : clowmdb.models.Bucket bucket : clowmdb.models.Bucket
Returns the created bucket. Returns the created bucket.
""" """
bucket = Bucket(**bucket_in.model_dump(), owner_id_bytes=uid.bytes) bucket = Bucket(
**bucket_in.model_dump(), owner_id_bytes=uid.bytes, size_limit=size_limit, object_limit=object_limit
)
with tracer.start_as_current_span( with tracer.start_as_current_span(
"db_create_bucket", "db_create_bucket",
attributes={"uid": str(uid), "bucket_name": bucket.name}, attributes={"uid": str(uid), "bucket_name": bucket.name},
......
...@@ -18,7 +18,7 @@ from opentelemetry.sdk.trace import TracerProvider ...@@ -18,7 +18,7 @@ from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode from opentelemetry.trace import Status, StatusCode
from app.api.api import api_router from app.api import api_router
from app.core.config import settings from app.core.config import settings
description = """ description = """
......
...@@ -6,6 +6,7 @@ from pydantic import TypeAdapter ...@@ -6,6 +6,7 @@ from pydantic import TypeAdapter
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.api.endpoints.buckets import ANONYMOUS_ACCESS_SID from app.api.endpoints.buckets import ANONYMOUS_ACCESS_SID
from app.core.config import settings
from app.crud import CRUDBucket from app.crud import CRUDBucket
from app.schemas.bucket import BucketIn, BucketOut, BucketSizeLimits from app.schemas.bucket import BucketIn, BucketOut, BucketSizeLimits
from app.tests.mocks.mock_s3_resource import MockS3ServiceResource from app.tests.mocks.mock_s3_resource import MockS3ServiceResource
...@@ -211,9 +212,10 @@ class TestBucketRoutesCreate(_TestBucketRoutes): ...@@ -211,9 +212,10 @@ class TestBucketRoutesCreate(_TestBucketRoutes):
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
bucket = BucketOut.model_validate_json(response.content) bucket = BucketOut.model_validate_json(response.content)
assert bucket
assert bucket.name == bucket_info.name assert bucket.name == bucket_info.name
assert bucket.owner_id == random_user.user.uid assert bucket.owner_id == random_user.user.uid
assert bucket.size_limit == settings.s3.default_bucket_size_limit.to("KiB")
assert bucket.object_limit == settings.s3.default_bucket_object_limit
db_bucket = await CRUDBucket.get(bucket_info.name, db=db) db_bucket = await CRUDBucket.get(bucket_info.name, db=db)
assert db_bucket assert db_bucket
......
...@@ -3,9 +3,6 @@ profile = "black" ...@@ -3,9 +3,6 @@ profile = "black"
line_length = 120 line_length = 120
balanced_wrapping = true balanced_wrapping = true
[tool.black]
line-length = 120
[tool.ruff] [tool.ruff]
line-length = 120 line-length = 120
target-version = "py312" target-version = "py312"
......
...@@ -2,10 +2,9 @@ ...@@ -2,10 +2,9 @@
pytest>=8.0.0,<8.2.0 pytest>=8.0.0,<8.2.0
pytest-asyncio>=0.21.0,<0.22.0 pytest-asyncio>=0.21.0,<0.22.0
pytest-cov>=5.0.0,<5.1.0 pytest-cov>=5.0.0,<5.1.0
coverage[toml]>=7.4.0,<7.5.0 coverage[toml]>=7.4.0,<7.6.0
# Linters # Linters
ruff<0.4.0 ruff>=0.4.0,<0.5.0
black>=24.2.0,<24.5.0
isort>=5.13.0,<5.14.0 isort>=5.13.0,<5.14.0
mypy>=1.8.0,<1.10.0 mypy>=1.8.0,<1.10.0
# stubs for mypy # stubs for mypy
......
#!/bin/sh -e #!/bin/sh -e
set -x set -x
isort --force-single-line-imports app ruff format app
ruff check --fix --show-fixes app ruff check --fix --show-fixes app
black app
isort app isort app
...@@ -4,12 +4,11 @@ set -x ...@@ -4,12 +4,11 @@ set -x
ruff --version ruff --version
ruff check app ruff check app
ruff format --diff app
isort --version isort --version
isort -c app isort -c app
black --version
black app --check
mypy --version mypy --version
mypy app mypy app
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment