Skip to content
Snippets Groups Projects
Commit 34cee9d4 authored by Daniel Göbel's avatar Daniel Göbel
Browse files

Merge branch 'feature/42-implement-rbac-for-s3keys' into 'development'

Resolve "Implement RBAC for S3Keys"

Closes #42

See merge request cmg/clowm/clowm-s3proxy-service!37
parents d85e6da3 c2a10e9c
No related branches found
No related tags found
No related merge requests found
......@@ -177,7 +177,6 @@ async def get_user_by_path_uid(
default=..., description="UID of a user", example="28c5353b8bb34984a8bd4169ba94c606", max_length=64
),
db: AsyncSession = Depends(get_db),
token: JWT = Depends(decode_bearer_token),
) -> User:
"""
Get the user from the database with the given uid.
......@@ -191,8 +190,6 @@ async def get_user_by_path_uid(
The uid of a user. URL path parameter.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
token : app.schemas.security.JWT
Decoded JWT sent with the HTTP request.
Returns
-------
......@@ -202,13 +199,7 @@ async def get_user_by_path_uid(
"""
user = await CRUDUser.get(db, uid)
if user:
if user.uid == token.sub:
return user
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="A user can only access himself",
)
return user
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
......
from typing import Any, Awaitable, Callable
from clowmdb.models import User as UserDB
from fastapi import APIRouter, Depends, HTTPException, Path, status
from rgwadmin import RGWAdmin
from rgwadmin.exceptions import RGWAdminException
from app.api.dependencies import get_rgw_admin, get_user_by_path_uid
from app.api.dependencies import AuthorizationDependency, get_current_user, get_rgw_admin, get_user_by_path_uid
from app.schemas.user import S3Key
router = APIRouter(prefix="/users", tags=["S3Key"])
s3key_authorization = AuthorizationDependency(resource="s3_key")
@router.get(
......@@ -14,7 +17,12 @@ router = APIRouter(prefix="/users", tags=["S3Key"])
response_model=list[S3Key],
summary="Get the S3 Access keys from a user",
)
def get_user_keys(rgw: RGWAdmin = Depends(get_rgw_admin), user: UserDB = Depends(get_user_by_path_uid)) -> list[S3Key]:
async def get_user_keys(
rgw: RGWAdmin = Depends(get_rgw_admin),
current_user: UserDB = Depends(get_current_user),
user: UserDB = Depends(get_user_by_path_uid),
authorization: Callable[[str], Awaitable[Any]] = Depends(s3key_authorization),
) -> list[S3Key]:
"""
Get all the S3 Access keys for a specific user.
\f
......@@ -24,12 +32,19 @@ def get_user_keys(rgw: RGWAdmin = Depends(get_rgw_admin), user: UserDB = Depends
RGW admin interface to manage Ceph's object store. Dependency Injection.
user : clowmdb.models.User
User with given uid. Dependency Injection.
authorization : Callable[[str], Awaitable[Any]]
Async function to ask the auth service for authorization. Dependency Injection.
current_user : clowmdb.models.User
Current user who will be the owner of the newly created bucket. Dependency Injection.
Returns
-------
keys : list(app.schemas.user.S3Key)
All S3 keys from the user.
"""
if current_user.uid != user.uid:
raise HTTPException(status.HTTP_403_FORBIDDEN, detail="Action forbidden.")
await authorization("list")
return [S3Key(**key) for key in rgw.get_user(uid=user.uid, stats=False)["keys"]]
......@@ -39,7 +54,12 @@ def get_user_keys(rgw: RGWAdmin = Depends(get_rgw_admin), user: UserDB = Depends
summary="Create a Access key for a user",
status_code=status.HTTP_201_CREATED,
)
def create_user_key(rgw: RGWAdmin = Depends(get_rgw_admin), user: UserDB = Depends(get_user_by_path_uid)) -> S3Key:
async def create_user_key(
rgw: RGWAdmin = Depends(get_rgw_admin),
current_user: UserDB = Depends(get_current_user),
user: UserDB = Depends(get_user_by_path_uid),
authorization: Callable[[str], Awaitable[Any]] = Depends(s3key_authorization),
) -> S3Key:
"""
Create a S3 Access key for a specific user.
\f
......@@ -49,12 +69,19 @@ def create_user_key(rgw: RGWAdmin = Depends(get_rgw_admin), user: UserDB = Depen
RGW admin interface to manage Ceph's object store. Dependency Injection.
user : clowmdb.models.User
User with given uid. Dependency Injection.
authorization : Callable[[str], Awaitable[Any]]
Async function to ask the auth service for authorization. Dependency Injection.
current_user : clowmdb.models.User
Current user who will be the owner of the newly created bucket. Dependency Injection.
Returns
-------
key : app.schemas.user.S3Key
Newly created S3 key.
"""
if current_user.uid != user.uid:
raise HTTPException(status.HTTP_403_FORBIDDEN, detail="Action forbidden.")
await authorization("create")
before_keys_set = set(
map(
lambda key: key["access_key"],
......@@ -73,7 +100,7 @@ def create_user_key(rgw: RGWAdmin = Depends(get_rgw_admin), user: UserDB = Depen
response_model=S3Key,
summary="Get a specific S3 Access key from a user",
)
def get_user_key(
async def get_user_key(
access_id: str = Path(
...,
description="ID of the S3 access key",
......@@ -81,6 +108,8 @@ def get_user_key(
),
rgw: RGWAdmin = Depends(get_rgw_admin),
user: UserDB = Depends(get_user_by_path_uid),
current_user: UserDB = Depends(get_current_user),
authorization: Callable[[str], Awaitable[Any]] = Depends(s3key_authorization),
) -> S3Key:
"""
Get a specific S3 Access Key for a specific user.
......@@ -93,12 +122,19 @@ def get_user_key(
RGW admin interface to manage Ceph's object store. Dependency Injection.
user : clowmdb.models.User
User with given uid. Dependency Injection.
authorization : Callable[[str], Awaitable[Any]]
Async function to ask the auth service for authorization. Dependency Injection.
current_user : clowmdb.models.User
Current user who will be the owner of the newly created bucket. Dependency Injection.
Returns
-------
key : app.schemas.user.S3Key
Requested S3 key.
"""
if current_user.uid != user.uid:
raise HTTPException(status.HTTP_403_FORBIDDEN, detail="Action forbidden.")
await authorization("read")
keys = rgw.get_user(uid=user.uid, stats=False)["keys"]
try:
index = [key["access_key"] for key in keys].index(access_id)
......@@ -112,7 +148,7 @@ def get_user_key(
summary="Delete a specific S3 Access key from a user",
status_code=status.HTTP_204_NO_CONTENT,
)
def delete_user_key(
async def delete_user_key(
access_id: str = Path(
...,
description="ID of the S3 access key",
......@@ -120,6 +156,8 @@ def delete_user_key(
),
rgw: RGWAdmin = Depends(get_rgw_admin),
user: UserDB = Depends(get_user_by_path_uid),
current_user: UserDB = Depends(get_current_user),
authorization: Callable[[str], Awaitable[Any]] = Depends(s3key_authorization),
) -> None:
"""
Delete a specific S3 Access key for a specific user.
......@@ -132,7 +170,12 @@ def delete_user_key(
RGW admin interface to manage Ceph's object store. Dependency Injection.
user : clowmdb.models.User
User with given uid. Dependency Injection.
authorization : Callable[[str], Awaitable[Any]]
Async function to ask the auth service for authorization. Dependency Injection.
current_user : clowmdb.models.User
Current user who will be the owner of the newly created bucket. Dependency Injection.
"""
await authorization("delete" if current_user.uid == user.uid else "delete_any")
if len(rgw.get_user(uid=user.uid, stats=False)["keys"]) <= 1:
raise HTTPException(status.HTTP_400_BAD_REQUEST, detail="It's not possible to delete the last key")
try:
......
......@@ -7,7 +7,7 @@ from app.tests.utils.user import UserWithAuthHeader
class _TestS3KeyRoutes:
base_path = "/users/"
base_path = "/users"
class TestS3KeyRoutesGet(_TestS3KeyRoutes):
......@@ -31,7 +31,7 @@ class TestS3KeyRoutesGet(_TestS3KeyRoutes):
Random foreign user for testing. pytest fixture.
"""
response = await client.get(
f"{self.base_path}{random_second_user.user.uid}/keys", headers=random_user.auth_headers
f"{self.base_path}/{random_second_user.user.uid}/keys", headers=random_user.auth_headers
)
assert response.status_code == status.HTTP_403_FORBIDDEN
......@@ -52,7 +52,7 @@ class TestS3KeyRoutesGet(_TestS3KeyRoutes):
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing. pytest fixture.
"""
response = await client.get(f"{self.base_path}{random_user.user.uid}/keys", headers=random_user.auth_headers)
response = await client.get(f"{self.base_path}/{random_user.user.uid}/keys", headers=random_user.auth_headers)
keys = response.json()
assert response.status_code == status.HTTP_200_OK
assert isinstance(keys, list)
......@@ -80,7 +80,7 @@ class TestS3KeyRoutesGet(_TestS3KeyRoutes):
"""
s3_key = mock_rgw_admin.get_user(uid=random_user.user.uid)["keys"][0]
response = await client.get(
f"{self.base_path}{random_user.user.uid}/keys/{s3_key['access_key']}", headers=random_user.auth_headers
f"{self.base_path}/{random_user.user.uid}/keys/{s3_key['access_key']}", headers=random_user.auth_headers
)
response_key = response.json()
assert response.status_code == status.HTTP_200_OK
......@@ -88,6 +88,35 @@ class TestS3KeyRoutesGet(_TestS3KeyRoutes):
assert response_key["secret_key"] == s3_key["secret_key"]
assert response_key["user"] == s3_key["user"]
@pytest.mark.asyncio
async def test_get_specific_s3_key_from_foreign_user(
self,
client: AsyncClient,
random_user: UserWithAuthHeader,
random_second_user: UserWithAuthHeader,
mock_rgw_admin: MockRGWAdmin,
) -> None:
"""
Test for getting a specific S3 key from a user.
Parameters
----------
client : httpx.AsyncClient
HTTP Client to perform the request on. pytest fixture.
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing. pytest fixture.
random_second_user : app.tests.utils.user.UserWithAuthHeader
Random second user for testing. pytest fixture.
mock_rgw_admin : app.tests.mocks.mock_rgw_admin.MockRGWAdmin
Mock class for rgwadmin package. pytest fixture.
"""
s3_key = mock_rgw_admin.get_user(uid=random_user.user.uid)["keys"][0]
response = await client.get(
f"{self.base_path}/{random_user.user.uid}/keys/{s3_key['access_key']}",
headers=random_second_user.auth_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.asyncio
async def test_get_unknown_s3_key_for_user(self, client: AsyncClient, random_user: UserWithAuthHeader) -> None:
"""
......@@ -101,7 +130,7 @@ class TestS3KeyRoutesGet(_TestS3KeyRoutes):
Random user for testing. pytest fixture.
"""
response = await client.get(
f"{self.base_path}{random_user.user.uid}/keys/impossible_key", headers=random_user.auth_headers
f"{self.base_path}/{random_user.user.uid}/keys/impossible_key", headers=random_user.auth_headers
)
assert response.status_code == status.HTTP_404_NOT_FOUND
......@@ -124,7 +153,7 @@ class TestS3KeyRoutesCreate(_TestS3KeyRoutes):
Mock class for rgwadmin package. pytest fixture.
"""
old_s3_key = mock_rgw_admin.get_user(uid=random_user.user.uid)["keys"][0]
response = await client.post(f"{self.base_path}{random_user.user.uid}/keys", headers=random_user.auth_headers)
response = await client.post(f"{self.base_path}/{random_user.user.uid}/keys", headers=random_user.auth_headers)
new_key = response.json()
assert response.status_code == status.HTTP_201_CREATED
......@@ -133,6 +162,28 @@ class TestS3KeyRoutesCreate(_TestS3KeyRoutes):
mock_rgw_admin.remove_key(uid=random_user.user.uid, access_key=new_key["access_key"])
@pytest.mark.asyncio
async def test_create_s3_key_for_foreign_user(
self, client: AsyncClient, random_user: UserWithAuthHeader, random_second_user: UserWithAuthHeader
) -> None:
"""
Test for getting a specific S3 key from a user.
Parameters
----------
client : httpx.AsyncClient
HTTP Client to perform the request on. pytest fixture.
random_user : app.tests.utils.user.UserWithAuthHeader
Random user for testing. pytest fixture.
random_second_user : app.tests.utils.user.UserWithAuthHeader
Random second user for testing. pytest fixture.
"""
response = await client.post(
f"{self.base_path}/{random_second_user.user.uid}/keys", headers=random_user.auth_headers
)
assert response.status_code == status.HTTP_403_FORBIDDEN
class TestS3KeyRoutesDelete(_TestS3KeyRoutes):
@pytest.mark.asyncio
......@@ -154,7 +205,7 @@ class TestS3KeyRoutesDelete(_TestS3KeyRoutes):
new_s3_key = mock_rgw_admin.create_key(uid=random_user.user.uid)[-1]
assert len(mock_rgw_admin.get_user(uid=random_user.user.uid)["keys"]) == 2
response = await client.delete(
f"{self.base_path}{random_user.user.uid}/keys/{new_s3_key['access_key']}", headers=random_user.auth_headers
f"{self.base_path}/{random_user.user.uid}/keys/{new_s3_key['access_key']}", headers=random_user.auth_headers
)
assert response.status_code == status.HTTP_204_NO_CONTENT
......@@ -179,7 +230,7 @@ class TestS3KeyRoutesDelete(_TestS3KeyRoutes):
assert len(mock_rgw_admin.get_user(uid=random_user.user.uid)["keys"]) == 1
key_id = mock_rgw_admin.get_user(uid=random_user.user.uid)["keys"][0]
response = await client.delete(
f"{self.base_path}{random_user.user.uid}/keys/{key_id}", headers=random_user.auth_headers
f"{self.base_path}/{random_user.user.uid}/keys/{key_id}", headers=random_user.auth_headers
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
......@@ -203,6 +254,6 @@ class TestS3KeyRoutesDelete(_TestS3KeyRoutes):
"""
mock_rgw_admin.create_key(uid=random_user.user.uid)
response = await client.delete(
f"{self.base_path}{random_user.user.uid}/keys/impossible", headers=random_user.auth_headers
f"{self.base_path}/{random_user.user.uid}/keys/impossible", headers=random_user.auth_headers
)
assert response.status_code == status.HTTP_404_NOT_FOUND
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment