Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
R
rwm
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Radoslav Bodó
rwm
Commits
9a5f18fb
Commit
9a5f18fb
authored
1 year ago
by
Radoslav Bodó
Browse files
Options
Downloads
Patches
Plain Diff
general: subprocess outputs handling refactoring
parent
c5f39511
Branches
Branches containing commit
Tags
Tags containing commit
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
rwm.py
+73
-28
73 additions, 28 deletions
rwm.py
tests/test_default.py
+91
-44
91 additions, 44 deletions
tests/test_default.py
with
164 additions
and
72 deletions
rwm.py
+
73
−
28
View file @
9a5f18fb
...
...
@@ -4,16 +4,21 @@
import
base64
import
logging
import
os
import
subprocess
import
sys
from
argparse
import
ArgumentParser
from
pathlib
import
Path
from
subprocess
import
run
as
subrun
import
yaml
from
cryptography.hazmat.primitives.ciphers
import
Cipher
,
algorithms
,
modes
from
cryptography.hazmat.backends
import
default_backend
__version__
=
"
0.1
"
logger
=
logging
.
getLogger
(
"
rwm
"
)
logger
.
setLevel
(
logging
.
DEBUG
)
def
is_sublist
(
needle
,
haystack
):
"""
Check if needle is a sublist of haystack using list slicing and equality comparison
"""
...
...
@@ -31,6 +36,29 @@ def get_config(path):
return
{}
def
run_command
(
*
args
,
**
kwargs
):
"""
output capturing command executor
"""
kwargs
.
update
({
"
capture_output
"
:
True
,
"
text
"
:
True
,
"
encoding
"
:
"
utf-8
"
,
})
logger
.
debug
(
"
run_command, %s
"
,
(
args
,
kwargs
))
proc
=
subprocess
.
run
(
*
args
,
**
kwargs
,
check
=
False
)
return
(
proc
.
returncode
,
proc
.
stdout
,
proc
.
stderr
)
def
wrap_output
(
returncode
,
stdout
,
stderr
):
"""
wraps command output and prints results
"""
if
stdout
:
print
(
stdout
)
if
stderr
:
print
(
stderr
,
file
=
sys
.
stderr
)
return
returncode
def
rclone_obscure_password
(
plaintext
,
iv
=
None
):
"""
rclone obscure password algorithm
"""
...
...
@@ -53,7 +81,13 @@ class RWM:
self
.
config
=
config
def
aws_cmd
(
self
,
args
):
"""
aws cli wrapper
"""
"""
aws cli wrapper
:param list args: list passed to subprocess
:return: returncode, stdout, stderr
:rtype: tuple
"""
env
=
{
"
PATH
"
:
os
.
environ
[
"
PATH
"
],
...
...
@@ -66,10 +100,16 @@ class RWM:
env
.
update
({
"
AWS_DEFAULT_REGION
"
:
""
})
# aws cli does not have endpoint-url as env config option
return
sub
run
([
"
aws
"
,
"
--endpoint-url
"
,
self
.
config
[
"
S3_ENDPOINT_URL
"
]]
+
args
,
env
=
env
,
check
=
False
).
returncode
return
run
_command
([
"
aws
"
,
"
--endpoint-url
"
,
self
.
config
[
"
S3_ENDPOINT_URL
"
]]
+
args
,
env
=
env
)
def
rclone_cmd
(
self
,
args
):
"""
rclone wrapper
"""
"""
rclone wrapper
:param list args: list passed to subprocess
:return: returncode, stdout, stderr
:rtype: tuple
"""
env
=
{
"
RCLONE_CONFIG
"
:
""
,
...
...
@@ -81,33 +121,39 @@ class RWM:
"
RCLONE_CONFIG_RWMBE_ENV_AUTH
"
:
"
false
"
,
"
RCLONE_CONFIG_RWMBE_REGION
"
:
""
,
}
return
sub
run
([
"
rclone
"
]
+
args
,
env
=
env
,
check
=
False
).
returncode
return
run
_command
([
"
rclone
"
]
+
args
,
env
=
env
)
def
rclone_crypt_cmd
(
self
,
args
):
"""
rclone crypt wrapper
* https://rclone.org/docs/#config-file
* https://rclone.org/crypt/
:param list args: list passed to subprocess
:return: returncode, stdout, stderr
:rtype: tuple
"""
env
=
{
"
RCLONE_CONFIG
"
:
""
,
"
RCLONE_CONFIG_RWMBE_TYPE
"
:
"
crypt
"
,
"
RCLONE_CONFIG_RWMBE_REMOTE
"
:
f
"
rwm
s3
be:/
{
self
.
config
[
'
RCC_CRYPT_BUCKET
'
]
}
"
,
"
RCLONE_CONFIG_RWMBE_REMOTE
"
:
f
"
rwmbe
s3
:/
{
self
.
config
[
'
RCC_CRYPT_BUCKET
'
]
}
"
,
"
RCLONE_CONFIG_RWMBE_PASSWORD
"
:
rclone_obscure_password
(
self
.
config
[
"
RCC_CRYPT_PASSWORD
"
]),
"
RCLONE_CONFIG_RWMBE_PASSWORD2
"
:
rclone_obscure_password
(
self
.
config
[
"
RCC_CRYPT_PASSWORD
"
]),
"
RCLONE_CONFIG_RWM
S3
BE_TYPE
"
:
"
s3
"
,
"
RCLONE_CONFIG_RWM
S3
BE_ENDPOINT
"
:
self
.
config
[
"
S3_ENDPOINT_URL
"
],
"
RCLONE_CONFIG_RWM
S3
BE_ACCESS_KEY_ID
"
:
self
.
config
[
"
S3_ACCESS_KEY
"
],
"
RCLONE_CONFIG_RWM
S3
BE_SECRET_ACCESS_KEY
"
:
self
.
config
[
"
S3_SECRET_KEY
"
],
"
RCLONE_CONFIG_RWM
S3
BE_PROVIDER
"
:
"
Ceph
"
,
"
RCLONE_CONFIG_RWM
S3
BE_ENV_AUTH
"
:
"
false
"
,
"
RCLONE_CONFIG_RWM
S3
BE_REGION
"
:
""
,
"
RCLONE_CONFIG_RWMBE
S3
_TYPE
"
:
"
s3
"
,
"
RCLONE_CONFIG_RWMBE
S3
_ENDPOINT
"
:
self
.
config
[
"
S3_ENDPOINT_URL
"
],
"
RCLONE_CONFIG_RWMBE
S3
_ACCESS_KEY_ID
"
:
self
.
config
[
"
S3_ACCESS_KEY
"
],
"
RCLONE_CONFIG_RWMBE
S3
_SECRET_ACCESS_KEY
"
:
self
.
config
[
"
S3_SECRET_KEY
"
],
"
RCLONE_CONFIG_RWMBE
S3
_PROVIDER
"
:
"
Ceph
"
,
"
RCLONE_CONFIG_RWMBE
S3
_ENV_AUTH
"
:
"
false
"
,
"
RCLONE_CONFIG_RWMBE
S3
_REGION
"
:
""
,
}
return
sub
run
([
"
rclone
"
]
+
args
,
env
=
env
,
check
=
False
).
returncode
return
run
_command
([
"
rclone
"
]
+
args
,
env
=
env
)
def
restic_cmd
(
self
,
args
):
"""
restic command wrapper
"""
env
=
{
"
HOME
"
:
os
.
environ
[
"
HOME
"
],
"
PATH
"
:
os
.
environ
[
"
PATH
"
],
...
...
@@ -116,9 +162,10 @@ class RWM:
"
RESTIC_PASSWORD
"
:
self
.
config
[
"
RES_PASSWORD
"
],
"
RESTIC_REPOSITORY
"
:
f
"
s3:
{
self
.
config
[
'
S3_ENDPOINT_URL
'
]
}
/
{
self
.
config
[
'
RES_BUCKET
'
]
}
"
,
}
return
subrun
([
"
restic
"
]
+
args
,
env
=
env
,
check
=
False
).
returncode
return
run_command
([
"
restic
"
]
+
args
,
env
=
env
)
def
main
(
argv
=
None
,
dict_config
=
None
):
def
main
(
argv
=
None
):
"""
main
"""
parser
=
ArgumentParser
(
description
=
"
restics3 worm manager
"
)
...
...
@@ -127,11 +174,11 @@ def main(argv=None, dict_config=None):
subparsers
=
parser
.
add_subparsers
(
title
=
"
commands
"
,
dest
=
"
command
"
,
required
=
False
)
aws_cmd_parser
=
subparsers
.
add_parser
(
"
aws
"
,
help
=
"
aws command
"
)
aws_cmd_parser
.
add_argument
(
"
cmd_args
"
,
nargs
=
"
*
"
)
rc_cmd_parser
=
subparsers
.
add_parser
(
"
rc
"
,
help
=
"
rclone command
"
)
rc_cmd_parser
=
subparsers
.
add_parser
(
"
rc
lone
"
,
help
=
"
rclone command
"
)
rc_cmd_parser
.
add_argument
(
"
cmd_args
"
,
nargs
=
"
*
"
)
rcc_cmd_parser
=
subparsers
.
add_parser
(
"
rc
c
"
,
help
=
"
rclone command with crypt overlay
"
)
rcc_cmd_parser
=
subparsers
.
add_parser
(
"
rc
lone_crypt
"
,
help
=
"
rclone command with crypt overlay
"
)
rcc_cmd_parser
.
add_argument
(
"
cmd_args
"
,
nargs
=
"
*
"
)
res_cmd_parser
=
subparsers
.
add_parser
(
"
res
"
,
help
=
"
restic command
"
)
res_cmd_parser
=
subparsers
.
add_parser
(
"
res
tic
"
,
help
=
"
restic command
"
)
res_cmd_parser
.
add_argument
(
"
cmd_args
"
,
nargs
=
"
*
"
)
args
=
parser
.
parse_args
(
argv
)
...
...
@@ -139,19 +186,17 @@ def main(argv=None, dict_config=None):
config
=
{}
if
args
.
config
:
config
.
update
(
get_config
(
args
.
config
))
if
dict_config
:
config
.
update
(
dict_config
)
# assert config ?
rwm
=
RWM
(
config
)
if
args
.
command
==
"
aws
"
:
return
rwm
.
aws_cmd
(
args
.
cmd_args
)
if
args
.
command
==
"
rc
"
:
return
rwm
.
rclone_cmd
(
args
.
cmd_args
)
if
args
.
command
==
"
rc
c
"
:
return
rwm
.
rclone_crypt_cmd
(
args
.
cmd_args
)
if
args
.
command
==
"
res
"
:
return
rwm
.
restic_cmd
(
args
.
cmd_args
)
return
wrap_output
(
*
rwm
.
aws_cmd
(
args
.
cmd_args
)
)
if
args
.
command
==
"
rc
lone
"
:
return
wrap_output
(
*
rwm
.
rclone_cmd
(
args
.
cmd_args
)
)
if
args
.
command
==
"
rc
lone_crypt
"
:
return
wrap_output
(
*
rwm
.
rclone_crypt_cmd
(
args
.
cmd_args
)
)
if
args
.
command
==
"
res
tic
"
:
return
wrap_output
(
*
rwm
.
restic_cmd
(
args
.
cmd_args
)
)
return
0
...
...
This diff is collapsed.
Click to expand it.
tests/test_default.py
+
91
−
44
View file @
9a5f18fb
"""
default tests
"""
from
pathlib
import
Path
from
textwrap
import
dedent
import
boto3
from
rwm
import
is_sublist
,
main
as
rwm_main
,
rclone_obscure_password
from
rwm
import
is_sublist
,
main
as
rwm_main
,
rclone_obscure_password
,
RWM
def
buckets_plain_list
(
full_response
):
"""
boto3 helper
"""
return
[
x
[
"
Name
"
]
for
x
in
full_response
[
"
Buckets
"
]]
def
objects_plain_list
(
full_response
):
"""
boto3 helper
"""
return
[
x
[
"
Key
"
]
for
x
in
full_response
[
"
Contents
"
]]
def
test_sublist
():
...
...
@@ -14,104 +27,138 @@ def test_sublist():
assert
not
is_sublist
([
1
,
3
],
[
5
,
4
,
1
,
2
,
3
,
6
,
7
])
def
test_
config
(
tmpworkdir
:
str
):
# pylint: disable=unused-argument
"""
test
config handl
in
g
"""
def
test_
main
(
tmpworkdir
:
str
):
# pylint: disable=unused-argument
"""
test
ma
in
"""
Path
(
"
rwm.conf
"
).
touch
()
rwm_main
([])
assert
rwm_main
([])
==
0
Path
(
"
rwm.conf
"
).
write_text
(
dedent
(
"""
S3_ENDPOINT_URL:
"
dummy
"
S3_ACCESS_KEY:
"
dummy
"
S3_SECRET_KEY:
"
dummy
"
def
buckets_plain_list
(
full_response
):
"""
boto3 helper
""
"
RCC_CRYPT_BUCKET:
"
dummy-nasbackup-test1
"
RCC_CRYPT_PASSWORD:
"
dummy
"
return
[
x
[
"
Name
"
]
for
x
in
full_response
[
"
Buckets
"
]]
RES_BUCKET:
"
dummy
"
RES_PASSWORD:
"
dummy
"
"""
),
encoding
=
"
utf-8
"
)
assert
rwm_main
([])
==
0
def
objects_plain_list
(
full_response
):
"""
boto3 helper
"""
assert
rwm_main
([
"
aws
"
,
"
--
"
,
"
--version
"
])
==
0
assert
rwm_main
([
"
aws
"
,
"
notexist
"
])
!=
0
return
[
x
[
"
Key
"
]
for
x
in
full_response
[
"
Contents
"
]]
assert
rwm_main
([
"
rclone
"
,
"
version
"
])
==
0
assert
rwm_main
([
"
rclone_crypt
"
,
"
version
"
])
==
0
assert
rwm_main
([
"
restic
"
,
"
version
"
])
==
0
def
test_aws
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
def
test_aws
_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test aws command
"""
rwm
_conf
=
{
rwm
=
RWM
(
{
"
S3_ENDPOINT_URL
"
:
motoserver
,
"
S3_ACCESS_KEY
"
:
"
dummy
"
,
"
S3_SECRET_KEY
"
:
"
dummy
"
,
}
}
)
s3
=
boto3
.
client
(
'
s3
'
,
endpoint_url
=
motoserver
,
aws_access_key_id
=
"
dummy
"
,
aws_secret_access_key
=
"
dummy
"
)
test_bucket
=
"
testbucket
"
assert
test_bucket
not
in
buckets_plain_list
(
s3
.
list_buckets
())
rwm
_main
([
"
aws
"
,
"
s3
"
,
"
mb
"
,
f
"
s3://
{
test_bucket
}
"
]
,
rwm_conf
)
rwm
.
aws_cmd
([
"
s3
"
,
"
mb
"
,
f
"
s3://
{
test_bucket
}
"
])
assert
test_bucket
in
buckets_plain_list
(
s3
.
list_buckets
())
rwm
_main
([
"
aws
"
,
"
s3
"
,
"
rb
"
,
f
"
s3://
{
test_bucket
}
"
]
,
rwm_conf
)
rwm
.
aws_cmd
([
"
s3
"
,
"
rb
"
,
f
"
s3://
{
test_bucket
}
"
])
assert
test_bucket
not
in
buckets_plain_list
(
s3
.
list_buckets
())
def
test_rclone
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
def
test_rclone
_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test rclone command
"""
rwm
_conf
=
{
rwm
=
RWM
(
{
"
S3_ENDPOINT_URL
"
:
motoserver
,
"
S3_ACCESS_KEY
"
:
"
dummy
"
,
"
S3_SECRET_KEY
"
:
"
dummy
"
,
}
}
)
s3
=
boto3
.
client
(
'
s3
'
,
endpoint_url
=
motoserver
,
aws_access_key_id
=
"
dummy
"
,
aws_secret_access_key
=
"
dummy
"
)
test_bucket
=
"
testbucket
"
test_file
=
"
testfile.txt
"
Path
(
test_file
).
write_text
(
'
1234
'
,
encoding
=
'
utf-8
'
)
rwm
_main
([
"
rc
"
,
"
mkdir
"
,
f
"
rwmbe:/
{
test_bucket
}
/
"
]
,
rwm_conf
)
rwm
_main
([
"
rc
"
,
"
copy
"
,
test_file
,
f
"
rwmbe:/
{
test_bucket
}
/
"
]
,
rwm_conf
)
rwm
.
rclone_cmd
([
"
mkdir
"
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
rwm
.
rclone_cmd
([
"
copy
"
,
test_file
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
test_bucket
in
buckets_plain_list
(
s3
.
list_buckets
())
assert
test_file
in
objects_plain_list
(
s3
.
list_objects_v2
(
Bucket
=
test_bucket
))
def
test_rclone_argscheck
():
"""
test rclone args checking
"""
assert
rwm_main
([
"
rc
"
,
"
dummy
"
])
==
1
def
test_rclone_crypt
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
def
test_rclone_crypt_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test rclone with crypt overlay
"""
rwm
_conf
=
{
rwm
=
RWM
(
{
"
S3_ENDPOINT_URL
"
:
motoserver
,
"
S3_ACCESS_KEY
"
:
"
dummy
"
,
"
S3_SECRET_KEY
"
:
"
dummy
"
,
"
RCC_CRYPT_BUCKET
"
:
"
cryptdata_test
"
,
"
RCC_CRYPT_PASSWORD
"
:
rclone_obscure_password
(
"
dummydummydummydummydummydummydummydummy
"
),
}
}
)
s3
=
boto3
.
client
(
'
s3
'
,
endpoint_url
=
motoserver
,
aws_access_key_id
=
"
dummy
"
,
aws_secret_access_key
=
"
dummy
"
)
test_bucket
=
"
testbucket
"
test_file
=
"
testfile.txt
"
Path
(
test_file
).
write_text
(
'
1234
'
,
encoding
=
'
utf-8
'
)
rwm
_main
([
"
rcc
"
,
"
copy
"
,
test_file
,
f
"
rwmbe:/
{
test_bucket
}
/
"
]
,
rwm_conf
)
assert
len
(
objects_plain_list
(
s3
.
list_objects_v2
(
Bucket
=
rwm
_
conf
[
"
RCC_CRYPT_BUCKET
"
])))
==
1
rwm
.
rclone_crypt_cmd
([
"
copy
"
,
test_file
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
len
(
objects_plain_list
(
s3
.
list_objects_v2
(
Bucket
=
rwm
.
conf
ig
[
"
RCC_CRYPT_BUCKET
"
])))
==
1
rwm
_main
([
"
rcc
"
,
"
delete
"
,
f
"
rwmbe:/
{
test_bucket
}
/
{
test_file
}
"
]
,
rwm_conf
)
assert
s3
.
list_objects_v2
(
Bucket
=
rwm
_
conf
[
"
RCC_CRYPT_BUCKET
"
])[
"
KeyCount
"
]
==
0
rwm
.
rclone_crypt_cmd
([
"
delete
"
,
f
"
rwmbe:/
{
test_bucket
}
/
{
test_file
}
"
])
assert
s3
.
list_objects_v2
(
Bucket
=
rwm
.
conf
ig
[
"
RCC_CRYPT_BUCKET
"
])[
"
KeyCount
"
]
==
0
test_file1
=
"
testfile1.txt
"
Path
(
test_file1
).
write_text
(
'
4321
'
,
encoding
=
'
utf-8
'
)
rwm
_main
([
"
rcc
"
,
"
sync
"
,
"
.
"
,
f
"
rwmbe:/
{
test_bucket
}
/
"
]
,
rwm_conf
)
assert
s3
.
list_objects_v2
(
Bucket
=
rwm
_
conf
[
"
RCC_CRYPT_BUCKET
"
])[
"
KeyCount
"
]
==
2
rwm
.
rclone_crypt_cmd
([
"
sync
"
,
"
.
"
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
s3
.
list_objects_v2
(
Bucket
=
rwm
.
conf
ig
[
"
RCC_CRYPT_BUCKET
"
])[
"
KeyCount
"
]
==
2
Path
(
test_file1
).
unlink
()
rwm_main
([
"
rcc
"
,
"
sync
"
,
"
.
"
,
f
"
rwmbe:/
{
test_bucket
}
/
"
],
rwm_conf
)
assert
s3
.
list_objects_v2
(
Bucket
=
rwm_conf
[
"
RCC_CRYPT_BUCKET
"
])[
"
KeyCount
"
]
==
1
def
test_rclone_crypt_argscheck
():
"""
test rclone crypt args checking
"""
assert
rwm_main
([
"
rcc
"
,
"
dummy
"
])
==
1
rwm
.
rclone_crypt_cmd
([
"
sync
"
,
"
.
"
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
s3
.
list_objects_v2
(
Bucket
=
rwm
.
config
[
"
RCC_CRYPT_BUCKET
"
])[
"
KeyCount
"
]
==
1
# def test_restic_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused-argument
# """test rclone with crypt overlay"""
#
# rwm_conf = {
# "S3_ENDPOINT_URL": motoserver,
# "S3_ACCESS_KEY": "dummy",
# "S3_SECRET_KEY": "dummy",
# "RES_BUCKET": "restic_test",
# "RES_PASSWORD": "dummydummydummydummydummydummydummydummy",
# }
# s3 = boto3.client('s3', endpoint_url=motoserver, aws_access_key_id="dummy", aws_secret_access_key="dummy")
#
# test_bucket = "testbucket"
# test_file = "testfile.txt"
# Path(test_file).write_text('1234', encoding='utf-8')
#
# rwm_main(["res", "init"], rwm_conf)
# assert len(objects_plain_list(s3.list_objects_v2(Bucket=rwm_conf["RES_BUCKET"]))) == 1
#
# rwm_main(["rcc", "delete", f"rwmbe:/{test_bucket}/{test_file}"], rwm_conf)
# assert s3.list_objects_v2(Bucket=rwm_conf["RCC_CRYPT_BUCKET"])["KeyCount"] == 0
#
# test_file1 = "testfile1.txt"
# Path(test_file1).write_text('4321', encoding='utf-8')
# rwm_main(["rcc", "sync", ".", f"rwmbe:/{test_bucket}/"], rwm_conf)
# assert s3.list_objects_v2(Bucket=rwm_conf["RCC_CRYPT_BUCKET"])["KeyCount"] == 2
#
# Path(test_file1).unlink()
# rwm_main(["rcc", "sync", ".", f"rwmbe:/{test_bucket}/"], rwm_conf)
# assert s3.list_objects_v2(Bucket=rwm_conf["RCC_CRYPT_BUCKET"])["KeyCount"] == 1
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment