diff --git a/rwm.py b/rwm.py index 4012451fc349d5064dc42acbbc34aaeca389c337..e41fcdcedd06226e8261c5dfb29d5187bf5f7bdf 100755 --- a/rwm.py +++ b/rwm.py @@ -281,30 +281,49 @@ class StorageManager: return list(filter(lambda stmt: stmt["Action"] != ["*"], policy["Statement"])) def storage_check_policy(self, name): - """storage check bucket policy""" + """ + storage check bucket policy + + Ceph S3 API does not allow to resolve username (which is used in policy descriptor) + from access_key id, hence checks cannot directly assert if owner is or is not current + identity. + """ if not (policy := self.bucket_policy(name)): return False admin_statements = self._policy_statements_admin(policy) user_statements = self._policy_statements_user(policy) + owner_tenant, owner_username = self.bucket_owner(name).split("$") if ( # pylint: disable=too-many-boolean-expressions - # only two expected statements should be present on a bucket + # two statements MUST be present on a bucket len(policy["Statement"]) == 2 and len(admin_statements) == 1 and len(user_statements) == 1 - # with distinct identities for admin and user + # bucket owner MUST be the admin principal + and [f"arn:aws:iam::{owner_tenant}:user/{owner_username}"] == admin_statements[0]["Principal"]["AWS"] + # user MUST be another identity and admin_statements[0]["Principal"] != user_statements[0]["Principal"] - # user should have only limited access + # user MUST have only limited access and sorted(self.USER_BUCKET_POLICY_ACTIONS) == sorted(user_statements[0]["Action"]) - # the bucket should be versioned + # the bucket MUST be versioned and self.s3.Bucket(name).Versioning().status == "Enabled" ): return True return False + def storage_check_selfowned(self, name) -> bool: + """check if bucket is self-owner + + Ceph S3 API does not allow to resolve username (which is used in policy descriptor) + from access_key id, so the best guess is to check if the bucket is in the list of + buckets. + """ + + return (name in [x.name for x in self.list_buckets()]) + def storage_list(self): """storage list""" @@ -539,8 +558,11 @@ class RWM: logger.error("invalid backup selector") return 1 + if self.storage_manager.storage_check_selfowned(self.config.restic_bucket): + logger.warning("restic_bucket should not be self-owned") + if not self.storage_manager.storage_check_policy(self.config.restic_bucket): - logger.warning("used bucket does not have expected policy") + logger.warning("restic_bucket does not have expected policy") for name in selected_backups: time_start = datetime.now() diff --git a/tests/test_rwm.py b/tests/test_rwm.py index ac2165ad4cf297033e6b98c719c83b2ec9a1c4c4..ee8168a08170f9f95a756ec995928e83a6a0a202 100644 --- a/tests/test_rwm.py +++ b/tests/test_rwm.py @@ -205,8 +205,8 @@ def test_backup_error_handling(tmpworkdir: str): # pylint: disable=unused-argum } } - mock_false = Mock(return_value=False) mock_true = Mock(return_value=True) + mock_false = Mock(return_value=False) mock_ok = Mock(return_value=0) mock_fail = Mock(return_value=11) @@ -214,6 +214,7 @@ def test_backup_error_handling(tmpworkdir: str): # pylint: disable=unused-argum with ( patch.object(rwm.StorageManager, "storage_check_policy", mock_false), + patch.object(rwm.StorageManager, "storage_check_selfowned", mock_true), patch.object(rwm.RWM, "_backup_one", mock_fail), patch.object(rwm.StorageManager, "storage_save_state", mock_ok) ): @@ -221,6 +222,7 @@ def test_backup_error_handling(tmpworkdir: str): # pylint: disable=unused-argum with ( patch.object(rwm.StorageManager, "storage_check_policy", mock_true), + patch.object(rwm.StorageManager, "storage_check_selfowned", mock_false), patch.object(rwm.RWM, "_backup_one", mock_ok), patch.object(rwm.RWM, "_restic_forget_prune", mock_fail), patch.object(rwm.StorageManager, "storage_save_state", mock_ok) @@ -229,6 +231,7 @@ def test_backup_error_handling(tmpworkdir: str): # pylint: disable=unused-argum with ( patch.object(rwm.StorageManager, "storage_check_policy", mock_true), + patch.object(rwm.StorageManager, "storage_check_selfowned", mock_false), patch.object(rwm.RWM, "_backup_one", mock_ok), patch.object(rwm.RWM, "_restic_forget_prune", mock_ok), patch.object(rwm.StorageManager, "storage_save_state", mock_fail)