Skip to content
Snippets Groups Projects
Commit f0ca898b authored by Radoslav Bodó's avatar Radoslav Bodó
Browse files

general: improve commands handling, pass CompletedProcess instead of dummy...

general: improve commands handling, pass CompletedProcess instead of dummy dto, finalize basic restic coverage
parent bf3b99c7
No related branches found
No related tags found
No related merge requests found
...@@ -45,18 +45,17 @@ def run_command(*args, **kwargs): ...@@ -45,18 +45,17 @@ def run_command(*args, **kwargs):
"encoding": "utf-8", "encoding": "utf-8",
}) })
logger.debug("run_command, %s", (args, kwargs)) logger.debug("run_command, %s", (args, kwargs))
proc = subprocess.run(*args, **kwargs, check=False) return subprocess.run(*args, **kwargs, check=False)
return (proc.returncode, proc.stdout, proc.stderr)
def wrap_output(returncode, stdout, stderr): def wrap_output(process):
"""wraps command output and prints results""" """wraps command output and prints results"""
if stdout: if process.stdout:
print(stdout) print(process.stdout)
if stderr: if process.stderr:
print(stderr, file=sys.stderr) print(process.stderr, file=sys.stderr)
return returncode return process.returncode
def rclone_obscure_password(plaintext, iv=None): def rclone_obscure_password(plaintext, iv=None):
...@@ -81,13 +80,7 @@ class RWM: ...@@ -81,13 +80,7 @@ class RWM:
self.config = config self.config = config
def aws_cmd(self, args): def aws_cmd(self, args):
""" """aws cli wrapper"""
aws cli wrapper
:param list args: list passed to subprocess
:return: returncode, stdout, stderr
:rtype: tuple
"""
env = { env = {
"PATH": os.environ["PATH"], "PATH": os.environ["PATH"],
...@@ -103,13 +96,7 @@ class RWM: ...@@ -103,13 +96,7 @@ class RWM:
return run_command(["aws", "--endpoint-url", self.config["RWM_S3_ENDPOINT_URL"]] + args, env=env) return run_command(["aws", "--endpoint-url", self.config["RWM_S3_ENDPOINT_URL"]] + args, env=env)
def rclone_cmd(self, args): def rclone_cmd(self, args):
""" """rclone wrapper"""
rclone wrapper
:param list args: list passed to subprocess
:return: returncode, stdout, stderr
:rtype: tuple
"""
env = { env = {
"RCLONE_CONFIG": "", "RCLONE_CONFIG": "",
...@@ -128,10 +115,6 @@ class RWM: ...@@ -128,10 +115,6 @@ class RWM:
rclone crypt wrapper rclone crypt wrapper
* https://rclone.org/docs/#config-file * https://rclone.org/docs/#config-file
* https://rclone.org/crypt/ * https://rclone.org/crypt/
:param list args: list passed to subprocess
:return: returncode, stdout, stderr
:rtype: tuple
""" """
env = { env = {
...@@ -190,13 +173,13 @@ def main(argv=None): ...@@ -190,13 +173,13 @@ def main(argv=None):
rwm = RWM(config) rwm = RWM(config)
if args.command == "aws": if args.command == "aws":
return wrap_output(*rwm.aws_cmd(args.cmd_args)) return wrap_output(rwm.aws_cmd(args.cmd_args))
if args.command == "rclone": if args.command == "rclone":
return wrap_output(*rwm.rclone_cmd(args.cmd_args)) return wrap_output(rwm.rclone_cmd(args.cmd_args))
if args.command == "rclone_crypt": if args.command == "rclone_crypt":
return wrap_output(*rwm.rclone_crypt_cmd(args.cmd_args)) return wrap_output(rwm.rclone_crypt_cmd(args.cmd_args))
if args.command == "restic": if args.command == "restic":
return wrap_output(*rwm.restic_cmd(args.cmd_args)) return wrap_output(rwm.restic_cmd(args.cmd_args))
return 0 return 0
......
"""default tests""" """default tests"""
import json
from pathlib import Path from pathlib import Path
from textwrap import dedent from subprocess import CompletedProcess
from unittest.mock import Mock, patch from unittest.mock import Mock, patch
import boto3 import boto3
...@@ -32,7 +33,7 @@ def test_sublist(): ...@@ -32,7 +33,7 @@ def test_sublist():
def test_wrap_output(): def test_wrap_output():
"""test wrap_output""" """test wrap_output"""
assert wrap_output(11, "dummy", "dummy") == 11 assert wrap_output(CompletedProcess(args='dummy', returncode=11, stdout="dummy", stderr="dummy")) == 11
def test_main(tmpworkdir: str): # pylint: disable=unused-argument def test_main(tmpworkdir: str): # pylint: disable=unused-argument
...@@ -44,7 +45,7 @@ def test_main(tmpworkdir: str): # pylint: disable=unused-argument ...@@ -44,7 +45,7 @@ def test_main(tmpworkdir: str): # pylint: disable=unused-argument
assert rwm_main([]) == 0 assert rwm_main([]) == 0
# command branches # command branches
mock = Mock(return_value=(0, "", "")) mock = Mock(return_value=CompletedProcess(args='dummy', returncode=0))
for item in ["aws", "rclone", "rclone_crypt", "restic"]: for item in ["aws", "rclone", "rclone_crypt", "restic"]:
with patch.object(rwm.RWM, f"{item}_cmd", mock): with patch.object(rwm.RWM, f"{item}_cmd", mock):
assert rwm_main([item]) == 0 assert rwm_main([item]) == 0
...@@ -53,7 +54,7 @@ def test_main(tmpworkdir: str): # pylint: disable=unused-argument ...@@ -53,7 +54,7 @@ def test_main(tmpworkdir: str): # pylint: disable=unused-argument
def test_aws_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument def test_aws_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument
"""test aws command""" """test aws command"""
rwm = RWM({ trwm = RWM({
"RWM_S3_ENDPOINT_URL": motoserver, "RWM_S3_ENDPOINT_URL": motoserver,
"RWM_S3_ACCESS_KEY": "dummy", "RWM_S3_ACCESS_KEY": "dummy",
"RWM_S3_SECRET_KEY": "dummy", "RWM_S3_SECRET_KEY": "dummy",
...@@ -63,17 +64,17 @@ def test_aws_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-ar ...@@ -63,17 +64,17 @@ def test_aws_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-ar
assert test_bucket not in buckets_plain_list(s3.list_buckets()) assert test_bucket not in buckets_plain_list(s3.list_buckets())
rwm.aws_cmd(["s3", "mb", f"s3://{test_bucket}"]) trwm.aws_cmd(["s3", "mb", f"s3://{test_bucket}"])
assert test_bucket in buckets_plain_list(s3.list_buckets()) assert test_bucket in buckets_plain_list(s3.list_buckets())
rwm.aws_cmd(["s3", "rb", f"s3://{test_bucket}"]) trwm.aws_cmd(["s3", "rb", f"s3://{test_bucket}"])
assert test_bucket not in buckets_plain_list(s3.list_buckets()) assert test_bucket not in buckets_plain_list(s3.list_buckets())
def test_rclone_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument def test_rclone_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument
"""test rclone command""" """test rclone command"""
rwm = RWM({ trwm = RWM({
"RWM_S3_ENDPOINT_URL": motoserver, "RWM_S3_ENDPOINT_URL": motoserver,
"RWM_S3_ACCESS_KEY": "dummy", "RWM_S3_ACCESS_KEY": "dummy",
"RWM_S3_SECRET_KEY": "dummy", "RWM_S3_SECRET_KEY": "dummy",
...@@ -84,8 +85,8 @@ def test_rclone_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused ...@@ -84,8 +85,8 @@ def test_rclone_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused
test_file = "testfile.txt" test_file = "testfile.txt"
Path(test_file).write_text('1234', encoding='utf-8') Path(test_file).write_text('1234', encoding='utf-8')
rwm.rclone_cmd(["mkdir", f"rwmbe:/{test_bucket}/"]) trwm.rclone_cmd(["mkdir", f"rwmbe:/{test_bucket}/"])
rwm.rclone_cmd(["copy", test_file, f"rwmbe:/{test_bucket}/"]) trwm.rclone_cmd(["copy", test_file, f"rwmbe:/{test_bucket}/"])
assert test_bucket in buckets_plain_list(s3.list_buckets()) assert test_bucket in buckets_plain_list(s3.list_buckets())
assert test_file in objects_plain_list(s3.list_objects_v2(Bucket=test_bucket)) assert test_file in objects_plain_list(s3.list_objects_v2(Bucket=test_bucket))
...@@ -93,7 +94,7 @@ def test_rclone_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused ...@@ -93,7 +94,7 @@ def test_rclone_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused
def test_rclone_crypt_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument def test_rclone_crypt_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument
"""test rclone with crypt overlay""" """test rclone with crypt overlay"""
rwm = RWM({ trwm = RWM({
"RWM_S3_ENDPOINT_URL": motoserver, "RWM_S3_ENDPOINT_URL": motoserver,
"RWM_S3_ACCESS_KEY": "dummy", "RWM_S3_ACCESS_KEY": "dummy",
"RWM_S3_SECRET_KEY": "dummy", "RWM_S3_SECRET_KEY": "dummy",
...@@ -106,51 +107,33 @@ def test_rclone_crypt_cmd(tmpworkdir: str, motoserver: str): # pylint: disable= ...@@ -106,51 +107,33 @@ def test_rclone_crypt_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=
test_file = "testfile.txt" test_file = "testfile.txt"
Path(test_file).write_text('1234', encoding='utf-8') Path(test_file).write_text('1234', encoding='utf-8')
rwm.rclone_crypt_cmd(["copy", test_file, f"rwmbe:/{test_bucket}/"]) trwm.rclone_crypt_cmd(["copy", test_file, f"rwmbe:/{test_bucket}/"])
assert len(objects_plain_list(s3.list_objects_v2(Bucket=rwm.config["RWM_RCLONE_CRYPT_BUCKET"]))) == 1 assert len(objects_plain_list(s3.list_objects_v2(Bucket=trwm.config["RWM_RCLONE_CRYPT_BUCKET"]))) == 1
rwm.rclone_crypt_cmd(["delete", f"rwmbe:/{test_bucket}/{test_file}"]) trwm.rclone_crypt_cmd(["delete", f"rwmbe:/{test_bucket}/{test_file}"])
assert s3.list_objects_v2(Bucket=rwm.config["RWM_RCLONE_CRYPT_BUCKET"])["KeyCount"] == 0 assert s3.list_objects_v2(Bucket=trwm.config["RWM_RCLONE_CRYPT_BUCKET"])["KeyCount"] == 0
test_file1 = "testfile1.txt" test_file1 = "testfile1.txt"
Path(test_file1).write_text('4321', encoding='utf-8') Path(test_file1).write_text('4321', encoding='utf-8')
rwm.rclone_crypt_cmd(["sync", ".", f"rwmbe:/{test_bucket}/"]) trwm.rclone_crypt_cmd(["sync", ".", f"rwmbe:/{test_bucket}/"])
assert s3.list_objects_v2(Bucket=rwm.config["RWM_RCLONE_CRYPT_BUCKET"])["KeyCount"] == 2 assert s3.list_objects_v2(Bucket=trwm.config["RWM_RCLONE_CRYPT_BUCKET"])["KeyCount"] == 2
Path(test_file1).unlink() Path(test_file1).unlink()
rwm.rclone_crypt_cmd(["sync", ".", f"rwmbe:/{test_bucket}/"]) trwm.rclone_crypt_cmd(["sync", ".", f"rwmbe:/{test_bucket}/"])
assert s3.list_objects_v2(Bucket=rwm.config["RWM_RCLONE_CRYPT_BUCKET"])["KeyCount"] == 1 assert s3.list_objects_v2(Bucket=trwm.config["RWM_RCLONE_CRYPT_BUCKET"])["KeyCount"] == 1
# def test_restic_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument def test_restic_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument
# """test rclone with crypt overlay""" """test restic command"""
#
# rwm_conf = { trwm = RWM({
# "S3_ENDPOINT_URL": motoserver, "RWM_S3_ENDPOINT_URL": motoserver,
# "S3_ACCESS_KEY": "dummy", "RWM_S3_ACCESS_KEY": "dummy",
# "S3_SECRET_KEY": "dummy", "RWM_S3_SECRET_KEY": "dummy",
# "RES_BUCKET": "restic_test", "RWM_RESTIC_BUCKET": "restictest",
# "RES_PASSWORD": "dummydummydummydummydummydummydummydummy", "RWM_RESTIC_PASSWORD": "dummydummydummydummydummydummydummydummy",
# } })
# s3 = boto3.client('s3', endpoint_url=motoserver, aws_access_key_id="dummy", aws_secret_access_key="dummy")
# assert trwm.restic_cmd(["init"]).returncode == 0
# test_bucket = "testbucket" proc = trwm.restic_cmd(["cat", "config"])
# test_file = "testfile.txt" assert "id" in json.loads(proc.stdout)
# Path(test_file).write_text('1234', encoding='utf-8')
#
# rwm_main(["res", "init"], rwm_conf)
# assert len(objects_plain_list(s3.list_objects_v2(Bucket=rwm_conf["RES_BUCKET"]))) == 1
#
# rwm_main(["rcc", "delete", f"rwmbe:/{test_bucket}/{test_file}"], rwm_conf)
# assert s3.list_objects_v2(Bucket=rwm_conf["RCC_CRYPT_BUCKET"])["KeyCount"] == 0
#
# test_file1 = "testfile1.txt"
# Path(test_file1).write_text('4321', encoding='utf-8')
# rwm_main(["rcc", "sync", ".", f"rwmbe:/{test_bucket}/"], rwm_conf)
# assert s3.list_objects_v2(Bucket=rwm_conf["RCC_CRYPT_BUCKET"])["KeyCount"] == 2
#
# Path(test_file1).unlink()
# rwm_main(["rcc", "sync", ".", f"rwmbe:/{test_bucket}/"], rwm_conf)
# assert s3.list_objects_v2(Bucket=rwm_conf["RCC_CRYPT_BUCKET"])["KeyCount"] == 1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment