From 52d56b58f23df6497a9aa3bf16e1541d12e40017 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20G=C3=B6bel?= <dgoebel@techfak.uni-bielefeld.de> Date: Tue, 5 Jul 2022 08:41:07 +0200 Subject: [PATCH] Add unit tests for generating the bucket policy statements #9 --- .gitlab-ci.yml | 16 +- ProxyAPI/app/tests/unit/__init__.py | 0 .../unit/test_bucket_permission_scheme.py | 182 ++++++++++++++++++ 3 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 ProxyAPI/app/tests/unit/__init__.py create mode 100644 ProxyAPI/app/tests/unit/test_bucket_permission_scheme.py diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 2158d64..3ae4a4e 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -89,6 +89,18 @@ e2e-test-job: # This job runs in the test stage. reports: junit: $BASE_DIR/e2e-report.xml +unit-test-job: # This job runs in the test stage. + stage: test # It only starts when the job in the build stage completes successfully. + script: + - pytest --junitxml=unit-report.xml --noconftest --cov=app --cov-report=term-missing app/tests/unit + - mkdir coverage-unit + - mv .coverage coverage-unit + artifacts: + paths: + - $BASE_DIR/coverage-unit/.coverage + reports: + junit: $BASE_DIR/unit-report.xml + combine-test-coverage-job: # Combine coverage reports from different test jobs stage: test needs: @@ -96,8 +108,10 @@ combine-test-coverage-job: # Combine coverage reports from different test jobs artifacts: true - job: "integration-test-job" artifacts: true + - job: "unit-test-job" + artifacts: true script: - - coverage combine coverage-e2e/.coverage coverage-integration/.coverage + - coverage combine coverage-e2e/.coverage coverage-integration/.coverage coverage-unit - coverage report - cd .. - coverage xml --data-file=$BASE_DIR/.coverage -o coverage.xml diff --git a/ProxyAPI/app/tests/unit/__init__.py b/ProxyAPI/app/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ProxyAPI/app/tests/unit/test_bucket_permission_scheme.py b/ProxyAPI/app/tests/unit/test_bucket_permission_scheme.py new file mode 100644 index 0000000..80af26d --- /dev/null +++ b/ProxyAPI/app/tests/unit/test_bucket_permission_scheme.py @@ -0,0 +1,182 @@ +from datetime import datetime + +import pytest + +from app.models.bucket_permission import PermissionEnum +from app.schemas.bucket_permission import BucketPermission +from app.tests.utils.utils import random_lower_string + + +class _TestPermissionPolicy: + @pytest.fixture(scope="function") + def random_base_permission(self) -> BucketPermission: + """ + Generate a base READ bucket permission schema. + """ + return BucketPermission( + username=random_lower_string(), bucket_name=random_lower_string(), permission=PermissionEnum.READ + ) + + +class TestPermissionPolicyPermissionType(_TestPermissionPolicy): + def test_READ_permission(self, random_base_permission: BucketPermission) -> None: + """ + Test for converting a READ Permission into a bucket policy statement. + + Parameters + ---------- + random_base_permission : app.schemas.bucket_permission.BucketPermission + Random base bucket permission for testing. pytest fixture. + """ + uid = random_lower_string() + stmts = random_base_permission.map_to_bucket_policy_statement(user_id=uid) + assert len(stmts) == 2 + + object_stmt = stmts[0] + assert object_stmt["Effect"] == "Allow" + assert object_stmt["Sid"] == random_base_permission.to_hash(uid) + assert object_stmt["Principal"]["AWS"] == f"arn:aws:iam:::user/{random_base_permission.username}" + assert object_stmt["Resource"] == f"arn:aws:s3:::{random_base_permission.bucket_name}/*" + with pytest.raises(KeyError): + assert object_stmt["Condition"] + assert len(object_stmt["Action"]) == 1 + assert object_stmt["Action"][0] == "s3:GetObject" + + bucket_stmt = stmts[1] + assert bucket_stmt["Sid"] == random_base_permission.to_hash(uid) + assert bucket_stmt["Effect"] == "Allow" + assert bucket_stmt["Principal"]["AWS"] == f"arn:aws:iam:::user/{random_base_permission.username}" + assert bucket_stmt["Resource"] == f"arn:aws:s3:::{random_base_permission.bucket_name}" + with pytest.raises(KeyError): + assert bucket_stmt["Condition"] + assert len(bucket_stmt["Action"]) == 1 + assert bucket_stmt["Action"][0] == "s3:ListBucket" + + def test_WRITE_permission(self, random_base_permission: BucketPermission) -> None: + """ + Test for converting a WRITE Permission into a bucket policy statement. + + Parameters + ---------- + random_base_permission : app.schemas.bucket_permission.BucketPermission + Random base bucket permission for testing. pytest fixture. + """ + random_base_permission.permission = PermissionEnum.WRITE + stmts = random_base_permission.map_to_bucket_policy_statement(user_id=random_lower_string()) + assert len(stmts) == 1 + + object_stmt = stmts[0] + with pytest.raises(KeyError): + assert object_stmt["Condition"] + assert len(object_stmt["Action"]) == 2 + assert "s3:PutObject" in object_stmt["Action"] + assert "s3:DeleteObject" in object_stmt["Action"] + + def test_READWRITE_permission(self, random_base_permission: BucketPermission) -> None: + """ + Test for converting a READWRITE Permission into a bucket policy statement. + + Parameters + ---------- + random_base_permission : app.schemas.bucket_permission.BucketPermission + Random base bucket permission for testing. pytest fixture. + """ + random_base_permission.permission = PermissionEnum.READWRITE + stmts = random_base_permission.map_to_bucket_policy_statement(user_id=random_lower_string()) + assert len(stmts) == 2 + + object_stmt = stmts[0] + with pytest.raises(KeyError): + assert object_stmt["Condition"] + assert len(object_stmt["Action"]) == 3 + assert "s3:PutObject" in object_stmt["Action"] + assert "s3:DeleteObject" in object_stmt["Action"] + assert "s3:GetObject" in object_stmt["Action"] + + bucket_stmt = stmts[1] + with pytest.raises(KeyError): + assert bucket_stmt["Condition"] + assert len(bucket_stmt["Action"]) == 1 + assert bucket_stmt["Action"][0] == "s3:ListBucket" + + +class TestPermissionPolicyCondition(_TestPermissionPolicy): + def test_to_timestamp_condition(self, random_base_permission: BucketPermission) -> None: + """ + Test for converting a READ Permission with end time condition into a bucket policy statement. + + Parameters + ---------- + random_base_permission : app.schemas.bucket_permission.BucketPermission + Random base bucket permission for testing. pytest fixture. + """ + time = datetime.now() + random_base_permission.to_timestamp = time + + stmts = random_base_permission.map_to_bucket_policy_statement(user_id=random_lower_string()) + assert len(stmts) == 2 + + object_stmt = stmts[0] + assert object_stmt["Condition"] + assert object_stmt["Condition"]["DateLessThan"]["aws:CurrentTime"] == time.strftime("%Y-%m-%dT%H:%M:%SZ") + with pytest.raises(KeyError): + assert object_stmt["Condition"]["DateGreaterThan"] + + bucket_stmt = stmts[1] + assert bucket_stmt["Condition"] + assert bucket_stmt["Condition"]["DateLessThan"]["aws:CurrentTime"] == time.strftime("%Y-%m-%dT%H:%M:%SZ") + with pytest.raises(KeyError): + assert bucket_stmt["Condition"]["DateGreaterThan"] + + def test_from_timestamp_condition(self, random_base_permission: BucketPermission) -> None: + """ + Test for converting a READ Permission with start time condition into a bucket policy statement. + + Parameters + ---------- + random_base_permission : app.schemas.bucket_permission.BucketPermission + Random base bucket permission for testing. pytest fixture. + """ + time = datetime.now() + random_base_permission.from_timestamp = time + + stmts = random_base_permission.map_to_bucket_policy_statement(user_id=random_lower_string()) + assert len(stmts) == 2 + + object_stmt = stmts[0] + assert object_stmt["Condition"] + assert object_stmt["Condition"]["DateGreaterThan"]["aws:CurrentTime"] == time.strftime("%Y-%m-%dT%H:%M:%SZ") + with pytest.raises(KeyError): + assert object_stmt["Condition"]["DateLessThan"] + + bucket_stmt = stmts[1] + assert bucket_stmt["Condition"] + assert bucket_stmt["Condition"]["DateGreaterThan"]["aws:CurrentTime"] == time.strftime("%Y-%m-%dT%H:%M:%SZ") + with pytest.raises(KeyError): + assert bucket_stmt["Condition"]["DateLessThan"] + + def test_file_prefix_condition(self, random_base_permission: BucketPermission) -> None: + """ + Test for converting a READ Permission with file prefix condition into a bucket policy statement. + + Parameters + ---------- + random_base_permission : app.schemas.bucket_permission.BucketPermission + Random base bucket permission for testing. pytest fixture. + """ + random_base_permission.file_prefix = random_lower_string(length=8) + "/" + random_lower_string(length=8) + "/" + + stmts = random_base_permission.map_to_bucket_policy_statement(user_id=random_lower_string()) + assert len(stmts) == 2 + + object_stmt = stmts[0] + assert ( + object_stmt["Resource"] + == f"arn:aws:s3:::{random_base_permission.bucket_name}/{random_base_permission.file_prefix}*" + ) + with pytest.raises(KeyError): + assert object_stmt["Condition"] + + bucket_stmt = stmts[1] + assert bucket_stmt["Condition"] + assert bucket_stmt["Condition"]["StringLike"]["s3:prefix"] == random_base_permission.file_prefix + "*" -- GitLab