diff --git a/README.md b/README.md index 798e66ec2d2fbd4dbd250f37e1114197acb4215a..b3f4433f609a6509b8db3bf372a9012776c73f5e 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,8 @@ backups follows standard restic procedures, but adds profile like configuration ``` cp examples/rwm-backups.conf rwm.conf +rwm restic init # should create bucket on it's own + rwm backup_all rwm restic snapshots rwm restic mount /mnt/restore @@ -116,7 +118,9 @@ rwm --confg admin.conf storage_check_policy bucket1 rwm --confg admin.conf storage_list cp examples/rwm-backups.conf rwm.conf +rwm restic init rwm storage_check_policy bucket1 + rwm backup_all rwm restic snapshots rwm restic mount /mnt/restore diff --git a/rwm.py b/rwm.py index 6bc6e010301b4b2a33cc959457536122346e46ed..e4cd4ee861ac08ab9103fc8a37cb27d27234255d 100755 --- a/rwm.py +++ b/rwm.py @@ -111,7 +111,7 @@ class StorageManager: self.secret_key = secret_key self.s3 = boto3.resource('s3', endpoint_url=url, aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key) - def create_bucket(self, name): + def bucket_create(self, name): """aws s3 resource api stub""" # boto3 client and resource api are not completely aligned # s3.Bucket("xyz").create() returns dict instead of s3.Bucket object @@ -148,7 +148,7 @@ class StorageManager: if (not bucket_name) or (not target_username): raise ValueError("must specify value for bucket and user") - bucket = self.create_bucket(bucket_name) + bucket = self.bucket_create(bucket_name) tenant, manager_username = bucket.Acl().owner["ID"].split("$") # grants basic RW access to user in same tenant @@ -284,15 +284,7 @@ class RWM: } return run_command(["restic"] + args, env=env) - def restic_autoinit(self) -> subprocess.CompletedProcess: - """runs restic init""" - - logger.info("run restic_autoinit") - if (proc := self.restic_cmd(["cat", "config"])).returncode != 0: - proc = self.restic_cmd(["init"]) - return proc - - def restic_backup(self, name) -> subprocess.CompletedProcess: + def _restic_backup(self, name) -> subprocess.CompletedProcess: """runs restic backup by name""" logger.info(f"run restic_backup {name}") @@ -305,7 +297,7 @@ class RWM: return self.restic_cmd(cmd_args) - def restic_forget_prune(self) -> subprocess.CompletedProcess: + def _restic_forget_prune(self) -> subprocess.CompletedProcess: """runs forget prune""" logger.info("run restic_forget_prune") @@ -322,20 +314,14 @@ class RWM: # TODO: check target backup policy, restic automatically creates # bucket if ot does not exist with null-policy - autoinit_proc = self.restic_autoinit() - if autoinit_proc.returncode != 0: - logger.error("restic autoinit failed") - wrap_output(autoinit_proc) - return autoinit_proc - - wrap_output(backup_proc := self.restic_backup(name)) + wrap_output(backup_proc := self._restic_backup(name)) if backup_proc.returncode != 0: - logger.error("restic_backup failed") + logger.error("rwm _restic_backup failed") return backup_proc - wrap_output(forget_proc := self.restic_forget_prune()) + wrap_output(forget_proc := self._restic_forget_prune()) if forget_proc.returncode != 0: - logger.error("restic_forget_prune failed") + logger.error("rwm _restic_forget_prune failed") return forget_proc return backup_proc @@ -346,25 +332,16 @@ class RWM: stats = {} ret = 0 - time_start = datetime.now() - autoinit_proc = self.restic_autoinit() - time_end = datetime.now() - if autoinit_proc.returncode != 0: - logger.error("restic autoinit failed") - wrap_output(autoinit_proc) - return autoinit_proc.returncode - stats["_autoinit"] = BackupResult("_autoinit", autoinit_proc.returncode, time_start, time_end) - for name in self.config["rwm_backups"].keys(): time_start = datetime.now() - wrap_output(backup_proc := self.restic_backup(name)) + wrap_output(backup_proc := self._restic_backup(name)) time_end = datetime.now() ret |= backup_proc.returncode stats[name] = BackupResult(name, backup_proc.returncode, time_start, time_end) if ret == 0: time_start = datetime.now() - wrap_output(forget_proc := self.restic_forget_prune()) + wrap_output(forget_proc := self._restic_forget_prune()) time_end = datetime.now() ret |= forget_proc.returncode stats["_forget_prune"] = BackupResult("_forget_prune", forget_proc.returncode, time_start, time_end) @@ -400,11 +377,7 @@ class RWM: def storage_check_policy_cmd(self, bucket_name) -> int: """storage check policy command""" - ret, msg = ( - (0, "OK") - if self.storage_manager.storage_check_policy(bucket_name) == True - else (1, "FAILED") - ) + ret, msg = (0, "OK") if self.storage_manager.storage_check_policy(bucket_name) else (1, "FAILED") logger.debug("bucket policy: %s", json.dumps(self.storage_manager.bucket_policy(bucket_name), indent=4)) print(msg) return ret @@ -498,6 +471,7 @@ def main(argv=None): if args.command == "backup_all": ret = rwmi.backup_all_cmd() logger.info("rwm backup_all finished with %s (ret %d)", "success" if ret == 0 else "errors", ret) + if args.command == "storage_create": ret = rwmi.storage_create_cmd(args.bucket_name, args.target_username) if args.command == "storage_delete": diff --git a/tests/test_default.py b/tests/test_default.py index f04dd224d1298dafa1f71e18afa7ea9bceda453e..8c3e4531c953a2ad95acc3184355abf455cb0153 100644 --- a/tests/test_default.py +++ b/tests/test_default.py @@ -1,25 +1,11 @@ """default tests""" -import json from pathlib import Path from subprocess import CompletedProcess from unittest.mock import Mock, patch -import boto3 import rwm -from rwm import is_sublist, main as rwm_main, rclone_obscure_password, RWM, wrap_output - - -def buckets_plain_list(full_response): - """boto3 helper""" - - return [x["Name"] for x in full_response["Buckets"]] - - -def objects_plain_list(full_response): - """boto3 helper""" - - return [x["Key"] for x in full_response["Contents"]] +from rwm import is_sublist, main as rwm_main, wrap_output def test_sublist(): @@ -45,335 +31,26 @@ def test_main(tmpworkdir: str): # pylint: disable=unused-argument assert rwm_main(["version"]) == 0 # command branches - mock = Mock(return_value=CompletedProcess(args='dummy', returncode=0)) - for item in ["aws", "rclone", "rclone_crypt", "restic", "backup"]: - with patch.object(rwm.RWM, f"{item}_cmd", mock): - assert rwm_main([item, "dummy"]) == 0 - - mock = Mock(return_value=0) - - with patch.object(rwm.RWM, "backup_all_cmd", mock): + mock_proc = Mock(return_value=CompletedProcess(args='dummy', returncode=0)) + mock_ok = Mock(return_value=0) + + with patch.object(rwm.RWM, f"aws_cmd", mock_proc): + assert rwm_main(["aws", "dummy"]) == 0 + with patch.object(rwm.RWM, f"rclone_cmd", mock_proc): + assert rwm_main(["rclone", "dummy"]) == 0 + with patch.object(rwm.RWM, f"rclone_crypt_cmd", mock_proc): + assert rwm_main(["rclone_crypt", "dummy"]) == 0 + with patch.object(rwm.RWM, f"restic_cmd", mock_proc): + assert rwm_main(["restic", "dummy"]) == 0 + + with patch.object(rwm.RWM, f"backup_cmd", mock_proc): + assert rwm_main(["backup", "dummy"]) == 0 + with patch.object(rwm.RWM, "backup_all_cmd", mock_ok): assert rwm_main(["backup_all"]) == 0 - with patch.object(rwm.RWM, "storage_create_cmd", mock): + with patch.object(rwm.RWM, "storage_create_cmd", mock_ok): assert rwm_main(["storage_create", "bucket", "user"]) == 0 - - for item in ["storage_delete", "storage_check_policy"]: - with patch.object(rwm.RWM, f"{item}_cmd", mock): - assert rwm_main([item, "bucket"]) == 0 - - -def test_aws_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument - """test aws command""" - - trwm = RWM({ - "rwm_s3_endpoint_url": motoserver, - "rwm_s3_access_key": "dummy", - "rwm_s3_secret_key": "dummy", - }) - s3 = boto3.client('s3', endpoint_url=motoserver, aws_access_key_id="dummy", aws_secret_access_key="dummy") - test_bucket = "testbucket" - - assert test_bucket not in buckets_plain_list(s3.list_buckets()) - - trwm.aws_cmd(["s3", "mb", f"s3://{test_bucket}"]) - assert test_bucket in buckets_plain_list(s3.list_buckets()) - - trwm.aws_cmd(["s3", "rb", f"s3://{test_bucket}"]) - assert test_bucket not in buckets_plain_list(s3.list_buckets()) - - -def test_rclone_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument - """test rclone command""" - - trwm = RWM({ - "rwm_s3_endpoint_url": motoserver, - "rwm_s3_access_key": "dummy", - "rwm_s3_secret_key": "dummy", - }) - s3 = boto3.client('s3', endpoint_url=motoserver, aws_access_key_id="dummy", aws_secret_access_key="dummy") - - test_bucket = "testbucket" - test_file = "testfile.txt" - Path(test_file).write_text('1234', encoding='utf-8') - - trwm.rclone_cmd(["mkdir", f"rwmbe:/{test_bucket}/"]) - trwm.rclone_cmd(["copy", test_file, f"rwmbe:/{test_bucket}/"]) - assert test_bucket in buckets_plain_list(s3.list_buckets()) - assert test_file in objects_plain_list(s3.list_objects_v2(Bucket=test_bucket)) - - -def test_rclone_crypt_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument - """test rclone with crypt overlay""" - - trwm = RWM({ - "rwm_s3_endpoint_url": motoserver, - "rwm_s3_access_key": "dummy", - "rwm_s3_secret_key": "dummy", - "rwm_rclone_crypt_bucket": "cryptdata_test", - "rwm_rclone_crypt_password": rclone_obscure_password("dummydummydummydummy"), - }) - s3 = boto3.client('s3', endpoint_url=motoserver, aws_access_key_id="dummy", aws_secret_access_key="dummy") - - test_bucket = "testbucket" - test_file = "testfile.txt" - Path(test_file).write_text('1234', encoding='utf-8') - - trwm.rclone_crypt_cmd(["copy", test_file, f"rwmbe:/{test_bucket}/"]) - assert len(objects_plain_list(s3.list_objects_v2(Bucket=trwm.config["rwm_rclone_crypt_bucket"]))) == 1 - - trwm.rclone_crypt_cmd(["delete", f"rwmbe:/{test_bucket}/{test_file}"]) - assert s3.list_objects_v2(Bucket=trwm.config["rwm_rclone_crypt_bucket"])["KeyCount"] == 0 - - test_file1 = "testfile1.txt" - Path(test_file1).write_text('4321', encoding='utf-8') - trwm.rclone_crypt_cmd(["sync", ".", f"rwmbe:/{test_bucket}/"]) - assert s3.list_objects_v2(Bucket=trwm.config["rwm_rclone_crypt_bucket"])["KeyCount"] == 2 - - Path(test_file1).unlink() - trwm.rclone_crypt_cmd(["sync", ".", f"rwmbe:/{test_bucket}/"]) - assert s3.list_objects_v2(Bucket=trwm.config["rwm_rclone_crypt_bucket"])["KeyCount"] == 1 - - -def test_restic_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument - """test restic command""" - - trwm = RWM({ - "rwm_s3_endpoint_url": motoserver, - "rwm_s3_access_key": "dummy", - "rwm_s3_secret_key": "dummy", - "rwm_restic_bucket": "restictest", - "rwm_restic_password": "dummydummydummydummy", - }) - - assert trwm.restic_cmd(["init"]).returncode == 0 - proc = trwm.restic_cmd(["cat", "config"]) - assert "id" in json.loads(proc.stdout) - - -def _list_snapshots(trwm): - """test helper""" - - return json.loads(trwm.restic_cmd(["snapshots", "--json"]).stdout) - - -def _list_files(trwm, snapshot_id): - """test helper""" - - snapshot_ls = [ - json.loads(x) - for x in - trwm.restic_cmd(["ls", snapshot_id, "--json"]).stdout.splitlines() - ] - return [ - x["path"] for x in snapshot_ls - if (x["struct_type"] == "node" and x["type"] == "file") - ] - - -def test_backup_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument - """test backup_cmd command""" - - trwm = RWM({ - "rwm_s3_endpoint_url": motoserver, - "rwm_s3_access_key": "dummy", - "rwm_s3_secret_key": "dummy", - "rwm_restic_bucket": "restictest", - "rwm_restic_password": "dummydummydummydummy", - "rwm_backups": { - "testcfg": { - "filesdirs": ["testdatadir/"], - "excludes": ["testfile_to_be_ignored"], - "extras": ["--tag", "dummytag"], - } - }, - "rwm_retention": { - "keep-daily": "1" - } - }) - - Path("testdatadir").mkdir() - Path("testdatadir/testdata1.txt").write_text("dummydata", encoding="utf-8") - Path("testdatadir/testfile_to_be_ignored").write_text("dummydata", encoding="utf-8") - - assert trwm.backup_cmd("testcfg").returncode == 0 - - snapshots = _list_snapshots(trwm) - assert len(snapshots) == 1 - snapshot_files = _list_files(trwm, snapshots[0]["id"]) - assert "/testdatadir/testdata1.txt" in snapshot_files - - -def test_backup_cmd_excludes(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument - """test backup command""" - - trwm = RWM({ - "rwm_s3_endpoint_url": motoserver, - "rwm_s3_access_key": "dummy", - "rwm_s3_secret_key": "dummy", - "rwm_restic_bucket": "restictest", - "rwm_restic_password": "dummydummydummydummy", - "rwm_backups": { - "testcfg": { - "filesdirs": ["testdatadir"], - "excludes": ["proc/*", "*.ignored"], - "extras": ["--tag", "dummytag"], - } - } - }) - - Path("testdatadir").mkdir() - Path("testdatadir/etc").mkdir() - Path("testdatadir/etc/config").write_text("dummydata", encoding="utf-8") - Path("testdatadir/etc/config2").write_text("dummydata", encoding="utf-8") - Path("testdatadir/etc/config3.ignored").write_text("dummydata", encoding="utf-8") - Path("testdatadir/etc/proc").write_text("dummydata", encoding="utf-8") - Path("testdatadir/etc/processor").write_text("dummydata", encoding="utf-8") - Path("testdatadir/proc").mkdir() - Path("testdatadir/proc/to_be_also_excluded").write_text("dummydata", encoding="utf-8") - Path("testdatadir/processor").write_text("dummydata", encoding="utf-8") - Path("testdatadir/some_other_proc_essor").write_text("dummydata", encoding="utf-8") - Path("testdatadir/var").mkdir() - Path("testdatadir/var/proc").mkdir() - Path("testdatadir/var/proc/data").write_text("dummydata", encoding="utf-8") - - assert trwm.backup_cmd("testcfg").returncode == 0 - - snapshots = _list_snapshots(trwm) - assert len(snapshots) == 1 - snapshot_files = _list_files(trwm, snapshots[0]["id"]) - assert "/testdatadir/etc/config" in snapshot_files - assert "/testdatadir/etc/config2" in snapshot_files - assert "/testdatadir/etc/config3.ignored" not in snapshot_files - assert "/testdatadir/etc/proc" in snapshot_files - assert "/testdatadir/etc/processor" in snapshot_files - assert "/testdatadir/proc" not in snapshot_files - assert "/testdatadir/proc/to_be_also_excluded" not in snapshot_files - assert "/testdatadir/processor" in snapshot_files - assert "/testdatadir/some_other_proc_essor" in snapshot_files - # following expected result does not work, because test config uses root-unanchored exclude path "proc/*" - # assert "/testdatadir/var/proc/data" in snapshot_files - - -def test_backup_cmd_error_handling(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument - """test backup command err cases""" - - rwm_conf = { - "rwm_backups": { - "dummycfg": {"filesdirs": ["dummydir"]} - } - } - mock_ok = Mock(return_value=CompletedProcess(args='dummy', returncode=0)) - mock_fail = Mock(return_value=CompletedProcess(args='dummy', returncode=11)) - - with patch.object(rwm.RWM, "restic_autoinit", mock_fail): - assert RWM(rwm_conf).backup_cmd("dummycfg").returncode == 11 - - with ( - patch.object(rwm.RWM, "restic_autoinit", mock_ok), - patch.object(rwm.RWM, "restic_backup", mock_fail) - ): - assert RWM(rwm_conf).backup_cmd("dummycfg").returncode == 11 - - with ( - patch.object(rwm.RWM, "restic_autoinit", mock_ok), - patch.object(rwm.RWM, "restic_backup", mock_ok), - patch.object(rwm.RWM, "restic_forget_prune", mock_fail) - ): - assert RWM(rwm_conf).backup_cmd("dummycfg").returncode == 11 - - -def test_backup_all_cmd(tmpworkdir: str): # pylint: disable=unused-argument - """test backup command err cases""" - - rwm_conf = { - "rwm_backups": { - "dummycfg": {"filesdirs": ["dummydir"]} - } - } - mock = Mock(return_value=CompletedProcess(args='dummy', returncode=0)) - - with ( - patch.object(rwm.RWM, "restic_autoinit", mock), - patch.object(rwm.RWM, "restic_backup", mock), - patch.object(rwm.RWM, "restic_forget_prune", mock) - ): - assert RWM(rwm_conf).backup_all_cmd() == 0 - - -def test_backup_all_cmd_error_handling(tmpworkdir: str): # pylint: disable=unused-argument - """test backup command err cases""" - - rwm_conf = { - "rwm_backups": { - "dummycfg": {"filesdirs": ["dummydir"]} - } - } - mock_fail = Mock(return_value=CompletedProcess(args='dummy', returncode=11)) - - with patch.object(rwm.RWM, "restic_autoinit", mock_fail): - assert RWM(rwm_conf).backup_all_cmd() == 11 - - -def test_storage_create_cmd(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager): # pylint: disable=unused-argument - """test_storage_create_cmd""" - - trwm = rwm.RWM({ - "rwm_s3_endpoint_url": radosuser_admin.url, - "rwm_s3_access_key": radosuser_admin.access_key, - "rwm_s3_secret_key": radosuser_admin.secret_key, - }) - - bucket_name = "testbuck" - assert trwm.storage_create_cmd(bucket_name, "testnx") == 0 - assert trwm.storage_create_cmd("!invalid", "testnx") == 1 - assert trwm.storage_create_cmd("", "testnx") == 1 - - -def test_storage_delete_cmd(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager): # pylint: disable=unused-argument - """test_storage_create_cmd""" - - trwm = rwm.RWM({ - "rwm_s3_endpoint_url": radosuser_admin.url, - "rwm_s3_access_key": radosuser_admin.access_key, - "rwm_s3_secret_key": radosuser_admin.secret_key, - - "rwm_restic_bucket": "testbuck", - "rwm_restic_password": "dummydummydummydummy", - "rwm_backups": { - "testcfg": {"filesdirs": ["testdatadir/"]} - } - }) - - bucket_name = trwm.config["rwm_restic_bucket"] - Path("testdatadir").mkdir() - Path("testdatadir/testdata1.txt").write_text("dummydata", encoding="utf-8") - - bucket = trwm.storage_manager.storage_create(bucket_name, "admin") - assert trwm.storage_manager.bucket_exist(bucket_name) - assert len(trwm.storage_manager.list_objects(bucket_name)) == 0 - - assert trwm.backup_cmd("testcfg").returncode == 0 - assert len(trwm.storage_manager.list_objects(bucket_name)) != 0 - - object_versions = radosuser_admin.s3.meta.client.list_object_versions(Bucket=bucket.name) - assert len(object_versions["Versions"]) > 0 - assert len(object_versions["DeleteMarkers"]) > 0 - - assert trwm.storage_delete_cmd(bucket_name) == 0 - assert not trwm.storage_manager.bucket_exist(bucket_name) - assert trwm.storage_delete_cmd(bucket_name) == 1 - - -def test_storage_check_policy_cmd(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager): # pylint: disable=unused-argument - """test storage check policy command""" - - trwm = rwm.RWM({ - "rwm_s3_endpoint_url": radosuser_admin.url, - "rwm_s3_access_key": radosuser_admin.access_key, - "rwm_s3_secret_key": radosuser_admin.secret_key, - }) - - mock = Mock(return_value=False) - with patch.object(rwm.StorageManager, "storage_check_policy", mock): - assert trwm.storage_check_policy_cmd("dummy") == 1 + with patch.object(rwm.RWM, "storage_delete_cmd", mock_ok): + assert rwm_main(["storage_delete", "bucket"]) == 0 + with patch.object(rwm.RWM, "storage_check_policy_cmd", mock_ok): + assert rwm_main(["storage_check_policy", "bucket"]) == 0 diff --git a/tests/test_rwm.py b/tests/test_rwm.py new file mode 100644 index 0000000000000000000000000000000000000000..b45cf9e3921987ae570d9e41876c5916b21d8f30 --- /dev/null +++ b/tests/test_rwm.py @@ -0,0 +1,287 @@ +"""rwm tests""" + +import json +from pathlib import Path +from subprocess import CompletedProcess +from unittest.mock import Mock, patch + +import rwm + + +def test_aws_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument + """test aws command""" + + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": motoserver, + "rwm_s3_access_key": "dummy", + "rwm_s3_secret_key": "dummy", + }) + test_bucket = "testbucket" + + assert not trwm.storage_manager.bucket_exist(test_bucket) + + trwm.aws_cmd(["s3", "mb", f"s3://{test_bucket}"]) + assert trwm.storage_manager.bucket_exist(test_bucket) + + trwm.aws_cmd(["s3", "rb", f"s3://{test_bucket}"]) + assert not trwm.storage_manager.bucket_exist(test_bucket) + + +def test_rclone_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument + """test rclone command""" + + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": motoserver, + "rwm_s3_access_key": "dummy", + "rwm_s3_secret_key": "dummy", + }) + + test_bucket = "testbucket" + test_file = "testfile.txt" + Path(test_file).write_text('1234', encoding='utf-8') + + trwm.rclone_cmd(["mkdir", f"rwmbe:/{test_bucket}/"]) + assert trwm.storage_manager.bucket_exist(test_bucket) + + trwm.rclone_cmd(["copy", test_file, f"rwmbe:/{test_bucket}/"]) + assert len(trwm.storage_manager.list_objects(test_bucket)) == 1 + + +def test_rclone_crypt_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument + """test rclone with crypt overlay""" + + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": motoserver, + "rwm_s3_access_key": "dummy", + "rwm_s3_secret_key": "dummy", + "rwm_rclone_crypt_bucket": "cryptdata_test", + "rwm_rclone_crypt_password": rwm.rclone_obscure_password("dummydummydummydummy"), + }) + + test_bucket = "testbucket" + test_file = "testfile.txt" + Path(test_file).write_text('1234', encoding='utf-8') + + trwm.rclone_crypt_cmd(["copy", test_file, f"rwmbe:/{test_bucket}/"]) + assert len(trwm.storage_manager.list_objects(trwm.config["rwm_rclone_crypt_bucket"])) == 1 + + trwm.rclone_crypt_cmd(["delete", f"rwmbe:/{test_bucket}/{test_file}"]) + assert len(trwm.storage_manager.list_objects(trwm.config["rwm_rclone_crypt_bucket"])) == 0 + + test_file1 = "testfile1.txt" + Path(test_file1).write_text('4321', encoding='utf-8') + trwm.rclone_crypt_cmd(["sync", ".", f"rwmbe:/{test_bucket}/"]) + assert len(trwm.storage_manager.list_objects(trwm.config["rwm_rclone_crypt_bucket"])) == 2 + + Path(test_file1).unlink() + trwm.rclone_crypt_cmd(["sync", ".", f"rwmbe:/{test_bucket}/"]) + assert len(trwm.storage_manager.list_objects(trwm.config["rwm_rclone_crypt_bucket"])) == 1 + + +def test_restic_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument + """test restic command""" + + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": motoserver, + "rwm_s3_access_key": "dummy", + "rwm_s3_secret_key": "dummy", + "rwm_restic_bucket": "restictest", + "rwm_restic_password": "dummydummydummydummy", + }) + + assert trwm.restic_cmd(["init"]).returncode == 0 + proc = trwm.restic_cmd(["cat", "config"]) + assert "id" in json.loads(proc.stdout) + + +def _restic_list_snapshots(trwm): + """test helper""" + return json.loads(trwm.restic_cmd(["snapshots", "--json"]).stdout) + + +def _restic_list_snapshot_files(trwm, snapshot_id): + """test helper""" + snapshot_ls = [json.loads(x) for x in trwm.restic_cmd(["ls", snapshot_id, "--json"]).stdout.splitlines()] + return [x["path"] for x in snapshot_ls if (x["struct_type"] == "node") and (x["type"] == "file")] + + +def test_backup_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument + """test backup_cmd command""" + + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": motoserver, + "rwm_s3_access_key": "dummy", + "rwm_s3_secret_key": "dummy", + "rwm_restic_bucket": "restictest", + "rwm_restic_password": "dummydummydummydummy", + "rwm_backups": { + "testcfg": { + "filesdirs": ["testdatadir/"], + "excludes": ["testfile_to_be_ignored"], + "extras": ["--tag", "dummytag"], + } + }, + "rwm_retention": { + "keep-daily": "1" + } + }) + + Path("testdatadir").mkdir() + Path("testdatadir/testdata1.txt").write_text("dummydata", encoding="utf-8") + Path("testdatadir/testfile_to_be_ignored").write_text("dummydata", encoding="utf-8") + + assert trwm.restic_cmd(["init"]).returncode == 0 + assert trwm.backup_cmd("testcfg").returncode == 0 + + snapshots = _restic_list_snapshots(trwm) + assert len(snapshots) == 1 + snapshot_files = _restic_list_snapshot_files(trwm, snapshots[0]["id"]) + assert "/testdatadir/testdata1.txt" in snapshot_files + + +def test_backup_cmd_excludes(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument + """test backup command""" + + import os + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": motoserver, + "rwm_s3_access_key": "dummy", + "rwm_s3_secret_key": "dummy", + "rwm_restic_bucket": "restictest", + "rwm_restic_password": "dummydummydummydummy", + "rwm_backups": { + "testcfg": { + "filesdirs": ["testdatadir/"], + "excludes": ["testdatadir/proc/*", "*.ignored"], + "extras": ["--tag", "dummytag"], + } + } + }) + + Path("testdatadir").mkdir() + Path("testdatadir/etc").mkdir() + Path("testdatadir/etc/config").write_text("dummydata", encoding="utf-8") + Path("testdatadir/etc/config3.ignored").write_text("dummydata", encoding="utf-8") + Path("testdatadir/etc/proc").write_text("dummydata", encoding="utf-8") + Path("testdatadir/proc").mkdir() + Path("testdatadir/proc/to_be_also_excluded").write_text("dummydata", encoding="utf-8") + Path("testdatadir/var").mkdir() + Path("testdatadir/var/proc").mkdir() + Path("testdatadir/var/proc/data").write_text("dummydata", encoding="utf-8") + + assert trwm.restic_cmd(["init"]).returncode == 0 + assert trwm.backup_cmd("testcfg").returncode == 0 + + snapshots = _restic_list_snapshots(trwm) + assert len(snapshots) == 1 + snapshot_files = _restic_list_snapshot_files(trwm, snapshots[0]["id"]) + assert "/testdatadir/etc/config" in snapshot_files + assert "/testdatadir/etc/config3.ignored" not in snapshot_files + assert "/testdatadir/etc/proc" in snapshot_files + assert "/testdatadir/proc" not in snapshot_files + assert "/testdatadir/proc/to_be_also_excluded" not in snapshot_files + assert "/testdatadir/var/proc/data" in snapshot_files + + +def test_backup_cmd_error_handling(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument + """test backup command err cases""" + + rwm_conf = { + "rwm_backups": { + "dummycfg": {"filesdirs": ["dummydir"]} + } + } + mock_ok = Mock(return_value=CompletedProcess(args='dummy', returncode=0)) + mock_fail = Mock(return_value=CompletedProcess(args='dummy', returncode=11)) + + with ( + patch.object(rwm.RWM, "_restic_backup", mock_fail) + ): + assert rwm.RWM(rwm_conf).backup_cmd("dummycfg").returncode == 11 + + with ( + patch.object(rwm.RWM, "_restic_backup", mock_ok), + patch.object(rwm.RWM, "_restic_forget_prune", mock_fail) + ): + assert rwm.RWM(rwm_conf).backup_cmd("dummycfg").returncode == 11 + + +def test_backup_all_cmd(tmpworkdir: str): # pylint: disable=unused-argument + """test backup command err cases""" + + rwm_conf = { + "rwm_backups": { + "dummycfg": {"filesdirs": ["dummydir"]} + } + } + mock = Mock(return_value=CompletedProcess(args='dummy', returncode=0)) + + with ( + patch.object(rwm.RWM, "_restic_backup", mock), + patch.object(rwm.RWM, "_restic_forget_prune", mock) + ): + assert rwm.RWM(rwm_conf).backup_all_cmd() == 0 + + +def test_storage_create_cmd(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager): # pylint: disable=unused-argument + """test_storage_create_cmd""" + + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": radosuser_admin.url, + "rwm_s3_access_key": radosuser_admin.access_key, + "rwm_s3_secret_key": radosuser_admin.secret_key, + }) + + bucket_name = "testbuck" + assert trwm.storage_create_cmd(bucket_name, "testnx") == 0 + assert trwm.storage_create_cmd("!invalid", "testnx") == 1 + assert trwm.storage_create_cmd(bucket_name, "") == 1 + + +def test_storage_delete_cmd(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager): # pylint: disable=unused-argument + """test_storage_create_cmd""" + + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": radosuser_admin.url, + "rwm_s3_access_key": radosuser_admin.access_key, + "rwm_s3_secret_key": radosuser_admin.secret_key, + + "rwm_restic_bucket": "testbuck", + "rwm_restic_password": "dummydummydummydummy", + "rwm_backups": { + "testcfg": {"filesdirs": ["testdatadir/"]} + } + }) + + bucket_name = trwm.config["rwm_restic_bucket"] + Path("testdatadir").mkdir() + Path("testdatadir/testdata1.txt").write_text("dummydata", encoding="utf-8") + + bucket = trwm.storage_manager.storage_create(bucket_name, "admin") + assert len(trwm.storage_manager.list_objects(bucket_name)) == 0 + assert trwm.restic_cmd(["init"]).returncode == 0 + assert trwm.backup_cmd("testcfg").returncode == 0 + assert len(trwm.storage_manager.list_objects(bucket_name)) != 0 + + object_versions = radosuser_admin.s3.meta.client.list_object_versions(Bucket=bucket.name) + assert len(object_versions["Versions"]) > 0 + assert len(object_versions["DeleteMarkers"]) > 0 + + assert trwm.storage_delete_cmd(bucket_name) == 0 + assert not trwm.storage_manager.bucket_exist(bucket_name) + + assert trwm.storage_delete_cmd(bucket_name) == 1 + + +def test_storage_check_policy_cmd(tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager): # pylint: disable=unused-argument + """test storage check policy command""" + + trwm = rwm.RWM({ + "rwm_s3_endpoint_url": radosuser_admin.url, + "rwm_s3_access_key": radosuser_admin.access_key, + "rwm_s3_secret_key": radosuser_admin.secret_key, + }) + + mock = Mock(return_value=False) + with patch.object(rwm.StorageManager, "storage_check_policy", mock): + assert trwm.storage_check_policy_cmd("dummy") == 1 diff --git a/tests/test_policies.py b/tests/test_storage.py similarity index 70% rename from tests/test_policies.py rename to tests/test_storage.py index 9f9af5799b2e5f60b9fe8ce76598d4de7c0a7eba..27602e45f6da4fea3826775f06577bd4c83ae4a8 100644 --- a/tests/test_policies.py +++ b/tests/test_storage.py @@ -1,4 +1,4 @@ -"""rwm bucket policies tests""" +"""rwm storagemanager and bucket policing tests""" import json from io import BytesIO @@ -19,63 +19,63 @@ def test_microceph_defaults( bucket_name = "testbuckx" - # create bucket, check owner and default policy - assert bucket_name not in [x.name for x in radosuser_test1.list_buckets()] - radosuser_test1.create_bucket(bucket_name) + # create bucket + assert not radosuser_test1.bucket_exist(bucket_name) + radosuser_test1.bucket_create(bucket_name) + assert len(radosuser_test1.list_buckets()) == 1 + assert radosuser_test1.list_objects(bucket_name) == [] - assert bucket_name in [x.name for x in radosuser_test1.list_buckets()] + # assert basic raw bucket behavior + assert radosuser_test1.bucket_exist(bucket_name) assert radosuser_test1.bucket_owner(bucket_name).endswith("$test1") assert not radosuser_test1.bucket_policy(bucket_name) - # bucket must exist, but not be not visible nor accessible to others + # bucket must exist, but not be accessible to others with pytest.raises(radosuser_test2.s3.meta.client.exceptions.BucketAlreadyExists): - radosuser_test2.create_bucket(bucket_name) - assert bucket_name not in [x.name for x in radosuser_test2.list_buckets()] + radosuser_test2.bucket_create(bucket_name) with pytest.raises(radosuser_test2.s3.meta.client.exceptions.ClientError, match=r"AccessDenied"): assert radosuser_test2.list_objects(bucket_name) -def test_storage_policy( - tmpworkdir: str, - microceph: str, - radosuser_admin: rwm.StorageManager, - radosuser_test1: rwm.StorageManager, - radosuser_test2: rwm.StorageManager +def test_storage_create( + tmpworkdir: str, + microceph: str, + radosuser_admin: rwm.StorageManager, + radosuser_test1: rwm.StorageManager, + radosuser_test2: rwm.StorageManager ): # pylint: disable=unused-argument - """test manager created bucket policy""" + """test manager storage_create""" bucket = radosuser_admin.storage_create("testbuckx", "test1") - + assert radosuser_admin.list_objects(bucket.name) == [] - assert radosuser_test1.list_objects(bucket.name) == [] - assert radosuser_admin.bucket_policy(bucket.name) - assert radosuser_test1.bucket_policy(bucket.name) + assert radosuser_admin.storage_check_policy(bucket.name) + assert radosuser_test1.storage_check_policy(bucket.name) + + # storage must exist, but not be accessible to others with pytest.raises(radosuser_test2.s3.meta.client.exceptions.ClientError, match=r"AccessDenied"): radosuser_test2.list_objects(bucket.name) - assert bucket.Versioning().status == "Enabled" - -def test_storage_versioning( +def test_storage_delete( tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager, radosuser_test1: rwm.StorageManager, ): # pylint: disable=unused-argument - """test manager created bucket policy""" + """test manager storage_delete""" bucket_name = "testbuckx" target_username = "test1" - bucket = radosuser_admin.storage_create(bucket_name, target_username) - assert bucket.Versioning().status == "Enabled" - bucket = radosuser_test1.s3.Bucket(bucket_name) + # + bucket = radosuser_test1.s3.Bucket(bucket.name) bucket.upload_fileobj(BytesIO(b"dummydata"), "dummykey") - assert len(radosuser_test1.list_objects(bucket_name)) == 1 + assert len(radosuser_test1.list_objects(bucket.name)) == 1 bucket.Object("dummykey").delete() - assert len(radosuser_test1.list_objects(bucket_name)) == 0 + assert len(radosuser_test1.list_objects(bucket.name)) == 0 # there should be object and it's delete marker object_versions = list(bucket.object_versions.all()) @@ -90,7 +90,29 @@ def test_storage_versioning( assert len(object_versions["DeleteMarkers"]) == 1 -def test_storage_backup( +def test_storage_check_policy( + tmpworkdir: str, + microceph: str, + radosuser_admin: rwm.StorageManager, + radosuser_test1: rwm.StorageManager +): # pylint: disable=unused-argument + """test manager storage_check_policy""" + + bucket_name = "rwmbackup-test1" + target_username = "test1" + + assert radosuser_admin.bucket_create(bucket_name) + assert not radosuser_admin.storage_check_policy(bucket_name) + radosuser_admin.storage_delete(bucket_name) + + radosuser_admin.storage_create(bucket_name, target_username) + assert radosuser_test1.storage_check_policy(bucket_name) + + radosuser_admin.s3.Bucket(bucket_name).Versioning().suspend() + assert not radosuser_test1.storage_check_policy(bucket_name) + + +def test_storage_backup_usage( tmpworkdir: str, microceph: str, radosuser_admin: rwm.StorageManager, @@ -115,29 +137,11 @@ def test_storage_backup( "dummy": {"filesdirs": ["testdir"]} } }) + assert trwm.restic_cmd(["init"]).returncode == 0 assert trwm.backup_cmd("dummy").returncode == 0 assert radosuser_test1.list_objects(bucket_name) assert len(json.loads(trwm.restic_cmd(["snapshots", "--json"]).stdout)) == 1 - -def test_storage_check_policy( - tmpworkdir: str, - microceph: str, - radosuser_admin: rwm.StorageManager, - radosuser_test1: rwm.StorageManager -): # pylint: disable=unused-argument - """test backup to manager created bucket with policy""" - - bucket_name = "rwmbackup-test1" - target_username = "test1" - - assert radosuser_admin.create_bucket(bucket_name) - assert not radosuser_admin.storage_check_policy(bucket_name) - radosuser_admin.storage_delete(bucket_name) - - radosuser_admin.storage_create(bucket_name, "test1") - assert radosuser_test1.storage_check_policy(bucket_name) - - radosuser_admin.s3.Bucket(bucket_name).Versioning().suspend() - assert not radosuser_test1.storage_check_policy(bucket_name) + with pytest.raises(radosuser_test1.s3.meta.client.exceptions.ClientError, match=r"AccessDenied"): + assert radosuser_test1.storage_delete(bucket_name)