"""rwm storagemanager and bucket policing tests""" import json from io import BytesIO from pathlib import Path import pytest import rwm def test_microceph_defaults( tmpworkdir: str, microceph: str, radosuser_test1: rwm.StorageManager, radosuser_test2: rwm.StorageManager ): # pylint: disable=unused-argument """test microceph defaults""" bucket_name = "testbuckx" # create bucket assert not radosuser_test1.bucket_exist(bucket_name) radosuser_test1.bucket_create(bucket_name) assert len(radosuser_test1.list_buckets()) == 1 assert radosuser_test1.list_objects(bucket_name) == [] # assert basic raw bucket behavior assert radosuser_test1.bucket_exist(bucket_name) assert radosuser_test1.bucket_owner(bucket_name).endswith("$test1") assert not radosuser_test1.bucket_policy(bucket_name) # bucket must exist, but not be accessible to others with pytest.raises(radosuser_test2.s3.meta.client.exceptions.BucketAlreadyExists): radosuser_test2.bucket_create(bucket_name) with pytest.raises(radosuser_test2.s3.meta.client.exceptions.ClientError, match=r"AccessDenied"): assert radosuser_test2.list_objects(bucket_name) def test_storage_create( tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager, radosuser_test1: rwm.StorageManager, radosuser_test2: rwm.StorageManager ): # pylint: disable=unused-argument """test manager storage_create""" bucket = radosuser_admin.storage_create("testbuckx", "test1") assert radosuser_admin.list_objects(bucket.name) == [] assert radosuser_admin.storage_check_policy(bucket.name) assert radosuser_test1.storage_check_policy(bucket.name) # storage must exist, but not be accessible to others with pytest.raises(radosuser_test2.s3.meta.client.exceptions.ClientError, match=r"AccessDenied"): radosuser_test2.list_objects(bucket.name) def test_storage_delete( tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager, radosuser_test1: rwm.StorageManager, ): # pylint: disable=unused-argument """test manager storage_delete""" bucket_name = "testbuckx" target_username = "test1" bucket = radosuser_admin.storage_create(bucket_name, target_username) bucket = radosuser_test1.s3.Bucket(bucket.name) bucket.upload_fileobj(BytesIO(b"dummydata"), "dummykey") assert len(radosuser_test1.list_objects(bucket.name)) == 1 bucket.Object("dummykey").delete() assert len(radosuser_test1.list_objects(bucket.name)) == 0 # there should be object and it's delete marker object_versions = list(bucket.object_versions.all()) assert len(object_versions) == 2 # boto3 resource api does not have working marker attribute # https://github.com/boto/botocore/issues/674 # https://github.com/boto/boto3/issues/1769 # print(radosuser_test1.s3.meta.client.list_object_versions(Bucket=bucket_name)) object_versions = radosuser_test1.s3.meta.client.list_object_versions(Bucket=bucket.name) assert len(object_versions["Versions"]) == 1 assert len(object_versions["DeleteMarkers"]) == 1 def test_storage_check_policy( tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager, radosuser_test1: rwm.StorageManager ): # pylint: disable=unused-argument """test manager storage_check_policy""" bucket_name = "rwmbackup-test1" target_username = "test1" assert radosuser_admin.bucket_create(bucket_name) assert not radosuser_admin.storage_check_policy(bucket_name) radosuser_admin.storage_delete(bucket_name) radosuser_admin.storage_create(bucket_name, target_username) assert radosuser_test1.storage_check_policy(bucket_name) radosuser_admin.s3.Bucket(bucket_name).Versioning().suspend() assert not radosuser_test1.storage_check_policy(bucket_name) def test_storage_backup_usage( tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager, radosuser_test1: rwm.StorageManager, ): # pylint: disable=unused-argument """test backup to manager created bucket with policy""" bucket_name = "rwmbackup-test1" target_username = "test1" radosuser_admin.storage_create(bucket_name, target_username) Path("testdir").mkdir() Path("testdir/testdata").write_text('dummy', encoding="utf-8") trwm = rwm.RWM({ "rwm_s3_endpoint_url": radosuser_test1.url, "rwm_s3_access_key": radosuser_test1.access_key, "rwm_s3_secret_key": radosuser_test1.secret_key, "rwm_restic_bucket": bucket_name, "rwm_restic_password": "dummydummydummydummy", "rwm_backups": { "dummy": {"filesdirs": ["testdir"]} } }) assert trwm.restic_cmd(["init"]).returncode == 0 assert trwm.backup_cmd("dummy").returncode == 0 assert radosuser_test1.list_objects(bucket_name) assert len(json.loads(trwm.restic_cmd(["snapshots", "--json"]).stdout)) == 1 with pytest.raises(radosuser_test1.s3.meta.client.exceptions.ClientError, match=r"AccessDenied"): assert radosuser_test1.storage_delete(bucket_name) def test_storage_list( tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager, ): # pylint: disable=unused-argument """test managet list storage""" bucket_name = "rwmbackup-test1" target_username = "test1" radosuser_admin.bucket_create("no-acl-dummy") radosuser_admin.storage_create(bucket_name, target_username) assert radosuser_admin.storage_list() def test_storage_drop_versions(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager): # pylint: disable=unused-argument """test manager storage_drop_versions""" bucket_name = "testbuckx" target_username = "test1" bucket = radosuser_admin.storage_create(bucket_name, target_username) bucket.upload_fileobj(BytesIO(b"dummydata1"), "dummykey") bucket.upload_fileobj(BytesIO(b"dummydata2"), "dummykey") bucket.Object("dummykey").delete() bucket.upload_fileobj(BytesIO(b"dummydata3"), "dummykey") # boto3 resource api object_versions = list(bucket.object_versions.all()) assert len(object_versions) == 4 # boto3 client api object_versions = radosuser_admin.s3.meta.client.list_object_versions(Bucket=bucket.name) assert len(object_versions["Versions"]) == 3 assert len(object_versions["DeleteMarkers"]) == 1 assert radosuser_admin.storage_drop_versions(bucket.name) == 0 object_versions = list(bucket.object_versions.all()) assert len(object_versions) == 1