Commit 3e2e6876 authored by Radoslav Bodó

rwm: add policed bucket management

parent 832536b7
@@ -22,6 +22,7 @@ lint-yaml:
	yamllint --strict .

test:
	# show stderr with "-o log_cli=true"
	python3 -m pytest -v tests/

coverage:
...
@@ -105,10 +105,29 @@ rwm restic mount /mnt/restore
```
### RWM: backups with policed buckets

Given two S3 accounts (*admin* and *user1*), the *admin* account creates a policed storage bucket, which *user1* then uses for backups.

```
cp examples/rwm-admin.conf admin.conf
rwm --config admin.conf storage_create bucket1 user1
rwm --config admin.conf storage_check_policy bucket1
rwm --config admin.conf storage_list

cp examples/rwm-backups.conf rwm.conf
rwm storage_check_policy bucket1
rwm backup_all
rwm restic snapshots
rwm restic mount /mnt/restore
```
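The commands above reference `examples/rwm-admin.conf` and `examples/rwm-backups.conf`. A minimal sketch of what the two files might contain is shown below, assuming the YAML keys used elsewhere in this commit (`rwm_s3_endpoint_url`, `rwm_s3_access_key`, `rwm_s3_secret_key`, `rwm_restic_bucket`, `rwm_restic_password`, `rwm_backups`); the endpoint URL, credentials and backup entries are illustrative placeholders, see the shipped examples for the authoritative templates.

```
# admin.conf -- credentials of the account allowed to create buckets and set policies (placeholder values)
rwm_s3_endpoint_url: "http://s3.example.com"
rwm_s3_access_key: "admin-access-key"
rwm_s3_secret_key: "admin-secret-key"

# rwm.conf -- user1 credentials plus restic backup configuration for bucket1 (placeholder values)
rwm_s3_endpoint_url: "http://s3.example.com"
rwm_s3_access_key: "user1-access-key"
rwm_s3_secret_key: "user1-secret-key"
rwm_restic_bucket: "bucket1"
rwm_restic_password: "longrandomgeneratedpassword"
rwm_backups:
  mybackup:
    filesdirs:
      - /etc
      - /data
```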
## Notes

* executed tools' stdout is buffered, e.g. `restic mount` does not print immediate output as it normally would
* pass full arguments through to the underlying tool with "--" (e.g. `rwm rclone -- ls --help`)
* the microceph runner breaks on reboot because of a symlink at /etc/ceph
## Development
@@ -132,3 +151,11 @@ export RUNNER_URL=
export RUNNER_TOKEN=
make runner
```
## References
* https://restic.readthedocs.io/
* https://github.com/CESNET/aws-plugin-bucket-policy
* https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock.html
* https://aws.amazon.com/blogs/storage/point-in-time-restore-for-amazon-s3-buckets/
@@ -3,6 +3,7 @@
import base64
import dataclasses
import json
import logging
import os
import shlex
@@ -12,6 +13,8 @@ from argparse import ArgumentParser
from datetime import datetime
from pathlib import Path

import boto3
import botocore
import yaml
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
@@ -99,13 +102,121 @@ class BackupResult:
        }


class StorageManager:
    """s3 policed bucket manager"""

    def __init__(self, url, access_key, secret_key):
        self.url = url
        self.access_key = access_key
        self.secret_key = secret_key
        self.s3 = boto3.resource('s3', endpoint_url=url, aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key)

    def create_bucket(self, name):
        """aws s3 resource api stub"""
        # boto3 client and resource api are not completely aligned
        # s3.Bucket("xyz").create() returns dict instead of s3.Bucket object
        return self.s3.create_bucket(Bucket=name)

    def bucket_exist(self, name):
        """check if bucket exists"""
        return name in [x.name for x in self.list_buckets()]

    def bucket_owner(self, name):
        """aws s3 resource api stub"""
        return self.s3.Bucket(name).Acl().owner["ID"]

    def bucket_policy(self, name):
        """aws s3 resource api stub"""
        try:
            return json.loads(self.s3.Bucket(name).Policy().policy)
        except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as exc:
            logger.error("rwm bucket_policy error, %s", (exc))
            return None

    def list_buckets(self):
        """aws s3 resource api stub"""
        return list(self.s3.buckets.all())

    def list_objects(self, bucket_name):
        """aws s3 resource api stub"""
        return list(self.s3.Bucket(bucket_name).objects.all())

    def storage_create(self, bucket_name, target_username):
        """create policed bucket"""
        if (not bucket_name) or (not target_username):
            raise ValueError("must specify value for bucket and user")

        bucket = self.create_bucket(bucket_name)
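        # radosgw reports the bucket owner ID as "tenant$username"; the split
        # below yields both parts for building same-tenant principal ARNs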
        tenant, manager_username = bucket.Acl().owner["ID"].split("$")

        # grants basic RW access to user in same tenant
        bucket_policy = {
            "Version": "2012-10-17",
            "Statement": [
                # full access to manager
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": [f"arn:aws:iam::{tenant}:user/{manager_username}"]},
                    "Action": ["*"],
                    "Resource": [f"arn:aws:s3:::{bucket.name}", f"arn:aws:s3:::{bucket.name}/*"]
                },
                # limited access to user
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": [f"arn:aws:iam::{tenant}:user/{target_username}"]},
                    "Action": [
                        "s3:ListBucket", "s3:GetObject", "s3:PutObject", "s3:DeleteObject",
                        "s3:GetBucketPolicy", "s3:ListBucketVersions", "s3:GetBucketVersioning"
                    ],
                    "Resource": [f"arn:aws:s3:::{bucket.name}", f"arn:aws:s3:::{bucket.name}/*"]
                }
            ]
        }
        bucket.Policy().put(Policy=json.dumps(bucket_policy))

        # enforces versioning
        bucket.Versioning().enable()

        return bucket

    def storage_delete(self, bucket_name):
        """storage delete"""
        bucket = self.s3.Bucket(bucket_name)
        bucket.objects.all().delete()
        bucket.object_versions.all().delete()
        bucket.delete()

    def storage_check_policy(self, name):
        """storage check bucket policy"""
        if not (policy := self.bucket_policy(name)):
            return False

        if (
            len(policy["Statement"]) == 2
            and len(list(filter(lambda stmt: stmt["Action"] == ["*"], policy["Statement"]))) == 1
            and self.s3.Bucket(name).Versioning().status == "Enabled"
        ):
            return True

        return False


class RWM:
    """rwm impl"""

    def __init__(self, config):
        self.config = config
        self.storage_manager = StorageManager(
            config.get("rwm_s3_endpoint_url"),
            config.get("rwm_s3_access_key"),
            config.get("rwm_s3_secret_key")
        )

    def aws_cmd(self, args) -> subprocess.CompletedProcess:
        """aws cli wrapper"""
        env = {
@@ -121,7 +232,7 @@ class RWM:
        # aws cli does not have endpoint-url as env config option
        return run_command(["aws", "--endpoint-url", self.config["rwm_s3_endpoint_url"]] + args, env=env)

    def rclone_cmd(self, args) -> subprocess.CompletedProcess:
        """rclone wrapper"""
        env = {
@@ -136,7 +247,7 @@ class RWM:
        }
        return run_command(["rclone"] + args, env=env)

    def rclone_crypt_cmd(self, args) -> subprocess.CompletedProcess:
        """
        rclone crypt wrapper
        * https://rclone.org/docs/#config-file
@@ -160,7 +271,7 @@ class RWM:
        }
        return run_command(["rclone"] + args, env=env)

    def restic_cmd(self, args) -> subprocess.CompletedProcess:
        """restic command wrapper"""
        env = {
@@ -173,7 +284,7 @@ class RWM:
        }
        return run_command(["restic"] + args, env=env)

    def restic_autoinit(self) -> subprocess.CompletedProcess:
        """runs restic init"""
        logger.info("run restic_autoinit")
@@ -181,7 +292,7 @@ class RWM:
        proc = self.restic_cmd(["init"])
        return proc

    def restic_backup(self, name) -> subprocess.CompletedProcess:
        """runs restic backup by name"""
        logger.info(f"run restic_backup {name}")
@@ -194,7 +305,7 @@ class RWM:
        return self.restic_cmd(cmd_args)

    def restic_forget_prune(self) -> subprocess.CompletedProcess:
        """runs forget prune"""
        logger.info("run restic_forget_prune")
@@ -205,9 +316,12 @@ class RWM:
        return self.restic_cmd(cmd_args)

    def backup_cmd(self, name) -> subprocess.CompletedProcess:
        """backup command"""
        # TODO: check target backup policy; restic automatically creates
        # the bucket with a null policy if it does not exist
        autoinit_proc = self.restic_autoinit()
        if autoinit_proc.returncode != 0:
            logger.error("restic autoinit failed")
@@ -226,7 +340,7 @@ class RWM:
        return backup_proc

    def backup_all_cmd(self) -> int:
        """backup all command"""
        stats = {}
@@ -259,6 +373,49 @@ class RWM:
        print(tabulate([item.to_dict() for item in stats.values()], headers="keys", numalign="left"))
        return ret

    def storage_create_cmd(self, bucket_name, target_username) -> int:
        """storage create command"""
        try:
            self.storage_manager.storage_create(bucket_name, target_username)
        except (
            botocore.exceptions.ClientError,
            botocore.exceptions.BotoCoreError,
            ValueError
        ) as exc:
            logger.error("rwm storage_create error, %s", (exc))
            return 1
        return 0

    def storage_delete_cmd(self, bucket_name) -> int:
        """storage delete command"""
        try:
            self.storage_manager.storage_delete(bucket_name)
        except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as exc:
            logger.error("rwm storage_delete error, %s", (exc))
            return 1
        return 0

    def storage_check_policy_cmd(self, bucket_name) -> int:
        """storage check policy command"""
        ret, msg = (
            (0, "OK")
            if self.storage_manager.storage_check_policy(bucket_name)
            else (1, "FAILED")
        )
        logger.debug("bucket policy: %s", json.dumps(self.storage_manager.bucket_policy(bucket_name), indent=4))
        print(msg)
        return ret

    def storage_list_cmd(self):
        pass

    def storage_restore(self, bucket_name, target_username):
        """https://gitlab.cesnet.cz/709/public/restic/aws/-/blob/main/bucket_copy.sh?ref_type=heads"""
        pass


def configure_logging(debug):
    """configure logger"""
@@ -297,6 +454,14 @@ def parse_arguments(argv):
    backup_cmd_parser.add_argument("name", help="backup config name")
    subparsers.add_parser("backup_all", help="backup all command")

    storage_create_cmd_parser = subparsers.add_parser("storage_create", help="storage_create command")
    storage_create_cmd_parser.add_argument("bucket_name", help="bucket name")
    storage_create_cmd_parser.add_argument("target_username", help="actual bucket user with limited RW access")

    storage_delete_cmd_parser = subparsers.add_parser("storage_delete", help="storage_delete command")
    storage_delete_cmd_parser.add_argument("bucket_name", help="bucket name")

    storage_check_policy_cmd_parser = subparsers.add_parser("storage_check_policy", help="storage_check_policy command; use --debug to show policy")
    storage_check_policy_cmd_parser.add_argument("bucket_name", help="bucket name")

    return parser.parse_args(argv)
@@ -333,7 +498,14 @@ def main(argv=None):
    if args.command == "backup_all":
        ret = rwmi.backup_all_cmd()
        logger.info("rwm backup_all finished with %s (ret %d)", "success" if ret == 0 else "errors", ret)
if args.command == "storage_create":
ret = rwmi.storage_create_cmd(args.bucket_name, args.target_username)
if args.command == "storage_delete":
ret = rwmi.storage_delete_cmd(args.bucket_name)
if args.command == "storage_check_policy":
ret = rwmi.storage_check_policy_cmd(args.bucket_name)
logger.debug("rwm finished with %s (ret %d)", "success" if ret == 0 else "errors", ret)
    return ret
...
@@ -7,10 +7,11 @@ import socket
import subprocess
from tempfile import mkdtemp

import pytest
from xprocess import ProcessStarter

from rwm import StorageManager


@pytest.fixture
def tmpworkdir():
@@ -54,42 +55,44 @@ def microceph():
    yield "http://localhost:80"


def radosuser(microceph_url, username, tenant="tenant1"):
    """rgwuser fixture"""

    subprocess.run(
        ["/snap/bin/radosgw-admin", "user", "rm", f"--uid={tenant}${username}", "--purge-data"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=False
    )

    proc = subprocess.run(
        ["/snap/bin/radosgw-admin", "user", "create", f"--uid={tenant}${username}", f"--display-name={tenant}_{username}"],
        check=True,
        capture_output=True,
        text=True,
    )
    user = json.loads(proc.stdout)

    yield StorageManager(microceph_url, user["keys"][0]["access_key"], user["keys"][0]["secret_key"])

    subprocess.run(["/snap/bin/radosgw-admin", "user", "rm", f"--uid={tenant}${username}", "--purge-data"], check=True)


@pytest.fixture
def radosuser_admin(microceph):  # pylint: disable=redefined-outer-name
    """radosuser admin stub"""
    yield from radosuser(microceph, "admin")


@pytest.fixture
def radosuser_test1(microceph):  # pylint: disable=redefined-outer-name
    """radosuser test1 stub"""
    yield from radosuser(microceph, "test1")


@pytest.fixture
def radosuser_test2(microceph):  # pylint: disable=redefined-outer-name
    """radosuser test2 stub"""
    yield from radosuser(microceph, "test2")
@@ -39,7 +39,7 @@ def test_wrap_output():
def test_main(tmpworkdir: str):  # pylint: disable=unused-argument
    """test main"""

    # optional and default config handling
    assert rwm_main(["version"]) == 0
    Path("rwm.conf").touch()
    assert rwm_main(["version"]) == 0
@@ -51,9 +51,17 @@ def test_main(tmpworkdir: str):  # pylint: disable=unused-argument
        assert rwm_main([item, "dummy"]) == 0

    mock = Mock(return_value=0)

    with patch.object(rwm.RWM, "backup_all_cmd", mock):
        assert rwm_main(["backup_all"]) == 0

    with patch.object(rwm.RWM, "storage_create_cmd", mock):
        assert rwm_main(["storage_create", "bucket", "user"]) == 0

    for item in ["storage_delete", "storage_check_policy"]:
        with patch.object(rwm.RWM, f"{item}_cmd", mock):
            assert rwm_main([item, "bucket"]) == 0


def test_aws_cmd(tmpworkdir: str, motoserver: str):  # pylint: disable=unused-argument
    """test aws command"""
@@ -305,3 +313,67 @@ def test_backup_all_cmd_error_handling(tmpworkdir: str):  # pylint: disable=unus
    with patch.object(rwm.RWM, "restic_autoinit", mock_fail):
        assert RWM(rwm_conf).backup_all_cmd() == 11


def test_storage_create_cmd(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager):  # pylint: disable=unused-argument
    """test_storage_create_cmd"""

    trwm = rwm.RWM({
        "rwm_s3_endpoint_url": radosuser_admin.url,
        "rwm_s3_access_key": radosuser_admin.access_key,
        "rwm_s3_secret_key": radosuser_admin.secret_key,
    })

    bucket_name = "testbuck"
    assert trwm.storage_create_cmd(bucket_name, "testnx") == 0
    assert trwm.storage_create_cmd("!invalid", "testnx") == 1
    assert trwm.storage_create_cmd("", "testnx") == 1


def test_storage_delete_cmd(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager):  # pylint: disable=unused-argument
    """test_storage_delete_cmd"""

    trwm = rwm.RWM({
        "rwm_s3_endpoint_url": radosuser_admin.url,
        "rwm_s3_access_key": radosuser_admin.access_key,
        "rwm_s3_secret_key": radosuser_admin.secret_key,
        "rwm_restic_bucket": "testbuck",
        "rwm_restic_password": "dummydummydummydummy",
        "rwm_backups": {
            "testcfg": {"filesdirs": ["testdatadir/"]}
        }
    })
    bucket_name = trwm.config["rwm_restic_bucket"]
    Path("testdatadir").mkdir()
    Path("testdatadir/testdata1.txt").write_text("dummydata", encoding="utf-8")

    bucket = trwm.storage_manager.storage_create(bucket_name, "admin")
    assert trwm.storage_manager.bucket_exist(bucket_name)
    assert len(trwm.storage_manager.list_objects(bucket_name)) == 0

    assert trwm.backup_cmd("testcfg").returncode == 0
    assert len(trwm.storage_manager.list_objects(bucket_name)) != 0

    object_versions = radosuser_admin.s3.meta.client.list_object_versions(Bucket=bucket.name)
    assert len(object_versions["Versions"]) > 0
    assert len(object_versions["DeleteMarkers"]) > 0

    assert trwm.storage_delete_cmd(bucket_name) == 0
    assert not trwm.storage_manager.bucket_exist(bucket_name)

    assert trwm.storage_delete_cmd(bucket_name) == 1


def test_storage_check_policy_cmd(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager):  # pylint: disable=unused-argument
    """test storage check policy command"""

    trwm = rwm.RWM({
        "rwm_s3_endpoint_url": radosuser_admin.url,
        "rwm_s3_access_key": radosuser_admin.access_key,
        "rwm_s3_secret_key": radosuser_admin.secret_key,
    })

    mock = Mock(return_value=False)
    with patch.object(rwm.StorageManager, "storage_check_policy", mock):
        assert trwm.storage_check_policy_cmd("dummy") == 1
"""rwm bucket policies tests""" """rwm bucket policies tests"""
import boto3 import json
from io import BytesIO
from pathlib import Path
import pytest import pytest
import rwm


def test_microceph_defaults(
        tmpworkdir: str,
        microceph: str,
        radosuser_test1: rwm.StorageManager,
        radosuser_test2: rwm.StorageManager
):  # pylint: disable=unused-argument
    """test microceph defaults"""

    bucket_name = "testbuckx"

    # create bucket, check owner and default policy
    assert bucket_name not in [x.name for x in radosuser_test1.list_buckets()]
    radosuser_test1.create_bucket(bucket_name)
    assert bucket_name in [x.name for x in radosuser_test1.list_buckets()]
    assert radosuser_test1.bucket_owner(bucket_name).endswith("$test1")
    assert not radosuser_test1.bucket_policy(bucket_name)

    # bucket must exist, but not be visible nor accessible to others
    with pytest.raises(radosuser_test2.s3.meta.client.exceptions.BucketAlreadyExists):
        radosuser_test2.create_bucket(bucket_name)
    assert bucket_name not in [x.name for x in radosuser_test2.list_buckets()]
    with pytest.raises(radosuser_test2.s3.meta.client.exceptions.ClientError, match=r"AccessDenied"):
        assert radosuser_test2.list_objects(bucket_name)


def test_storage_policy(
        tmpworkdir: str,
        microceph: str,
        radosuser_admin: rwm.StorageManager,
        radosuser_test1: rwm.StorageManager,
        radosuser_test2: rwm.StorageManager
):  # pylint: disable=unused-argument
    """test manager created bucket policy"""

    bucket = radosuser_admin.storage_create("testbuckx", "test1")
    assert radosuser_admin.list_objects(bucket.name) == []
    assert radosuser_test1.list_objects(bucket.name) == []

    assert radosuser_admin.bucket_policy(bucket.name)
    assert radosuser_test1.bucket_policy(bucket.name)

    with pytest.raises(radosuser_test2.s3.meta.client.exceptions.ClientError, match=r"AccessDenied"):
        radosuser_test2.list_objects(bucket.name)

    assert bucket.Versioning().status == "Enabled"


def test_storage_versioning(
        tmpworkdir: str,
        microceph: str,
        radosuser_admin: rwm.StorageManager,
        radosuser_test1: rwm.StorageManager,
):  # pylint: disable=unused-argument
    """test manager created bucket versioning"""

    bucket_name = "testbuckx"
    target_username = "test1"
    bucket = radosuser_admin.storage_create(bucket_name, target_username)
    assert bucket.Versioning().status == "Enabled"

    bucket = radosuser_test1.s3.Bucket(bucket_name)
    bucket.upload_fileobj(BytesIO(b"dummydata"), "dummykey")
    assert len(radosuser_test1.list_objects(bucket_name)) == 1

    bucket.Object("dummykey").delete()
    assert len(radosuser_test1.list_objects(bucket_name)) == 0

    # there should be the object and its delete marker
    object_versions = list(bucket.object_versions.all())
    assert len(object_versions) == 2

    # boto3 resource api does not have working marker attribute
    # https://github.com/boto/botocore/issues/674
    # https://github.com/boto/boto3/issues/1769
    # print(radosuser_test1.s3.meta.client.list_object_versions(Bucket=bucket_name))

    object_versions = radosuser_test1.s3.meta.client.list_object_versions(Bucket=bucket.name)
    assert len(object_versions["Versions"]) == 1
    assert len(object_versions["DeleteMarkers"]) == 1


def test_storage_backup(
        tmpworkdir: str,
        microceph: str,
        radosuser_admin: rwm.StorageManager,
        radosuser_test1: rwm.StorageManager,
):  # pylint: disable=unused-argument
    """test backup to manager created bucket with policy"""

    bucket_name = "rwmbackup-test1"
    target_username = "test1"
    radosuser_admin.storage_create(bucket_name, target_username)

    Path("testdir").mkdir()
    Path("testdir/testdata").write_text('dummy', encoding="utf-8")
    trwm = rwm.RWM({
        "rwm_s3_endpoint_url": radosuser_test1.url,
        "rwm_s3_access_key": radosuser_test1.access_key,
        "rwm_s3_secret_key": radosuser_test1.secret_key,
        "rwm_restic_bucket": bucket_name,
        "rwm_restic_password": "dummydummydummydummy",
        "rwm_backups": {
            "dummy": {"filesdirs": ["testdir"]}
        }
    })

    assert trwm.backup_cmd("dummy").returncode == 0
    assert radosuser_test1.list_objects(bucket_name)
    assert len(json.loads(trwm.restic_cmd(["snapshots", "--json"]).stdout)) == 1


def test_storage_check_policy(
        tmpworkdir: str,
        microceph: str,
        radosuser_admin: rwm.StorageManager,
        radosuser_test1: rwm.StorageManager
):  # pylint: disable=unused-argument
    """test storage_check_policy"""

    bucket_name = "rwmbackup-test1"
    target_username = "test1"

    assert radosuser_admin.create_bucket(bucket_name)
    assert not radosuser_admin.storage_check_policy(bucket_name)
    radosuser_admin.storage_delete(bucket_name)

    radosuser_admin.storage_create(bucket_name, "test1")
    assert radosuser_test1.storage_check_policy(bucket_name)

    radosuser_admin.s3.Bucket(bucket_name).Versioning().suspend()
    assert not radosuser_test1.storage_check_policy(bucket_name)