Skip to content
Snippets Groups Projects
Commit fe78e8c3 authored by Radoslav Bodó's avatar Radoslav Bodó
Browse files

rwm: add more explicit bucket policy and ownership checking when performing rwm backups

parent dcb63370
No related branches found
No related tags found
No related merge requests found
Pipeline #7708 failed
......@@ -281,30 +281,49 @@ class StorageManager:
return list(filter(lambda stmt: stmt["Action"] != ["*"], policy["Statement"]))
def storage_check_policy(self, name):
"""storage check bucket policy"""
"""
storage check bucket policy
Ceph S3 API does not allow to resolve username (which is used in policy descriptor)
from access_key id, hence checks cannot directly assert if owner is or is not current
identity.
"""
if not (policy := self.bucket_policy(name)):
return False
admin_statements = self._policy_statements_admin(policy)
user_statements = self._policy_statements_user(policy)
owner_tenant, owner_username = self.bucket_owner(name).split("$")
if ( # pylint: disable=too-many-boolean-expressions
# only two expected statements should be present on a bucket
# two statements MUST be present on a bucket
len(policy["Statement"]) == 2
and len(admin_statements) == 1
and len(user_statements) == 1
# with distinct identities for admin and user
# bucket owner MUST be the admin principal
and [f"arn:aws:iam::{owner_tenant}:user/{owner_username}"] == admin_statements[0]["Principal"]["AWS"]
# user MUST be another identity
and admin_statements[0]["Principal"] != user_statements[0]["Principal"]
# user should have only limited access
# user MUST have only limited access
and sorted(self.USER_BUCKET_POLICY_ACTIONS) == sorted(user_statements[0]["Action"])
# the bucket should be versioned
# the bucket MUST be versioned
and self.s3.Bucket(name).Versioning().status == "Enabled"
):
return True
return False
def storage_check_selfowned(self, name) -> bool:
"""check if bucket is self-owner
Ceph S3 API does not allow to resolve username (which is used in policy descriptor)
from access_key id, so the best guess is to check if the bucket is in the list of
buckets.
"""
return (name in [x.name for x in self.list_buckets()])
def storage_list(self):
"""storage list"""
......@@ -539,8 +558,11 @@ class RWM:
logger.error("invalid backup selector")
return 1
if self.storage_manager.storage_check_selfowned(self.config.restic_bucket):
logger.warning("restic_bucket should not be self-owned")
if not self.storage_manager.storage_check_policy(self.config.restic_bucket):
logger.warning("used bucket does not have expected policy")
logger.warning("restic_bucket does not have expected policy")
for name in selected_backups:
time_start = datetime.now()
......
......@@ -205,8 +205,8 @@ def test_backup_error_handling(tmpworkdir: str): # pylint: disable=unused-argum
}
}
mock_false = Mock(return_value=False)
mock_true = Mock(return_value=True)
mock_false = Mock(return_value=False)
mock_ok = Mock(return_value=0)
mock_fail = Mock(return_value=11)
......@@ -214,6 +214,7 @@ def test_backup_error_handling(tmpworkdir: str): # pylint: disable=unused-argum
with (
patch.object(rwm.StorageManager, "storage_check_policy", mock_false),
patch.object(rwm.StorageManager, "storage_check_selfowned", mock_true),
patch.object(rwm.RWM, "_backup_one", mock_fail),
patch.object(rwm.StorageManager, "storage_save_state", mock_ok)
):
......@@ -221,6 +222,7 @@ def test_backup_error_handling(tmpworkdir: str): # pylint: disable=unused-argum
with (
patch.object(rwm.StorageManager, "storage_check_policy", mock_true),
patch.object(rwm.StorageManager, "storage_check_selfowned", mock_false),
patch.object(rwm.RWM, "_backup_one", mock_ok),
patch.object(rwm.RWM, "_restic_forget_prune", mock_fail),
patch.object(rwm.StorageManager, "storage_save_state", mock_ok)
......@@ -229,6 +231,7 @@ def test_backup_error_handling(tmpworkdir: str): # pylint: disable=unused-argum
with (
patch.object(rwm.StorageManager, "storage_check_policy", mock_true),
patch.object(rwm.StorageManager, "storage_check_selfowned", mock_false),
patch.object(rwm.RWM, "_backup_one", mock_ok),
patch.object(rwm.RWM, "_restic_forget_prune", mock_ok),
patch.object(rwm.StorageManager, "storage_save_state", mock_fail)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment