"common/deployments/hub-production.yaml" did not exist on "9294c5ca4274215ff8ec3f2dfaafd1be8fef89b6"
Newer
Older
#!/usr/bin/env python3
"""rwm, restic/s3 worm manager"""
import gzip
import json
import logging
import os
import shlex
import subprocess
import sys
from argparse import ArgumentParser
from dataclasses import dataclass
from datetime import datetime, timedelta
from fcntl import flock, LOCK_EX, LOCK_NB, LOCK_UN
from io import BytesIO
from pathlib import Path
from typing import Dict, List, Optional

import boto3
import yaml
from botocore.exceptions import BotoCoreError, ClientError
from pydantic import BaseModel, ConfigDict
from tabulate import tabulate
logger = logging.getLogger("rwm")
def is_sublist(needle, haystack):
"""Check if needle is a sublist of haystack using list slicing and equality comparison"""
# If needle is empty, it's considered a sublist of any list
if not needle:
return True
return any(haystack[i:i+len(needle)] == needle for i in range(len(haystack)))
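# Illustrative usage (hypothetical argument lists), e.g. as later used to detect
# an "s3 mb" invocation in aws_cmd:
#   is_sublist(["s3", "mb"], ["s3", "mb", "s3://bucket1"])  # True
#   is_sublist(["s3", "mb"], ["s3", "ls"])                  # False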
def run_command(*args, **kwargs):
"""output capturing command executor"""
kwargs.update({
"capture_output": True,
"text": True,
"encoding": "utf-8",
})
logger.debug("run_command: %s", shlex.join(args[0]))

return subprocess.run(*args, **kwargs, check=False)

def wrap_output(process):
"""wraps command output and prints results"""

if process.stdout:
print(process.stdout)
if process.stderr:
print(process.stderr, file=sys.stderr)
return process.returncode
def size_fmt(num):
"""print value formated with human readable units"""
for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB']:
if abs(num) < 1024.0:
return f'{num:0.1f} {unit}'
num /= 1024.0
return f'{num:0.1f} YiB'
def batched_iterator(lst, n):
"""yield n-items from list iterator"""
for i in range(0, len(lst), n):
yield lst[i:i + n]
class BackupConfig(BaseModel):
"""Configuration for backup operations.
Attributes:
filesdirs:
REQUIRED. List of files and directories to be backed up.
excludes:
List of patterns for `--exclude` options for the `restic backup` command. Defaults to an empty list.
tags:
List of tags for the new backup snapshot. Defaults to an empty list.
extras:
Additional options for the `restic backup` command. Defaults to an empty list.
prerun:
List of shell commands to execute before backup. Defaults to an empty list.
postrun:
List of shell commands to execute after backup. Defaults to an empty list.
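Example:
An illustrative `backups` item as it might appear in the YAML config;
the name, paths, tag and commands below are hypothetical placeholders:

    mybackup:
      filesdirs:
        - /etc
        - /var/lib/app
      excludes:
        - "*.cache"
      tags:
        - mybackup
      prerun:
        - "echo starting backup"
      postrun:
        - "echo backup finished"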
"""
model_config = ConfigDict(extra='forbid')
filesdirs: List[str]
excludes: List[str] = []
tags: List[str] = []
extras: List[str] = []
prerun: List[str] = []
postrun: List[str] = []
class RWMConfig(BaseModel):
"""Main configuration for RWM. Configuration file format is YAML.
Attributes:
s3_endpoint_url:
REQUIRED. The endpoint URL for S3.
s3_access_key:
REQUIRED. Access key for S3.
s3_secret_key:
REQUIRED. Secret key for S3.
restic_bucket:
Bucket for Restic backup repository.
restic_password:
Password for Restic backup repository.
backups:
Dictionary containing named backup configurations.
retention:
Dictionary containing retention policies for Restic repository.

Keys and values correspond to `restic forget` command `--keep*`
options without leading dashes.

lock_path:
Path for parallel execution exclusion lock. Defaults to
`/var/run/rwm.lock`.
autotags:
Automatically add a tag to each backup snapshot, using the value
of the backup configuration name.
backup_extras:
Additional options for every `restic backup` command (eg. default `pack-size` setting).
Defaults to an empty list.
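Example:
A minimal illustrative configuration file; the endpoint, credentials,
bucket name and password below are placeholders, not working values:

    s3_endpoint_url: "https://s3.example.com"
    s3_access_key: "ACCESSKEY"
    s3_secret_key: "SECRETKEY"
    restic_bucket: "rwmbackups"
    restic_password: "CHANGEME"
    retention:
      keep-daily: "14"
      keep-within: "30d"
    autotags: true
    backups:
      mybackup:
        filesdirs:
          - /etc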
"""
model_config = ConfigDict(extra='forbid')
s3_endpoint_url: str
s3_access_key: str
s3_secret_key: str
restic_bucket: Optional[str] = None
restic_password: Optional[str] = None
backups: Dict[str, BackupConfig] = {}
retention: Dict[str, str] = {}
lock_path: str = "/var/run/rwm.lock"

autotags: bool = False
backup_extras: List[str] = []
class RwmJSONEncoder(json.JSONEncoder):
"""json encoder"""
def default(self, o):
if isinstance(o, datetime):
return o.isoformat()
return super().default(o)  # pragma: nocover ; no other type in processed data
@dataclass
class BackupResult:
"""backup results data container"""
name: str
returncode: int
time_start: datetime
time_end: datetime
def to_dict(self):
"""dict serializer"""
return {
"ident": "RESULT",
"name": self.name,
"status": "OK" if self.returncode == 0 else "ERROR",
"returncode": self.returncode,
"backup_start": self.time_start.isoformat(),
"backup_time": str(self.time_end-self.time_start),
}
class StorageManager:
"""s3 policed bucket manager"""
USER_BUCKET_POLICY_ACTIONS = [
# backups
"s3:ListBucket",
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:GetBucketPolicy",
"s3:ListBucketVersions",
"s3:GetBucketVersioning"
]
def __init__(self, url, access_key, secret_key, bulk_size=1000):
self.url = url
self.access_key = access_key
self.secret_key = secret_key
self.bulk_size = bulk_size
self.s3 = boto3.resource('s3', endpoint_url=url, aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key)
self.s3client = self.s3.meta.client

def bucket_create(self, name):
"""aws s3 resource api stub"""
# boto3 client and resource api are not completely aligned
# s3.Bucket("xyz").create() returns dict instead of s3.Bucket object
return self.s3.create_bucket(Bucket=name)
def bucket_exist(self, name):
"""check if bucket exist"""
return name in [x.name for x in self.list_buckets()]
def bucket_owner(self, name):
"""aws s3 resource api stub"""
return self.s3.Bucket(name).Acl().owner["ID"]
def bucket_policy(self, name):
"""aws s3 resource api stub"""
try:
return json.loads(self.s3.Bucket(name).Policy().policy)
except (ClientError, BotoCoreError) as exc:
if "NoSuchBucketPolicy" not in str(exc):
logger.error("rwm bucket_policy error, %s", (exc))

def bucket_acl(self, name):
"""api stub"""
acl = self.s3.Bucket(name).Acl()
return {"owner": acl.owner, "grants": acl.grants}
def list_buckets(self):
"""aws s3 resource api stub"""
return list(self.s3.buckets.all())
def list_objects(self, bucket_name):
"""aws s3 resource api stub"""
return list(self.s3.Bucket(bucket_name).objects.all())
def storage_create(self, bucket_name, target_username):
"""create policed bucket"""

if not target_username:
raise ValueError("must specify value for bucket user")

bucket = self.bucket_create(bucket_name)
tenant, admin_username = bucket.Acl().owner["ID"].split("$")
# grants basic RW access to user in same tenant
bucket_policy = {
"Version": "2012-10-17",
"Statement": [
# full access to admin
{
"Effect": "Allow",
"Principal": {"AWS": [f"arn:aws:iam::{tenant}:user/{admin_username}"]},
"Action": ["*"],
"Resource": [f"arn:aws:s3:::{bucket.name}", f"arn:aws:s3:::{bucket.name}/*"]
},
# limited access to user
{
"Effect": "Allow",
"Principal": {"AWS": [f"arn:aws:iam::{tenant}:user/{target_username}"]},
"Action": self.USER_BUCKET_POLICY_ACTIONS,
"Resource": [f"arn:aws:s3:::{bucket.name}", f"arn:aws:s3:::{bucket.name}/*"]
}
]
}
bucket.Policy().put(Policy=json.dumps(bucket_policy))
# enforces versioning
bucket.Versioning().enable()
return bucket
def storage_delete(self, bucket_name):
"""storage delete"""
# delete all objects
paginator = self.s3client.get_paginator('list_objects')
objects = []
for page in paginator.paginate(Bucket=bucket_name):
for item in page.get("Contents", []):
objects.append({"Key": item["Key"]})
for items in batched_iterator(objects, self.bulk_size):
self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})
paginator = self.s3client.get_paginator('list_object_versions')
# delete all object versions
objects = []
for page in paginator.paginate(Bucket=bucket_name):
for item in page.get("Versions", []):
objects.append({"Key": item["Key"], "VersionId": item["VersionId"]})
for items in batched_iterator(objects, self.bulk_size):
self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})
# delete all delete markers
objects = []
for page in paginator.paginate(Bucket=bucket_name):
for item in page.get("DeleteMarkers", []):
objects.append({"Key": item["Key"], "VersionId": item["VersionId"]})
for items in batched_iterator(objects, self.bulk_size):
self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})
# delete bucket
self.s3client.delete_bucket(Bucket=bucket_name)
@staticmethod
def _policy_statements_admin(policy):
"""policy helper"""
return list(filter(lambda stmt: stmt["Action"] == ["*"], policy["Statement"]))
@staticmethod
def _policy_statements_user(policy):
"""policy helper"""
return list(filter(lambda stmt: stmt["Action"] != ["*"], policy["Statement"]))
def storage_check_policy(self, name):

"""
storage check bucket policy
Ceph S3 API does not allow resolving the username (which is used in the policy
descriptor) from the access_key id, hence the checks cannot directly assert
whether the owner is or is not the current identity.
"""
if not (policy := self.bucket_policy(name)):
return False
admin_statements = self._policy_statements_admin(policy)
user_statements = self._policy_statements_user(policy)

owner_tenant, owner_username = self.bucket_owner(name).split("$")
if ( # pylint: disable=too-many-boolean-expressions
# two statements MUST be present on a bucket
len(admin_statements) == 1
and len(user_statements) == 1

# bucket owner MUST be the admin principal
and [f"arn:aws:iam::{owner_tenant}:user/{owner_username}"] == admin_statements[0]["Principal"]["AWS"]
# user MUST be another identity
and admin_statements[0]["Principal"] != user_statements[0]["Principal"]

# user MUST have only limited access
and sorted(self.USER_BUCKET_POLICY_ACTIONS) == sorted(user_statements[0]["Action"])

# the bucket MUST be versioned
and self.s3.Bucket(name).Versioning().status == "Enabled"
):
return True
return False
def storage_check_selfowned(self, name) -> bool:
"""check if bucket is self-owner
Ceph S3 API does not allow to resolve username (which is used in policy descriptor)
from access_key id, so the best guess is to check if the bucket is in the list of
buckets.
"""
return (name in [x.name for x in self.list_buckets()])

def storage_list(self):
"""storage list"""
output = []

for bucket in self.list_buckets():
result = {}
result["name"] = bucket.name
result["policy"] = "OK" if self.storage_check_policy(bucket.name) else "FAILED"

result["short_owner"] = self.bucket_owner(bucket.name).split("$")[-1]
if result["policy"] == "OK":
user_statement = self._policy_statements_user(self.bucket_policy(bucket.name))[0]
result["target_user"] = user_statement["Principal"]["AWS"][0].split("/")[-1]
else:
result["target_user"] = None
output.append(result)
return output

def storage_info(self, bucket_name):
"""grabs storage bucket detailed info"""
result = {}
result["name"] = bucket_name
result["check_policy"] = "OK" if self.storage_check_policy(bucket_name) else "FAILED"
result["owner"] = self.bucket_owner(bucket_name)
result["acl"] = self.bucket_acl(bucket_name)
result["policy"] = self.bucket_policy(bucket_name)
result["objects"] = 0
result["delete_markers"] = 0
result["old_versions"] = 0
result["size"] = 0
result["old_size"] = 0
result["saved_states"] = []
paginator = self.s3.meta.client.get_paginator('list_object_versions')
for page in paginator.paginate(Bucket=bucket_name):
for obj in page.get("Versions", []):
if obj["IsLatest"]:
result["objects"] += 1
result["size"] += obj["Size"]
else:
result["old_versions"] += 1
result["old_size"] += obj["Size"]
result["delete_markers"] += len(page.get("DeleteMarkers", []))
for page in paginator.paginate(Bucket=bucket_name, Prefix="rwm/"):
result["saved_states"] += [(x["Key"], x["VersionId"]) for x in page.get("Versions", [])]

return result
def storage_drop_versions(self, bucket_name):
"""
Delete all old versions and delete markers from storage to reclaim space.
Also delete all rwm saved states and generate one state after pruning.
"""
# drop all saved rwm states

paginator = self.s3client.get_paginator('list_objects')
objects = []
for page in paginator.paginate(Bucket=bucket_name, Prefix="rwm/"):
for item in page.get("Contents", []):

objects.append({"Key": item["Key"]})
for items in batched_iterator(objects, self.bulk_size):
self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})

paginator = self.s3client.get_paginator('list_object_versions')
objects = []
for page in paginator.paginate(Bucket=bucket_name):
for item in page.get("Versions", []):
if not item["IsLatest"]:

objects.append({"Key": item["Key"], "VersionId": item["VersionId"]})
for items in batched_iterator(objects, self.bulk_size):
self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})
objects = []
for page in paginator.paginate(Bucket=bucket_name):
for item in page.get("DeleteMarkers", []):

objects.append({"Key": item["Key"], "VersionId": item["VersionId"]})
for items in batched_iterator(objects, self.bulk_size):
self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})
# save current state
ret = self.storage_save_state(bucket_name)
return ret
"""dumps current bucket state into dict"""
state = {
"bucket_name": bucket_name,

"bucket_acl": self.bucket_acl(bucket_name),
"bucket_policy": self.bucket_policy(bucket_name),
"time_start": datetime.now(),
"time_end": None,
"versions": [],
"delete_markers": []
}
paginator = self.s3.meta.client.get_paginator('list_object_versions')
for page in paginator.paginate(Bucket=bucket_name):
state["versions"] += page.get("Versions", [])
state["delete_markers"] += page.get("DeleteMarkers", [])
state["time_end"] = datetime.now()
return state
def storage_save_state(self, bucket_name) -> int:
"""save storage state into itself"""

# explicit error handling here, it's used during backup process
try:
bucket_state = self.storage_state(bucket_name)
now = datetime.now().astimezone().isoformat()
self.s3.Bucket(bucket_name).upload_fileobj(
BytesIO(gzip.compress(json.dumps(bucket_state, cls=RwmJSONEncoder).encode())),
f"rwm/state_{now}.json.gz"
)
except (BotoCoreError, ClientError, TypeError) as exc:
logger.exception(exc)
return 1
return 0
def storage_restore_state(self, source_bucket_name, target_bucket_name, state_object_key, state_version):
"""create new bucket, copy data by selected state_file"""
target_bucket = self.storage_create(target_bucket_name, "dummy")
resp = self.s3.Bucket(source_bucket_name).Object(state_object_key).get(VersionId=state_version)
state = json.loads(gzip.decompress(resp['Body'].read()))
for obj in state["versions"]:
if obj["IsLatest"]:
target_bucket.Object(obj["Key"]).copy({"Bucket": source_bucket_name, "Key": obj["Key"], "VersionId": obj["VersionId"]})
return 0
class LockManager:
"""parallel execution locker"""
def __init__(self, lock_path):
self.lock_path = lock_path
self.lock_instance = None
def lock(self):
"""acquire lock"""
self.lock_instance = Path( # pylint: disable=consider-using-with
self.lock_path
).open(mode="w+", encoding="utf-8")
try:
flock(self.lock_instance, LOCK_EX | LOCK_NB)
except BlockingIOError:
logger.warning("failed to acquired lock")
self.lock_instance.close()
self.lock_instance = None
return 1
return 0
def unlock(self):
"""release lock"""
flock(self.lock_instance, LOCK_UN)
self.lock_instance.close()
self.lock_instance = None
class RWM:
"""rwm impl"""
def __init__(self, config_dict):
self.config = RWMConfig(**config_dict)
self.storage_manager = StorageManager(
self.config.s3_endpoint_url,
self.config.s3_access_key,
self.config.s3_secret_key
)
self.cronlock = LockManager(self.config.lock_path)
def aws_cmd(self, args) -> subprocess.CompletedProcess:

"""aws cli wrapper"""
env = {
"PATH": os.environ["PATH"],
"AWS_METADATA_SERVICE_NUM_ATTEMPTS": "0",
"AWS_ACCESS_KEY_ID": self.config.s3_access_key,
"AWS_SECRET_ACCESS_KEY": self.config.s3_secret_key
}
if is_sublist(["s3", "mb"], args):
# region must be set and empty for awscli >=2.x and ceph s3
env.update({"AWS_DEFAULT_REGION": ""})
# aws cli does not have endpoint-url as env config option
return run_command(["aws", "--endpoint-url", self.config.s3_endpoint_url] + args, env=env)
def restic_cmd(self, args) -> subprocess.CompletedProcess:
"""restic command wrapper"""
env = {
"HOME": os.environ["HOME"],
"PATH": os.environ["PATH"],
"AWS_ACCESS_KEY_ID": self.config.s3_access_key,
"AWS_SECRET_ACCESS_KEY": self.config.s3_secret_key,
"RESTIC_PASSWORD": self.config.restic_password,
"RESTIC_REPOSITORY": f"s3:{self.config.s3_endpoint_url}/{self.config.restic_bucket}",
return run_command(["restic"] + args, env=env)
def _restic_backup(self, name) -> int:
"""runs restic backup by name"""
logger.info(f"_restic_backup {name}")
conf = self.config.backups[name]

tags = []
if self.config.autotags:
tags += ["--tag", name]
for item in conf.tags:
tags += ["--tag", item]

excludes = []
for item in conf.excludes:
excludes += ["--exclude", item]
cmd_args = [
"backup",
*self.config.backup_extras,
*conf.extras,
*tags,
*excludes,
*conf.filesdirs
]
wrap_output(backup_proc := self.restic_cmd(cmd_args))
return backup_proc.returncode
def _restic_forget_prune(self) -> int:
"""runs restic forget with prune using the configured retention policy"""
logger.info("_restic_forget_prune")
if "keep-within" not in self.config.retention:
# without keep-within, operational backups (eg. pre-upgrade backups) get
# deleted; only the last one per day is kept by keep-daily
logger.warning("keep-within not found in restic forget prune config")
keeps = []
for key, val in self.config.retention.items():
keeps += [f"--{key}", val]
cmd_args = ["forget", "--prune"] + keeps
wrap_output(forget_proc := self.restic_cmd(cmd_args))
return forget_proc.returncode
def _runparts(self, backup_name, parts_name) -> int:
"""run all commands in parts in shell"""
for part in getattr(self.config.backups[backup_name], parts_name):
logger.info(f"_runparts {parts_name} command, {json.dumps(part)}")
wrap_output(part_proc := run_command(part, shell=True))
if part_proc.returncode != 0:
logger.error("rwm _runparts failed")
return part_proc.returncode
return 0
def _backup_one(self, name) -> int:
"""perform backup"""
logger.info(f"_backup_one {name}")
if ret := self._runparts(name, "prerun"):
return ret
if ret := self._restic_backup(name):
return ret
if ret := self._runparts(name, "postrun"):
return ret
return 0
def backup(self, backup_selector: str | list) -> int:
"""backup command. perform selected backup or all configured backups"""
if self.cronlock.lock():
return 1
selected_backups = backup_selector if isinstance(backup_selector, list) else [backup_selector]
if any(name not in self.config.backups for name in selected_backups):
logger.error("invalid backup selector")
return 1

if self.storage_manager.storage_check_selfowned(self.config.restic_bucket):
logger.warning("restic_bucket should not be self-owned")
if not self.storage_manager.storage_check_policy(self.config.restic_bucket):

logger.warning("restic_bucket does not have expected policy")
ret = 0
stats = []
backup_time_start = datetime.now()
for name in selected_backups:
time_start = datetime.now()
last_ret = self._backup_one(name)
time_end = datetime.now()
ret |= last_ret
stats.append(BackupResult(name, last_ret, time_start, time_end))
if ret == 0:
time_start = datetime.now()
last_ret = self._restic_forget_prune()
time_end = datetime.now()
ret |= last_ret
stats.append(BackupResult("_forget_prune", last_ret, time_start, time_end))
time_start = datetime.now()
last_ret = self.storage_manager.storage_save_state(self.config.restic_bucket)
time_end = datetime.now()
ret |= last_ret
stats.append(BackupResult("_storage_save_state", last_ret, time_start, time_end))
logger.info("backup results")
print(tabulate([item.to_dict() for item in stats], headers="keys", numalign="left"))
backup_time_end = datetime.now()
took = timedelta(seconds=(backup_time_end - backup_time_start).seconds) # drops microseconds
severity, result = (logging.INFO, "success") if ret == 0 else (logging.ERROR, "errors")
logger.log(severity, f"backup finished with {result} (ret {ret}), took {took} ({round(took.total_seconds())} sec)")
self.cronlock.unlock()
return ret
def backup_all(self) -> int:
"""backup all stub"""
return self.backup(list(self.config.backups.keys()))
def storage_create(self, bucket_name, target_username) -> int:

"""storage_create command"""
self.storage_manager.storage_create(bucket_name, target_username)
return 0
def storage_delete(self, bucket_name) -> int:

"""storage_delete command"""
self.storage_manager.storage_delete(bucket_name)
return 0

def storage_list(self) -> int:
"""storage_list command"""

print(tabulate(self.storage_manager.storage_list(), headers="keys", numalign="left"))
return 0

def storage_info(self, bucket_name) -> int:
"""storage_list command"""
sinfo = self.storage_manager.storage_info(bucket_name)
total_size = sinfo["size"] + sinfo["old_size"]
print(f'Storage bucket: {sinfo["name"]}')
print("----------------------------------------")
print(f'RWM policy check: {sinfo["check_policy"]}')
print(f'Owner: {sinfo["owner"]}')
print(f'Objects: {sinfo["objects"]}')
print(f'Delete markers: {sinfo["delete_markers"]}')
print(f'Old versions: {sinfo["old_versions"]}')
print(f'Size: {size_fmt(sinfo["size"])}')
print(f'Old size: {size_fmt(sinfo["old_size"])}')
print(f'Total size: {size_fmt(total_size)}')
print("----------------------------------------")
print("Bucket ACL:")
print(json.dumps(sinfo["acl"], indent=2))
print("----------------------------------------")
print("Bucket policy:")
print(json.dumps(sinfo["policy"], indent=2))
print("----------------------------------------")
print("RWM saved states:")
print("\n".join([f"{key} {ver}" for key, ver in sorted(sinfo["saved_states"])]))

return 0
def storage_state(self, bucket_name) -> int:
"""dump storage state in json"""
print(json.dumps(
self.storage_manager.storage_state(bucket_name),
indent=4,
cls=RwmJSONEncoder
))
return 0

def storage_drop_versions(self, bucket_name) -> int:
"""storage_drop_versions command"""
return self.storage_manager.storage_drop_versions(bucket_name)
def storage_restore_state(self, source_bucket, target_bucket, state_object_key, state_version) -> int:
"""storage_restore_state command"""
return self.storage_manager.storage_restore_state(source_bucket, target_bucket, state_object_key, state_version)
def configure_logging(debug):
"""configure logger"""
log_handler = logging.StreamHandler(sys.stdout)
log_handler.setFormatter(
logging.Formatter(
fmt="%(asctime)s %(name)s[%(process)d]: %(levelname)s %(message)s"
)
)
logger.addHandler(log_handler)
if debug: # pragma: no cover ; would reconfigure pylint environment
logger.setLevel(logging.DEBUG)
def parse_arguments(argv):
"""parse arguments"""
parser = ArgumentParser(description="restic/s3 worm manager")
parser.add_argument("--debug", action="store_true")
parser.add_argument("--config", default="rwm.conf")
subparsers = parser.add_subparsers(title="commands", dest="command", required=False)
subparsers.add_parser("version", help="show version")
aws_cmd_parser = subparsers.add_parser("aws", help="run aws cli")
aws_cmd_parser.add_argument("cmd_args", nargs="*")
restic_cmd_parser = subparsers.add_parser("restic", help="run restic")
restic_cmd_parser.add_argument("cmd_args", nargs="*")
backup_cmd_parser = subparsers.add_parser("backup", help="perform backup")
backup_cmd_parser.add_argument("name", help="backup name")
_ = subparsers.add_parser("backup-all", help="run all backups in config")
storage_create_cmd_parser = subparsers.add_parser("storage-create", help="create policed storage bucked")
storage_create_cmd_parser.add_argument("bucket_name", help="bucket name")
storage_create_cmd_parser.add_argument("target_username", help="user to be granted limited RW access")
storage_delete_cmd_parser = subparsers.add_parser("storage-delete", help="delete storage")
storage_delete_cmd_parser.add_argument("bucket_name", help="bucket name")
_ = subparsers.add_parser("storage-list", help="list storages")

storage_info_cmd_parser = subparsers.add_parser("storage-info", help="show detailed storage info")

storage_info_cmd_parser.add_argument("bucket_name", help="bucket name")
storage_state_cmd_parser = subparsers.add_parser("storage-state", help="dump current storage state")
storage_state_cmd_parser.add_argument("bucket_name", help="bucket name")
storage_drop_versions_cmd_parser = subparsers.add_parser(
"storage-drop-versions",
help="reclaim storage space; drop any old object versions from bucket"
)
storage_drop_versions_cmd_parser.add_argument("bucket_name", help="bucket name")
storage_restore_state_cmd_parser = subparsers.add_parser("storage-restore-state", help="restore bucketX stateX1 to bucketY")
storage_restore_state_cmd_parser.add_argument("source_bucket", help="source_bucket")
storage_restore_state_cmd_parser.add_argument("target_bucket", help="target_bucket; should not exist")
storage_restore_state_cmd_parser.add_argument("state", help="state object key in source bucket")
storage_restore_state_cmd_parser.add_argument("version", help="state object version in source bucket")
"""load config dict from file"""
config = {}

try:
config_path = Path(path)
config_perms = config_path.stat().st_mode & 0o777
if config_perms != 0o600:
logger.warning(f"config file permissions ({config_perms:o}) are too-open")
config = dict(yaml.safe_load(config_path.read_text(encoding='utf-8')))
except (OSError, TypeError, ValueError) as exc:

logger.error(f"cannot load config file, {exc}")
logger.debug("config, %s", config)
return config
def main(argv=None): # pylint: disable=too-many-branches
"""main"""
args = parse_arguments(argv)
configure_logging(args.debug)

if not (config_dict := load_config(args.config)):
return 1
rwmi = RWM(config_dict)
if args.command == "version":
print(__version__)
if args.command == "aws":
ret = wrap_output(rwmi.aws_cmd(args.cmd_args))
if args.command == "restic":
ret = wrap_output(rwmi.restic_cmd(args.cmd_args))
ret = rwmi.backup(args.name)
if args.command == "backup-all":
ret = rwmi.backup_all()

if args.command == "storage-create":
ret = rwmi.storage_create(args.bucket_name, args.target_username)
if args.command == "storage-delete":
ret = rwmi.storage_delete(args.bucket_name)
if args.command == "storage-list":

ret = rwmi.storage_list()
if args.command == "storage-info":

ret = rwmi.storage_info(args.bucket_name)
if args.command == "storage-state":
ret = rwmi.storage_state(args.bucket_name)
if args.command == "storage-drop-versions":
ret = rwmi.storage_drop_versions(args.bucket_name)
if args.command == "storage-restore-state":
ret = rwmi.storage_restore_state(args.source_bucket, args.target_bucket, args.state, args.version)
logger.debug("finished with %s (ret %d)", "success" if ret == 0 else "errors", ret)
return ret
if __name__ == "__main__": # pragma: nocover
sys.exit(main())
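# Illustrative invocations (assuming an executable rwm.py and a config file
# at ./rwm.conf; bucket, user and backup names below are hypothetical):
#
#   rwm.py backup mybackup
#   rwm.py backup-all
#   rwm.py storage-create bucket1 username1
#   rwm.py storage-info bucket1
#   rwm.py storage-drop-versions bucket1
#   rwm.py restic snapshots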