#!/usr/bin/env python3
"""rwm, restic/s3 worm manager"""

import gzip
import json
import logging
import os
import shlex
import subprocess
import sys
from argparse import ArgumentParser
from dataclasses import dataclass
from datetime import datetime, timedelta
from fcntl import flock, LOCK_EX, LOCK_NB, LOCK_UN
from io import BytesIO
from pathlib import Path
from typing import List, Dict, Optional
import boto3
from botocore.exceptions import BotoCoreError, ClientError
from pydantic import BaseModel, ConfigDict
from tabulate import tabulate
import yaml
__version__ = "1.2"
logger = logging.getLogger("rwm")
logger.setLevel(logging.INFO)
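

# A minimal usage sketch (values are illustrative): is_sublist(["s3", "mb"], ["aws", "s3", "mb", "test"]) evaluates to True.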
def is_sublist(needle, haystack):
    """Check if needle is a sublist of haystack using list slicing and equality comparison"""

    # If needle is empty, it's considered a sublist of any list
    if not needle:
        return True
    return any(haystack[i:i+len(needle)] == needle for i in range(len(haystack)))


def run_command(*args, **kwargs):
    """output capturing command executor"""

    kwargs.update({
        "capture_output": True,
        "text": True,
        "encoding": "utf-8",
    })
    logger.debug("run_command: %s", shlex.join(args[0]))
    return subprocess.run(*args, **kwargs, check=False)


def wrap_output(process):
    """wraps command output and prints results"""

    if process.stdout:
        print(process.stdout)
    if process.stderr:
        print(process.stderr, file=sys.stderr)
    return process.returncode
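

# A minimal usage sketch: size_fmt(1536) returns '1.5 KiB'.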
def size_fmt(num):
    """print value formated with human readable units"""

    for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB']:
        if abs(num) < 1024.0:
            return f'{num:0.1f} {unit}'
        num /= 1024.0
    return f'{num:0.1f} YiB'


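# A minimal usage sketch: list(batched_iterator([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]].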
def batched_iterator(lst, n):
    """yield n-items from list iterator"""

    for i in range(0, len(lst), n):
        yield lst[i:i + n]


class BackupConfig(BaseModel):
    """Configuration for backup operations.

    Attributes:
        filesdirs:
            REQUIRED. List of files and directories to be backed up.

        excludes:
            List of patterns for the `--exclude` option of the `restic backup` command. Defaults to an empty list.

        tags:
            List of tags for the new backup snapshot. Defaults to an empty list.

        extras:
            Additional options for the `restic backup` command. Defaults to an empty list.

        prerun:
            List of shell commands to execute before backup. Defaults to an empty list.

        postrun:
            List of shell commands to execute after backup. Defaults to an empty list.
    """

    model_config = ConfigDict(extra='forbid')

    filesdirs: List[str]
    excludes: List[str] = []
    tags: List[str] = []
    extras: List[str] = []
    prerun: List[str] = []
    postrun: List[str] = []


class RWMConfig(BaseModel):
    """Main configuration for RWM. Configuration file format is YAML.
    Attributes:
        s3_endpoint_url:
            REQUIRED. The endpoint URL for S3.
        s3_access_key:
            REQUIRED. Access key for S3.
        s3_secret_key:
            REQUIRED. Secret key for S3.
        restic_bucket:
            Bucket for Restic backup repository.

        restic_password:
            Password for Restic backup repository.

        backups:
            Dictionary containing named backup configurations.

        retention:
            Dictionary containing retention policies for the Restic repository.
            Keys and values correspond to `restic forget` command `--keep*`
            options without leading dashes.

        lock_path:
            Path for parallel execution exclusion lock. Defaults to
            `/var/run/rwm.lock`.

        autotags:
            Automatically add a tag to each backup snapshot, using the value
            of the backup configuration name.

        backup_extras:
            Additional options for any `restic backup` command (eg. default
            `pack-size` setting). Defaults to an empty list.
    """

    model_config = ConfigDict(extra='forbid')

    s3_endpoint_url: str
    s3_access_key: str
    s3_secret_key: str
    restic_bucket: Optional[str] = None
    restic_password: Optional[str] = None
    backups: Dict[str, BackupConfig] = {}
    retention: Dict[str, str] = {}
    lock_path: str = "/var/run/rwm.lock"
    autotags: bool = False
    backup_extras: List[str] = []
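

# Illustrative rwm.conf sketch (YAML; endpoint, keys, bucket and backup names below are made up):
#
#   s3_endpoint_url: "https://s3.example.com"
#   s3_access_key: "..."
#   s3_secret_key: "..."
#   restic_bucket: "rwmbackups"
#   restic_password: "..."
#   backups:
#     mybackup:
#       filesdirs: ["/etc", "/home"]
#       excludes: ["*.cache"]
#       tags: ["daily"]
#   retention:
#     keep-daily: "14"
#     keep-within: "14d"
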
class RwmJSONEncoder(json.JSONEncoder):
    """json encoder"""

    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        return super().default(o)  # pragma: nocover  ; no other type in processed data


@dataclass
class BackupResult:
    """backup results data container"""

    name: str
    returncode: int
    time_start: datetime
    time_end: datetime

    def to_dict(self):
        """dict serializer"""

        return {
            "ident": "RESULT",
            "name": self.name,
            "status": "OK" if self.returncode == 0 else "ERROR",
            "returncode": self.returncode,
            "backup_start": self.time_start.isoformat(),
            "backup_time": str(self.time_end-self.time_start),
        }


class StorageManager:
    """s3 policed bucket manager"""

    USER_BUCKET_POLICY_ACTIONS = [
        # backups
        "s3:ListBucket",
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        # policies
        "s3:GetBucketAcl",
        "s3:GetBucketPolicy",
        "s3:ListBucketVersions",
        "s3:GetBucketVersioning"
    ]

    def __init__(self, url, access_key, secret_key, bulk_size=1000):
        self.url = url
        self.access_key = access_key
        self.secret_key = secret_key
        self.s3 = boto3.resource('s3', endpoint_url=url, aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key)
        self.s3client = self.s3.meta.client
        self.bulk_size = bulk_size

    def bucket_create(self, name):
        """aws s3 resource api stub"""

        # boto3 client and resource api are not completely aligned
        # s3.Bucket("xyz").create() returns dict instead of s3.Bucket object
        return self.s3.create_bucket(Bucket=name)

    def bucket_exist(self, name):
        """check if bucket exist"""
        return name in [x.name for x in self.list_buckets()]

    def bucket_owner(self, name):
        """aws s3 resource api stub"""
        return self.s3.Bucket(name).Acl().owner["ID"]

    def bucket_policy(self, name):
        """aws s3 resource api stub"""

        try:
            return json.loads(self.s3.Bucket(name).Policy().policy)
        except (ClientError, BotoCoreError) as exc:
            if "NoSuchBucketPolicy" not in str(exc):
                logger.error("rwm bucket_policy error, %s", (exc))
    def bucket_acl(self, name):
        """api stub"""
        acl = self.s3.Bucket(name).Acl()
        return {"owner": acl.owner, "grants": acl.grants}

    def list_buckets(self):
        """aws s3 resource api stub"""
        return list(self.s3.buckets.all())

    def list_objects(self, bucket_name):
        """aws s3 resource api stub"""
        return list(self.s3.Bucket(bucket_name).objects.all())

    def storage_create(self, bucket_name, target_username):
        """create policed bucket"""

        if not target_username:
            raise ValueError("must specify value for bucket user")
        bucket = self.bucket_create(bucket_name)
        tenant, admin_username = bucket.Acl().owner["ID"].split("$")

        # grants basic RW access to user in same tenant
        bucket_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": [f"arn:aws:iam::{tenant}:user/{admin_username}"]},
                    "Action": ["*"],
                    "Resource": [f"arn:aws:s3:::{bucket.name}", f"arn:aws:s3:::{bucket.name}/*"]
                },
                # limited access to user
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": [f"arn:aws:iam::{tenant}:user/{target_username}"]},
                    "Action": self.USER_BUCKET_POLICY_ACTIONS,
                    "Resource": [f"arn:aws:s3:::{bucket.name}", f"arn:aws:s3:::{bucket.name}/*"]
                }
            ]
        }
        bucket.Policy().put(Policy=json.dumps(bucket_policy))

        # enforces versioning
        bucket.Versioning().enable()

        return bucket

    def storage_delete(self, bucket_name):
        """storage delete"""

        paginator = self.s3client.get_paginator('list_objects')
        objects = []
        for page in paginator.paginate(Bucket=bucket_name):
            for item in page.get("Contents", []):
                objects.append({"Key": item["Key"]})
        for items in batched_iterator(objects, self.bulk_size):
            self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})

        # delete all object versions
        paginator = self.s3client.get_paginator('list_object_versions')
        objects = []
        for page in paginator.paginate(Bucket=bucket_name):
            for item in page.get("Versions", []):
                objects.append({"Key": item["Key"], "VersionId": item["VersionId"]})
        for items in batched_iterator(objects, self.bulk_size):
            self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})

        # delete all delete markers
        objects = []
        for page in paginator.paginate(Bucket=bucket_name):
            for item in page.get("DeleteMarkers", []):
                objects.append({"Key": item["Key"], "VersionId": item["VersionId"]})
        for items in batched_iterator(objects, self.bulk_size):
            self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})

        self.s3client.delete_bucket(Bucket=bucket_name)
        return 0

    @staticmethod
    def _policy_statements_admin(policy):
        """policy helper"""
        return list(filter(lambda stmt: stmt["Action"] == ["*"], policy["Statement"]))

    @staticmethod
    def _policy_statements_user(policy):
        """policy helper"""
        return list(filter(lambda stmt: stmt["Action"] != ["*"], policy["Statement"]))

    def storage_check_policy(self, name):
        """
        storage check bucket policy

        Ceph S3 API does not allow to resolve username (which is used in policy descriptor)
        from access_key id, hence checks cannot directly assert if owner is or is not current
        identity.
        """

        if not (policy := self.bucket_policy(name)):
            return False

        admin_statements = self._policy_statements_admin(policy)
        user_statements = self._policy_statements_user(policy)
        owner_tenant, owner_username = self.bucket_owner(name).split("$")

        if (  # pylint: disable=too-many-boolean-expressions
            # two statements MUST be present on a bucket
            len(policy["Statement"]) == 2
            and len(admin_statements) == 1
            and len(user_statements) == 1
            # bucket owner MUST be the admin principal
            and [f"arn:aws:iam::{owner_tenant}:user/{owner_username}"] == admin_statements[0]["Principal"]["AWS"]
            # user MUST be another identity
            and admin_statements[0]["Principal"] != user_statements[0]["Principal"]
            and sorted(self.USER_BUCKET_POLICY_ACTIONS) == sorted(user_statements[0]["Action"])
            and self.s3.Bucket(name).Versioning().status == "Enabled"
        ):
            return True
        return False

    def storage_check_selfowned(self, name) -> bool:
        """check if bucket is self-owned

        Ceph S3 API does not allow to resolve username (which is used in policy descriptor)
        from access_key id, so the best guess is to check if the bucket is in the list of
        buckets.
        """

        return (name in [x.name for x in self.list_buckets()])

    def storage_list(self) -> list:
        """aggregate bucket list with policy check summary"""

        output = []
        for bucket in self.list_buckets():
            result = {}
            result["name"] = bucket.name
            result["policy"] = "OK" if self.storage_check_policy(bucket.name) else "FAILED"
            result["short_owner"] = self.bucket_owner(bucket.name).split("$")[-1]
                user_statement = self._policy_statements_user(self.bucket_policy(bucket.name))[0]
                result["target_user"] = user_statement["Principal"]["AWS"][0].split("/")[-1]
            else:
                result["target_user"] = None

            output.append(result)

        return output

    def storage_info(self, bucket_name):
        """grabs storage bucket detailed info"""

        result = {}
        result["name"] = bucket_name
        result["check_policy"] = "OK" if self.storage_check_policy(bucket_name) else "FAILED"
        result["owner"] = self.bucket_owner(bucket_name)
        result["acl"] = self.bucket_acl(bucket_name)
        result["policy"] = self.bucket_policy(bucket_name)
        result["objects"] = 0
        result["delete_markers"] = 0
        result["old_versions"] = 0
        result["size"] = 0
        result["old_size"] = 0
        result["saved_states"] = []

        paginator = self.s3.meta.client.get_paginator('list_object_versions')
        for page in paginator.paginate(Bucket=bucket_name):
            for obj in page.get("Versions", []):
                if obj["IsLatest"]:
                    result["objects"] += 1
                    result["size"] += obj["Size"]
                else:
                    result["old_versions"] += 1
                    result["old_size"] += obj["Size"]
            result["delete_markers"] += len(page.get("DeleteMarkers", []))

        for page in paginator.paginate(Bucket=bucket_name, Prefix="rwm/"):
            result["saved_states"] += [(x["Key"], x["VersionId"]) for x in page.get("Versions", [])]
    def storage_drop_versions(self, bucket_name):
        """
        Delete all old versions and delete markers from storage to reclaim space.
        Also delete all rwm saved states and generate one state after pruning.
        """

        # ? lock repo

        # drop all saved rwm states
        paginator = self.s3client.get_paginator('list_objects')
        objects = []
        for page in paginator.paginate(Bucket=bucket_name, Prefix="rwm/"):
            for item in page.get("Contents", []):
                objects.append({"Key": item["Key"]})
        for items in batched_iterator(objects, self.bulk_size):
            self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})

        # drop all old (non-latest) object versions
        paginator = self.s3client.get_paginator('list_object_versions')
        objects = []
        for page in paginator.paginate(Bucket=bucket_name):
            for item in page.get("Versions", []):
                if not item["IsLatest"]:
                    objects.append({"Key": item["Key"], "VersionId": item["VersionId"]})
        for items in batched_iterator(objects, self.bulk_size):
            self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})

        # drop all delete markers
        objects = []
        for page in paginator.paginate(Bucket=bucket_name):
            for item in page.get("DeleteMarkers", []):
                objects.append({"Key": item["Key"], "VersionId": item["VersionId"]})
        for items in batched_iterator(objects, self.bulk_size):
            self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})

        # save current state
        ret = self.storage_save_state(bucket_name)

        return ret

    def storage_state(self, bucket_name):
        """dumps current bucket state into dict"""

        state = {
            "bucket_name": bucket_name,
            "bucket_policy": self.bucket_policy(bucket_name),
            "time_start": datetime.now(),
            "time_end": None,
            "versions": [],
            "delete_markers": []
        }

        paginator = self.s3.meta.client.get_paginator('list_object_versions')
        for page in paginator.paginate(Bucket=bucket_name):
            state["versions"] += page.get("Versions", [])
            state["delete_markers"] += page.get("DeleteMarkers", [])
        state["time_end"] = datetime.now()

        return state

    def storage_save_state(self, bucket_name) -> int:
        """save storage state into itself"""

        # explicit error handling here, it's used during backup process
        try:
            bucket_state = self.storage_state(bucket_name)
            now = datetime.now().astimezone().isoformat()
            self.s3.Bucket(bucket_name).upload_fileobj(
                BytesIO(gzip.compress(json.dumps(bucket_state, cls=RwmJSONEncoder).encode())),
                f"rwm/state_{now}.json.gz"
            )
        except (BotoCoreError, ClientError, TypeError) as exc:
            logger.exception(exc)
            return 1

        return 0

    def storage_restore_state(self, source_bucket_name, target_bucket_name, state_object_key, state_version):
        """create new bucket, copy data by selected state_file"""

        target_bucket = self.storage_create(target_bucket_name, "dummy")
        resp = self.s3.Bucket(source_bucket_name).Object(state_object_key).get(VersionId=state_version)
        state = json.loads(gzip.decompress(resp['Body'].read()))

        for obj in state["versions"]:
            if obj["IsLatest"]:
                target_bucket.Object(obj["Key"]).copy({"Bucket": source_bucket_name, "Key": obj["Key"], "VersionId": obj["VersionId"]})

        return 0


class LockManager:
    """parallel execution locker"""

    def __init__(self, lock_path):
        self.lock_path = lock_path
        self.lock_instance = None

    def lock(self):
        """acquire lock"""

        self.lock_instance = Path(  # pylint: disable=consider-using-with
            self.lock_path
        ).open(mode="w+", encoding="utf-8")
        try:
            flock(self.lock_instance, LOCK_EX | LOCK_NB)
        except BlockingIOError:
            logger.warning("failed to acquired lock")
            self.lock_instance.close()
            self.lock_instance = None
            return 1

        return 0

    def unlock(self):
        """release lock"""

        flock(self.lock_instance, LOCK_UN)
        self.lock_instance.close()
        self.lock_instance = None


class RWM:
    """rwm operations manager"""

    def __init__(self, config_dict):
        self.config = RWMConfig(**config_dict)
        self.storage_manager = StorageManager(
            self.config.s3_endpoint_url,
            self.config.s3_access_key,
            self.config.s3_secret_key
        )
        self.cronlock = LockManager(self.config.lock_path)

    def aws_cmd(self, args) -> subprocess.CompletedProcess:
        """aws cli command wrapper"""

        env = {
            "PATH": os.environ["PATH"],
            "AWS_METADATA_SERVICE_NUM_ATTEMPTS": "0",
            "AWS_ACCESS_KEY_ID": self.config.s3_access_key,
            "AWS_SECRET_ACCESS_KEY": self.config.s3_secret_key
        }
        if is_sublist(["s3", "mb"], args):
            # region must be set and empty for awscli >=2.x and ceph s3
            env.update({"AWS_DEFAULT_REGION": ""})

        # aws cli does not have endpoint-url as env config option
        return run_command(["aws", "--endpoint-url", self.config.s3_endpoint_url] + args, env=env)
    def restic_cmd(self, args) -> subprocess.CompletedProcess:
        """restic command wrapper"""

        env = {
            "HOME": os.environ["HOME"],
            "PATH": os.environ["PATH"],
            "AWS_ACCESS_KEY_ID": self.config.s3_access_key,
            "AWS_SECRET_ACCESS_KEY": self.config.s3_secret_key,
            "RESTIC_PASSWORD": self.config.restic_password,
            "RESTIC_REPOSITORY": f"s3:{self.config.s3_endpoint_url}/{self.config.restic_bucket}",
        }
        return run_command(["restic"] + args, env=env)

    def _restic_backup(self, name) -> int:
        """runs restic backup by name"""

        logger.info(f"_restic_backup {name}")
        if self.config.autotags:
            tags += ["--tag", name]
        for item in conf.tags:
            tags += ["--tag", item]

        excludes = []
        for item in conf.excludes:
            excludes += ["--exclude", item]

        cmd_args = [
            "backup",
            *self.config.backup_extras,
            *conf.extras,
            *tags,
            *excludes,
            *conf.filesdirs
        ]
        wrap_output(backup_proc := self.restic_cmd(cmd_args))
        return backup_proc.returncode

    def _restic_forget_prune(self) -> int:
        """runs forget prune"""
        logger.info("_restic_forget_prune")

        if "keep-within" not in self.config.retention:
            # without keep-within, operational backups (eg. pre-upgrade backups) get
            # deleted; only the last one per day is kept by keep-daily
            logger.warning("keep-within not found in restic forget prune config")

        keeps = []
        for key, val in self.config.retention.items():
            keeps += [f"--{key}", val]
        cmd_args = ["forget", "--prune"] + keeps

        wrap_output(forget_proc := self.restic_cmd(cmd_args))
        return forget_proc.returncode

    def _runparts(self, backup_name, parts_name) -> int:
        """run all commands in parts in shell"""

        for part in getattr(self.config.backups[backup_name], parts_name):
            logger.info(f"_runparts {parts_name} command, {json.dumps(part)}")
            wrap_output(part_proc := run_command(part, shell=True))
            if part_proc.returncode != 0:
                logger.error("rwm _runparts failed")
                return part_proc.returncode

        return 0

    def _backup_one(self, name) -> int:
        """perform backup"""
        logger.info(f"_backup_one {name}")
        if ret := self._runparts(name, "prerun"):
            return ret
        if ret := self._restic_backup(name):
            return ret
        if ret := self._runparts(name, "postrun"):
            return ret
        return 0

    def backup(self, backup_selector: str | list) -> int:
        """backup command. perform selected backup or all configured backups"""
        if self.cronlock.lock():
            return 1
        backup_time_start = datetime.now()
        stats = []
        selected_backups = backup_selector if isinstance(backup_selector, list) else [backup_selector]
        if any(name not in self.config.backups for name in selected_backups):
            logger.error("invalid backup selector")
            return 1

        if self.storage_manager.storage_check_selfowned(self.config.restic_bucket):
            logger.warning("restic_bucket should not be self-owned")

        if not self.storage_manager.storage_check_policy(self.config.restic_bucket):
            logger.warning("restic_bucket does not have expected policy")
        for name in selected_backups:
            time_start = datetime.now()
            last_ret = self._backup_one(name)
            time_end = datetime.now()
            ret |= last_ret
            stats.append(BackupResult(name, last_ret, time_start, time_end))

        if ret == 0:
            time_start = datetime.now()
            last_ret = self._restic_forget_prune()
            time_end = datetime.now()
            ret |= last_ret
            stats.append(BackupResult("_forget_prune", last_ret, time_start, time_end))

        time_start = datetime.now()
        last_ret = self.storage_manager.storage_save_state(self.config.restic_bucket)
        time_end = datetime.now()
        ret |= last_ret
        stats.append(BackupResult("_storage_save_state", last_ret, time_start, time_end))
        logger.info("backup results")
        print(tabulate([item.to_dict() for item in stats], headers="keys", numalign="left"))
        backup_time_end = datetime.now()
        took = timedelta(seconds=(backup_time_end - backup_time_start).seconds)  # drops microseconds
        severity, result = (logging.INFO, "success") if ret == 0 else (logging.ERROR, "errors")
        logger.log(severity, f"backup finished with {result} (ret {ret}), took {took} ({round(took.total_seconds())} sec)")

        self.cronlock.unlock()
        return ret

    def backup_all(self) -> int:
        """backup all stub"""
        return self.backup(list(self.config.backups.keys()))

    def storage_create(self, bucket_name, target_username) -> int:
        """storage create command"""

        self.storage_manager.storage_create(bucket_name, target_username)
        return 0

    def storage_delete(self, bucket_name) -> int:
        """storage delete command"""

        return self.storage_manager.storage_delete(bucket_name)

    def storage_list(self) -> int:
        """storage list command"""

        print(tabulate(self.storage_manager.storage_list(), headers="keys", numalign="left"))
        return 0

    def storage_info(self, bucket_name) -> int:
        """storage_list command"""

        sinfo = self.storage_manager.storage_info(bucket_name)
        total_size = sinfo["size"] + sinfo["old_size"]

        print(f'Storage bucket:   {sinfo["name"]}')
        print("----------------------------------------")
        print(f'RWM policy check: {sinfo["check_policy"]}')
        print(f'Owner:            {sinfo["owner"]}')
        print(f'Objects:          {sinfo["objects"]}')
        print(f'Delete markers:   {sinfo["delete_markers"]}')
        print(f'Old versions:     {sinfo["old_versions"]}')
        print(f'Size:             {size_fmt(sinfo["size"])}')
        print(f'Old size:         {size_fmt(sinfo["old_size"])}')
        print(f'Total size:       {size_fmt(total_size)}')
        print("----------------------------------------")
        print("Bucket ACL:")
        print(json.dumps(sinfo["acl"], indent=2))
        print("----------------------------------------")
        print("Bucket policy:")
        print(json.dumps(sinfo["policy"], indent=2))
        print("----------------------------------------")
        print("RWM saved states:")
        print("\n".join([f"{key} {ver}" for key, ver in sorted(sinfo["saved_states"])]))
    def storage_state(self, bucket_name) -> int:
        """dump storage state in json"""

        print(json.dumps(
            self.storage_manager.storage_state(bucket_name),
            indent=4,
            cls=RwmJSONEncoder
        ))
        return 0

    def storage_drop_versions(self, bucket_name) -> int:
        """storage_drop_versions command"""

        return self.storage_manager.storage_drop_versions(bucket_name)

    def storage_restore_state(self, source_bucket, target_bucket, state_object_key, state_version) -> int:
        """storage restore state"""

        return self.storage_manager.storage_restore_state(source_bucket, target_bucket, state_object_key, state_version)


def configure_logging(debug):
    """configure logger"""

    log_handler = logging.StreamHandler(sys.stdout)
    log_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s %(name)s[%(process)d]: %(levelname)s %(message)s"
        )
    )
    logger.addHandler(log_handler)
    if debug:  # pragma: no cover  ; would reconfigure pylint environment
        logger.setLevel(logging.DEBUG)


def parse_arguments(argv):
    """parse arguments"""

    parser = ArgumentParser(description="restic/s3 worm manager")
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--config", default="rwm.conf")
    subparsers = parser.add_subparsers(title="commands", dest="command", required=False)
    subparsers.add_parser("version", help="show version")
    aws_cmd_parser = subparsers.add_parser("aws", help="run aws cli")
    aws_cmd_parser.add_argument("cmd_args", nargs="*")
    restic_cmd_parser = subparsers.add_parser("restic", help="run restic")
    restic_cmd_parser.add_argument("cmd_args", nargs="*")

    backup_cmd_parser = subparsers.add_parser("backup", help="perform backup")
    backup_cmd_parser.add_argument("name", help="backup name")

    _ = subparsers.add_parser("backup-all", help="run all backups in config")
    storage_create_cmd_parser = subparsers.add_parser("storage-create", help="create policed storage bucket")
    storage_create_cmd_parser.add_argument("bucket_name", help="bucket name")
    storage_create_cmd_parser.add_argument("target_username", help="user to be granted limited RW access")

    storage_delete_cmd_parser = subparsers.add_parser("storage-delete", help="delete storage")
    storage_delete_cmd_parser.add_argument("bucket_name", help="bucket name")
    _ = subparsers.add_parser("storage-list", help="list storages")
    storage_info_cmd_parser = subparsers.add_parser("storage-info", help="show detailed storage info")
    storage_info_cmd_parser.add_argument("bucket_name", help="bucket name")
    storage_state_cmd_parser = subparsers.add_parser("storage-state", help="dump current storage state")
    storage_state_cmd_parser.add_argument("bucket_name", help="bucket name")

    storage_drop_versions_cmd_parser = subparsers.add_parser(
        "storage-drop-versions",
        help="reclaim storage space; drop any old object versions from bucket"
    )
    storage_drop_versions_cmd_parser.add_argument("bucket_name", help="bucket name")
    storage_restore_state_cmd_parser = subparsers.add_parser("storage-restore-state", help="restore bucketX stateX1 to bucketY")
    storage_restore_state_cmd_parser.add_argument("source_bucket", help="source_bucket")
    storage_restore_state_cmd_parser.add_argument("target_bucket", help="target_bucket; should not exist")
    storage_restore_state_cmd_parser.add_argument("state", help="state object key in source bucket")
    storage_restore_state_cmd_parser.add_argument("version", help="state object version in source bucket")
    return parser.parse_args(argv)


def load_config(path) -> dict:
    """load config dict from file"""

    config = {}
    try:
        config_path = Path(path)
        config_perms = config_path.stat().st_mode & 0o777
        if config_perms != 0o600:
            logger.warning(f"config file permissions ({config_perms:o}) are too-open")
        config = dict(yaml.safe_load(config_path.read_text(encoding='utf-8')))
    except (OSError, TypeError, ValueError) as exc:
        logger.error(f"cannot load config file, {exc}")
def main(argv=None):  # pylint: disable=too-many-branches
    """main"""

    args = parse_arguments(argv)
    configure_logging(args.debug)
    if not (config_dict := load_config(args.config)):
        return 1
    rwmi = RWM(config_dict)
    ret = -1
    if args.command == "version":
        print(__version__)
        ret = 0
    if args.command == "aws":
        ret = wrap_output(rwmi.aws_cmd(args.cmd_args))
    if args.command == "restic":
        ret = wrap_output(rwmi.restic_cmd(args.cmd_args))
    if args.command == "backup":
        ret = rwmi.backup(args.name)
    if args.command == "backup-all":
    if args.command == "storage-create":
        ret = rwmi.storage_create(args.bucket_name, args.target_username)
    if args.command == "storage-delete":
        ret = rwmi.storage_delete(args.bucket_name)
    if args.command == "storage-list":
    if args.command == "storage-info":
    if args.command == "storage-state":
        ret = rwmi.storage_state(args.bucket_name)

    if args.command == "storage-drop-versions":
        ret = rwmi.storage_drop_versions(args.bucket_name)
    if args.command == "storage-restore-state":
        ret = rwmi.storage_restore_state(args.source_bucket, args.target_bucket, args.state, args.version)

    logger.debug("finished with %s (ret %d)", "success" if ret == 0 else "errors", ret)
    return ret
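
# Typical invocations (config path, bucket and backup names are illustrative):
#   ./rwm.py --config rwm.conf backup-all
#   ./rwm.py --config rwm.conf storage-info mybucket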


if __name__ == "__main__":  # pragma: nocover
    sys.exit(main())