diff --git a/Makefile b/Makefile index fcee2a42cc1a6dcea744ae52def9b9db7a547581..03ca01a1cef3536eb76c0e06c28cf64ab849b104 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,7 @@ lint-yaml: yamllint --strict . test: + # show stderr with "-o log_cli=true" python3 -m pytest -v tests/ coverage: diff --git a/README.md b/README.md index 5b0a1eb9f9d042a8efdcf9689cbd6fcb3da22248..798e66ec2d2fbd4dbd250f37e1114197acb4215a 100644 --- a/README.md +++ b/README.md @@ -105,10 +105,29 @@ rwm restic mount /mnt/restore ``` +### RWM: backups with policed buckets + +Have two S3 accounts (*admin* and *user1*), create storage bucket and use it. + +``` +cp examples/rwm-admin.conf admin.conf +rwm --confg admin.conf create_storage bucket1 user1 +rwm --confg admin.conf storage_check_policy bucket1 +rwm --confg admin.conf storage_list + +cp examples/rwm-backups.conf rwm.conf +rwm storage_check_policy bucket1 +rwm backup_all +rwm restic snapshots +rwm restic mount /mnt/restore +``` + + ## Notes * executed tools stdout is buffered, eg. `restic mount` does not print immediate output as normal * passthrough full arguments to underlying tool with "--" (eg. `rwm rclone -- ls --help`). +* runner microceph breaks on reboot because of symlink at /etc/ceph ## Development @@ -131,4 +150,12 @@ cd /opt/rwm export RUNNER_URL= export RUNNER_TOKEN= make runner -``` \ No newline at end of file +``` + + +## References + +* https://restic.readthedocs.io/ +* https://github.com/CESNET/aws-plugin-bucket-policy +* https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock.html +* https://aws.amazon.com/blogs/storage/point-in-time-restore-for-amazon-s3-buckets/ diff --git a/rwm.py b/rwm.py index ec9cb9c4e7917f796a0cde0a4dcb3bae634dd14f..6bc6e010301b4b2a33cc959457536122346e46ed 100755 --- a/rwm.py +++ b/rwm.py @@ -3,6 +3,7 @@ import base64 import dataclasses +import json import logging import os import shlex @@ -12,6 +13,8 @@ from argparse import ArgumentParser from datetime import datetime from pathlib import Path +import boto3 +import botocore import yaml from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend @@ -99,13 +102,121 @@ class BackupResult: } +class StorageManager: + """s3 policed bucket manager""" + + def __init__(self, url, access_key, secret_key): + self.url = url + self.access_key = access_key + self.secret_key = secret_key + self.s3 = boto3.resource('s3', endpoint_url=url, aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key) + + def create_bucket(self, name): + """aws s3 resource api stub""" + # boto3 client and resource api are not completely aligned + # s3.Bucket("xyz").create() returns dict instead of s3.Bucket object + return self.s3.create_bucket(Bucket=name) + + def bucket_exist(self, name): + """check if bucket exist""" + return name in [x.name for x in self.list_buckets()] + + def bucket_owner(self, name): + """aws s3 resource api stub""" + return self.s3.Bucket(name).Acl().owner["ID"] + + def bucket_policy(self, name): + """aws s3 resource api stub""" + + try: + return json.loads(self.s3.Bucket(name).Policy().policy) + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as exc: + logger.error("rwm bucket_policy error, %s", (exc)) + return None + + def list_buckets(self): + """aws s3 resource api stub""" + return list(self.s3.buckets.all()) + + def list_objects(self, bucket_name): + """aws s3 resource api stub""" + return list(self.s3.Bucket(bucket_name).objects.all()) + + def storage_create(self, bucket_name, target_username): + """create policed bucket""" + + if (not bucket_name) or (not target_username): + raise ValueError("must specify value for bucket and user") + + bucket = self.create_bucket(bucket_name) + tenant, manager_username = bucket.Acl().owner["ID"].split("$") + + # grants basic RW access to user in same tenant + bucket_policy = { + "Version": "2012-10-17", + "Statement": [ + # full access to manager + { + "Effect": "Allow", + "Principal": {"AWS": [f"arn:aws:iam::{tenant}:user/{manager_username}"]}, + "Action": ["*"], + "Resource": [f"arn:aws:s3:::{bucket.name}", f"arn:aws:s3:::{bucket.name}/*"] + }, + # limited access to user + { + "Effect": "Allow", + "Principal": {"AWS": [f"arn:aws:iam::{tenant}:user/{target_username}"]}, + "Action": [ + "s3:ListBucket", "s3:GetObject", "s3:PutObject", "s3:DeleteObject", + "s3:GetBucketPolicy", "s3:ListBucketVersions", "s3:GetBucketVersioning" + ], + "Resource": [f"arn:aws:s3:::{bucket.name}", f"arn:aws:s3:::{bucket.name}/*"] + } + ] + } + bucket.Policy().put(Policy=json.dumps(bucket_policy)) + + # enforces versioning + bucket.Versioning().enable() + + return bucket + + def storage_delete(self, bucket_name): + """storage delete""" + + bucket = self.s3.Bucket(bucket_name) + bucket.objects.all().delete() + bucket.object_versions.all().delete() + bucket.delete() + + def storage_check_policy(self, name): + """storage check bucket policy""" + + if not (policy := self.bucket_policy(name)): + return False + + if ( + len(policy["Statement"]) == 2 + and len(list(filter(lambda stmt: stmt["Action"] == ["*"], policy["Statement"]))) == 1 + and self.s3.Bucket(name).Versioning().status == "Enabled" + ): + return True + + return False + + class RWM: """rwm impl""" def __init__(self, config): self.config = config + self.storage_manager = StorageManager( + config.get("rwm_s3_endpoint_url"), + config.get("rwm_s3_access_key"), + config.get("rwm_s3_secret_key") + ) - def aws_cmd(self, args): + def aws_cmd(self, args) -> subprocess.CompletedProcess: """aws cli wrapper""" env = { @@ -121,7 +232,7 @@ class RWM: # aws cli does not have endpoint-url as env config option return run_command(["aws", "--endpoint-url", self.config["rwm_s3_endpoint_url"]] + args, env=env) - def rclone_cmd(self, args): + def rclone_cmd(self, args) -> subprocess.CompletedProcess: """rclone wrapper""" env = { @@ -136,7 +247,7 @@ class RWM: } return run_command(["rclone"] + args, env=env) - def rclone_crypt_cmd(self, args): + def rclone_crypt_cmd(self, args) -> subprocess.CompletedProcess: """ rclone crypt wrapper * https://rclone.org/docs/#config-file @@ -160,7 +271,7 @@ class RWM: } return run_command(["rclone"] + args, env=env) - def restic_cmd(self, args): + def restic_cmd(self, args) -> subprocess.CompletedProcess: """restic command wrapper""" env = { @@ -173,7 +284,7 @@ class RWM: } return run_command(["restic"] + args, env=env) - def restic_autoinit(self): + def restic_autoinit(self) -> subprocess.CompletedProcess: """runs restic init""" logger.info("run restic_autoinit") @@ -181,7 +292,7 @@ class RWM: proc = self.restic_cmd(["init"]) return proc - def restic_backup(self, name): + def restic_backup(self, name) -> subprocess.CompletedProcess: """runs restic backup by name""" logger.info(f"run restic_backup {name}") @@ -194,7 +305,7 @@ class RWM: return self.restic_cmd(cmd_args) - def restic_forget_prune(self): + def restic_forget_prune(self) -> subprocess.CompletedProcess: """runs forget prune""" logger.info("run restic_forget_prune") @@ -205,9 +316,12 @@ class RWM: return self.restic_cmd(cmd_args) - def backup_cmd(self, name): + def backup_cmd(self, name) -> subprocess.CompletedProcess: """backup command""" + # TODO: check target backup policy, restic automatically creates + # bucket if ot does not exist with null-policy + autoinit_proc = self.restic_autoinit() if autoinit_proc.returncode != 0: logger.error("restic autoinit failed") @@ -226,7 +340,7 @@ class RWM: return backup_proc - def backup_all_cmd(self): + def backup_all_cmd(self) -> int: """backup all command""" stats = {} @@ -259,6 +373,49 @@ class RWM: print(tabulate([item.to_dict() for item in stats.values()], headers="keys", numalign="left")) return ret + def storage_create_cmd(self, bucket_name, target_username) -> int: + """storage create command""" + + try: + self.storage_manager.storage_create(bucket_name, target_username) + except ( + botocore.exceptions.ClientError, + botocore.exceptions.BotoCoreError, + ValueError + ) as exc: + logger.error("rwm storage_create error, %s", (exc)) + return 1 + return 0 + + def storage_delete_cmd(self, bucket_name) -> int: + """storage delete command""" + + try: + self.storage_manager.storage_delete(bucket_name) + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as exc: + logger.error("rwm storage_delete error, %s", (exc)) + return 1 + return 0 + + def storage_check_policy_cmd(self, bucket_name) -> int: + """storage check policy command""" + + ret, msg = ( + (0, "OK") + if self.storage_manager.storage_check_policy(bucket_name) == True + else (1, "FAILED") + ) + logger.debug("bucket policy: %s", json.dumps(self.storage_manager.bucket_policy(bucket_name), indent=4)) + print(msg) + return ret + + def storage_list_cmd(self): + pass + + def storage_restore(self, bucket_name, target_username): + """https://gitlab.cesnet.cz/709/public/restic/aws/-/blob/main/bucket_copy.sh?ref_type=heads""" + pass + def configure_logging(debug): """configure logger""" @@ -297,6 +454,14 @@ def parse_arguments(argv): backup_cmd_parser.add_argument("name", help="backup config name") subparsers.add_parser("backup_all", help="backup all command") + storage_create_cmd_parser = subparsers.add_parser("storage_create", help="storage_create command") + storage_create_cmd_parser.add_argument("bucket_name", help="bucket name") + storage_create_cmd_parser.add_argument("target_username", help="actual bucket user with limited RW access") + storage_delete_cmd_parser = subparsers.add_parser("storage_delete", help="storage_delete command") + storage_delete_cmd_parser.add_argument("bucket_name", help="bucket name") + storage_check_policy_cmd_parser = subparsers.add_parser("storage_check_policy", help="storage_check_policy command; use --debug to show policy") + storage_check_policy_cmd_parser.add_argument("bucket_name", help="bucket name") + return parser.parse_args(argv) @@ -333,7 +498,14 @@ def main(argv=None): if args.command == "backup_all": ret = rwmi.backup_all_cmd() logger.info("rwm backup_all finished with %s (ret %d)", "success" if ret == 0 else "errors", ret) - + if args.command == "storage_create": + ret = rwmi.storage_create_cmd(args.bucket_name, args.target_username) + if args.command == "storage_delete": + ret = rwmi.storage_delete_cmd(args.bucket_name) + if args.command == "storage_check_policy": + ret = rwmi.storage_check_policy_cmd(args.bucket_name) + + logger.debug("rwm finished with %s (ret %d)", "success" if ret == 0 else "errors", ret) return ret diff --git a/tests/conftest.py b/tests/conftest.py index 32e709475069970710e514a6bf3ab836d4f53153..17ba890fe546b54ab4731be6e6844f62f9a68844 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,10 +7,11 @@ import socket import subprocess from tempfile import mkdtemp -import boto3 import pytest from xprocess import ProcessStarter +from rwm import StorageManager + @pytest.fixture def tmpworkdir(): @@ -54,42 +55,44 @@ def microceph(): yield "http://localhost:80" -def rgwuser(microceph_url, name): +def radosuser(microceph_url, username, tenant="tenant1"): """rgwuser fixture""" subprocess.run( - ["/snap/bin/radosgw-admin", "user", "rm", f"--uid={name}", "--purge-data"], + ["/snap/bin/radosgw-admin", "user", "rm", f"--uid={tenant}${username}", "--purge-data"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False ) proc = subprocess.run( - ["/snap/bin/radosgw-admin", "user", "create", f"--uid={name}", f"--display-name=rwguser_{name}"], + ["/snap/bin/radosgw-admin", "user", "create", f"--uid={tenant}${username}", f"--display-name={tenant}_{username}"], check=True, capture_output=True, text=True, ) user = json.loads(proc.stdout) - yield boto3.resource( - 's3', - endpoint_url=microceph_url, - aws_access_key_id=user["keys"][0]["access_key"], - aws_secret_access_key=user["keys"][0]["secret_key"] - ) + yield StorageManager(microceph_url, user["keys"][0]["access_key"], user["keys"][0]["secret_key"]) + + subprocess.run(["/snap/bin/radosgw-admin", "user", "rm", f"--uid={tenant}${username}", "--purge-data"], check=True) + + +@pytest.fixture +def radosuser_admin(microceph): # pylint: disable=redefined-outer-name + """radosuser admin stub""" - subprocess.run(["/snap/bin/radosgw-admin", "user", "rm", f"--uid={name}", "--purge-data"], check=True) + yield from radosuser(microceph, "admin") @pytest.fixture -def rgwuser_test1(microceph): # pylint: disable=redefined-outer-name, unused-argument - """rgwuser test1 stub""" +def radosuser_test1(microceph): # pylint: disable=redefined-outer-name + """radosuser test1 stub""" - yield from rgwuser(microceph, "test1") + yield from radosuser(microceph, "test1") @pytest.fixture -def rgwuser_test2(microceph): # pylint: disable=redefined-outer-name, unused-argument - """rgwuser test2 stub""" +def radosuser_test2(microceph): # pylint: disable=redefined-outer-name + """radosuser test2 stub""" - yield from rgwuser(microceph, "test2") + yield from radosuser(microceph, "test2") diff --git a/tests/test_default.py b/tests/test_default.py index 23e3c9085f2a4e84f5b20ca8aaea9dd1ca44c236..f04dd224d1298dafa1f71e18afa7ea9bceda453e 100644 --- a/tests/test_default.py +++ b/tests/test_default.py @@ -39,7 +39,7 @@ def test_wrap_output(): def test_main(tmpworkdir: str): # pylint: disable=unused-argument """test main""" - # optional and default config hanling + # optional and default config handling assert rwm_main(["version"]) == 0 Path("rwm.conf").touch() assert rwm_main(["version"]) == 0 @@ -51,9 +51,17 @@ def test_main(tmpworkdir: str): # pylint: disable=unused-argument assert rwm_main([item, "dummy"]) == 0 mock = Mock(return_value=0) + with patch.object(rwm.RWM, "backup_all_cmd", mock): assert rwm_main(["backup_all"]) == 0 + with patch.object(rwm.RWM, "storage_create_cmd", mock): + assert rwm_main(["storage_create", "bucket", "user"]) == 0 + + for item in ["storage_delete", "storage_check_policy"]: + with patch.object(rwm.RWM, f"{item}_cmd", mock): + assert rwm_main([item, "bucket"]) == 0 + def test_aws_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument """test aws command""" @@ -243,7 +251,7 @@ def test_backup_cmd_excludes(tmpworkdir: str, motoserver: str): # pylint: disab assert "/testdatadir/proc/to_be_also_excluded" not in snapshot_files assert "/testdatadir/processor" in snapshot_files assert "/testdatadir/some_other_proc_essor" in snapshot_files - # following expected result does not work , because test config uses root-unanchored exclude path "proc/*" + # following expected result does not work, because test config uses root-unanchored exclude path "proc/*" # assert "/testdatadir/var/proc/data" in snapshot_files @@ -305,3 +313,67 @@ def test_backup_all_cmd_error_handling(tmpworkdir: str): # pylint: disable=unus with patch.object(rwm.RWM, "restic_autoinit", mock_fail): assert RWM(rwm_conf).backup_all_cmd() == 11 + + +def test_storage_create_cmd(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager): # pylint: disable=unused-argument + """test_storage_create_cmd""" + + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": radosuser_admin.url, + "rwm_s3_access_key": radosuser_admin.access_key, + "rwm_s3_secret_key": radosuser_admin.secret_key, + }) + + bucket_name = "testbuck" + assert trwm.storage_create_cmd(bucket_name, "testnx") == 0 + assert trwm.storage_create_cmd("!invalid", "testnx") == 1 + assert trwm.storage_create_cmd("", "testnx") == 1 + + +def test_storage_delete_cmd(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager): # pylint: disable=unused-argument + """test_storage_create_cmd""" + + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": radosuser_admin.url, + "rwm_s3_access_key": radosuser_admin.access_key, + "rwm_s3_secret_key": radosuser_admin.secret_key, + + "rwm_restic_bucket": "testbuck", + "rwm_restic_password": "dummydummydummydummy", + "rwm_backups": { + "testcfg": {"filesdirs": ["testdatadir/"]} + } + }) + + bucket_name = trwm.config["rwm_restic_bucket"] + Path("testdatadir").mkdir() + Path("testdatadir/testdata1.txt").write_text("dummydata", encoding="utf-8") + + bucket = trwm.storage_manager.storage_create(bucket_name, "admin") + assert trwm.storage_manager.bucket_exist(bucket_name) + assert len(trwm.storage_manager.list_objects(bucket_name)) == 0 + + assert trwm.backup_cmd("testcfg").returncode == 0 + assert len(trwm.storage_manager.list_objects(bucket_name)) != 0 + + object_versions = radosuser_admin.s3.meta.client.list_object_versions(Bucket=bucket.name) + assert len(object_versions["Versions"]) > 0 + assert len(object_versions["DeleteMarkers"]) > 0 + + assert trwm.storage_delete_cmd(bucket_name) == 0 + assert not trwm.storage_manager.bucket_exist(bucket_name) + assert trwm.storage_delete_cmd(bucket_name) == 1 + + +def test_storage_check_policy_cmd(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager): # pylint: disable=unused-argument + """test storage check policy command""" + + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": radosuser_admin.url, + "rwm_s3_access_key": radosuser_admin.access_key, + "rwm_s3_secret_key": radosuser_admin.secret_key, + }) + + mock = Mock(return_value=False) + with patch.object(rwm.StorageManager, "storage_check_policy", mock): + assert trwm.storage_check_policy_cmd("dummy") == 1 diff --git a/tests/test_policies.py b/tests/test_policies.py index acd19db3f7d4ea38727ceca66287d8cefc5def2d..9f9af5799b2e5f60b9fe8ce76598d4de7c0a7eba 100644 --- a/tests/test_policies.py +++ b/tests/test_policies.py @@ -1,33 +1,143 @@ """rwm bucket policies tests""" -import boto3 +import json +from io import BytesIO +from pathlib import Path + import pytest +import rwm + def test_microceph_defaults( tmpworkdir: str, microceph: str, - rgwuser_test1: boto3.resource, - rgwuser_test2: boto3.resource + radosuser_test1: rwm.StorageManager, + radosuser_test2: rwm.StorageManager ): # pylint: disable=unused-argument """test microceph defaults""" - # bucket should not be present - test_bucket = "testbuckx" - assert test_bucket not in [x.name for x in rgwuser_test1.buckets.all()] - - # create bucket - rgwuser_test1.create_bucket(Bucket=test_bucket) - assert test_bucket in [x.name for x in rgwuser_test1.buckets.all()] - - # list from other identity, check it is not visible - assert test_bucket not in [x.name for x in rgwuser_test2.buckets.all()] - # but already exist - with pytest.raises(rgwuser_test2.meta.client.exceptions.BucketAlreadyExists): - rgwuser_test2.create_bucket(Bucket=test_bucket) - - # belongs to expected user - assert rgwuser_test1.Bucket(test_bucket).Acl().owner["ID"] == "test1" - # but unaccessible by other user - with pytest.raises(rgwuser_test2.meta.client.exceptions.ClientError, match=r"AccessDenied"): - assert rgwuser_test2.Bucket(test_bucket).Acl().owner["ID"] == "test1" + bucket_name = "testbuckx" + + # create bucket, check owner and default policy + assert bucket_name not in [x.name for x in radosuser_test1.list_buckets()] + radosuser_test1.create_bucket(bucket_name) + + assert bucket_name in [x.name for x in radosuser_test1.list_buckets()] + assert radosuser_test1.bucket_owner(bucket_name).endswith("$test1") + assert not radosuser_test1.bucket_policy(bucket_name) + + # bucket must exist, but not be not visible nor accessible to others + with pytest.raises(radosuser_test2.s3.meta.client.exceptions.BucketAlreadyExists): + radosuser_test2.create_bucket(bucket_name) + assert bucket_name not in [x.name for x in radosuser_test2.list_buckets()] + with pytest.raises(radosuser_test2.s3.meta.client.exceptions.ClientError, match=r"AccessDenied"): + assert radosuser_test2.list_objects(bucket_name) + + +def test_storage_policy( + tmpworkdir: str, + microceph: str, + radosuser_admin: rwm.StorageManager, + radosuser_test1: rwm.StorageManager, + radosuser_test2: rwm.StorageManager +): # pylint: disable=unused-argument + """test manager created bucket policy""" + + bucket = radosuser_admin.storage_create("testbuckx", "test1") + + assert radosuser_admin.list_objects(bucket.name) == [] + assert radosuser_test1.list_objects(bucket.name) == [] + assert radosuser_admin.bucket_policy(bucket.name) + assert radosuser_test1.bucket_policy(bucket.name) + + with pytest.raises(radosuser_test2.s3.meta.client.exceptions.ClientError, match=r"AccessDenied"): + radosuser_test2.list_objects(bucket.name) + + assert bucket.Versioning().status == "Enabled" + + +def test_storage_versioning( + tmpworkdir: str, + microceph: str, + radosuser_admin: rwm.StorageManager, + radosuser_test1: rwm.StorageManager, +): # pylint: disable=unused-argument + """test manager created bucket policy""" + + bucket_name = "testbuckx" + target_username = "test1" + + bucket = radosuser_admin.storage_create(bucket_name, target_username) + assert bucket.Versioning().status == "Enabled" + + bucket = radosuser_test1.s3.Bucket(bucket_name) + bucket.upload_fileobj(BytesIO(b"dummydata"), "dummykey") + assert len(radosuser_test1.list_objects(bucket_name)) == 1 + bucket.Object("dummykey").delete() + assert len(radosuser_test1.list_objects(bucket_name)) == 0 + + # there should be object and it's delete marker + object_versions = list(bucket.object_versions.all()) + assert len(object_versions) == 2 + + # boto3 resource api does not have working marker attribute + # https://github.com/boto/botocore/issues/674 + # https://github.com/boto/boto3/issues/1769 + # print(radosuser_test1.s3.meta.client.list_object_versions(Bucket=bucket_name)) + object_versions = radosuser_test1.s3.meta.client.list_object_versions(Bucket=bucket.name) + assert len(object_versions["Versions"]) == 1 + assert len(object_versions["DeleteMarkers"]) == 1 + + +def test_storage_backup( + tmpworkdir: str, + microceph: str, + radosuser_admin: rwm.StorageManager, + radosuser_test1: rwm.StorageManager, +): # pylint: disable=unused-argument + """test backup to manager created bucket with policy""" + + bucket_name = "rwmbackup-test1" + target_username = "test1" + + radosuser_admin.storage_create(bucket_name, target_username) + Path("testdir").mkdir() + Path("testdir/testdata").write_text('dummy', encoding="utf-8") + + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": radosuser_test1.url, + "rwm_s3_access_key": radosuser_test1.access_key, + "rwm_s3_secret_key": radosuser_test1.secret_key, + "rwm_restic_bucket": bucket_name, + "rwm_restic_password": "dummydummydummydummy", + "rwm_backups": { + "dummy": {"filesdirs": ["testdir"]} + } + }) + assert trwm.backup_cmd("dummy").returncode == 0 + + assert radosuser_test1.list_objects(bucket_name) + assert len(json.loads(trwm.restic_cmd(["snapshots", "--json"]).stdout)) == 1 + + +def test_storage_check_policy( + tmpworkdir: str, + microceph: str, + radosuser_admin: rwm.StorageManager, + radosuser_test1: rwm.StorageManager +): # pylint: disable=unused-argument + """test backup to manager created bucket with policy""" + + bucket_name = "rwmbackup-test1" + target_username = "test1" + + assert radosuser_admin.create_bucket(bucket_name) + assert not radosuser_admin.storage_check_policy(bucket_name) + radosuser_admin.storage_delete(bucket_name) + + radosuser_admin.storage_create(bucket_name, "test1") + assert radosuser_test1.storage_check_policy(bucket_name) + + radosuser_admin.s3.Bucket(bucket_name).Versioning().suspend() + assert not radosuser_test1.storage_check_policy(bucket_name)