Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
R
rwm
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Radoslav Bodó
rwm
Commits
8b5f52e6
Commit
8b5f52e6
authored
Apr 2, 2024
by
Radoslav Bodó
Browse files
Options
Downloads
Patches
Plain Diff
rwm, tests: cleanups and refactorings, also drops autoinit from "rwm backup"
parent
3e2e6876
No related branches found
No related tags found
No related merge requests found
Pipeline
#7440
failed
Apr 2, 2024
Stage: code_quality
Changes
5
Pipelines
1
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
README.md
+4
-0
4 additions, 0 deletions
README.md
rwm.py
+12
-38
12 additions, 38 deletions
rwm.py
tests/test_default.py
+21
-344
21 additions, 344 deletions
tests/test_default.py
tests/test_rwm.py
+287
-0
287 additions, 0 deletions
tests/test_rwm.py
tests/test_storage.py
+54
-50
54 additions, 50 deletions
tests/test_storage.py
with
378 additions
and
432 deletions
README.md
+
4
−
0
View file @
8b5f52e6
...
@@ -99,6 +99,8 @@ backups follows standard restic procedures, but adds profile like configuration
...
@@ -99,6 +99,8 @@ backups follows standard restic procedures, but adds profile like configuration
```
```
cp examples/rwm-backups.conf rwm.conf
cp examples/rwm-backups.conf rwm.conf
rwm restic init # should create bucket on it's own
rwm backup_all
rwm backup_all
rwm restic snapshots
rwm restic snapshots
rwm restic mount /mnt/restore
rwm restic mount /mnt/restore
...
@@ -116,7 +118,9 @@ rwm --confg admin.conf storage_check_policy bucket1
...
@@ -116,7 +118,9 @@ rwm --confg admin.conf storage_check_policy bucket1
rwm --confg admin.conf storage_list
rwm --confg admin.conf storage_list
cp examples/rwm-backups.conf rwm.conf
cp examples/rwm-backups.conf rwm.conf
rwm restic init
rwm storage_check_policy bucket1
rwm storage_check_policy bucket1
rwm backup_all
rwm backup_all
rwm restic snapshots
rwm restic snapshots
rwm restic mount /mnt/restore
rwm restic mount /mnt/restore
...
...
This diff is collapsed.
Click to expand it.
rwm.py
+
12
−
38
View file @
8b5f52e6
...
@@ -111,7 +111,7 @@ class StorageManager:
...
@@ -111,7 +111,7 @@ class StorageManager:
self
.
secret_key
=
secret_key
self
.
secret_key
=
secret_key
self
.
s3
=
boto3
.
resource
(
'
s3
'
,
endpoint_url
=
url
,
aws_access_key_id
=
self
.
access_key
,
aws_secret_access_key
=
self
.
secret_key
)
self
.
s3
=
boto3
.
resource
(
'
s3
'
,
endpoint_url
=
url
,
aws_access_key_id
=
self
.
access_key
,
aws_secret_access_key
=
self
.
secret_key
)
def
create_bucket
(
self
,
name
):
def
bucket_create
(
self
,
name
):
"""
aws s3 resource api stub
"""
"""
aws s3 resource api stub
"""
# boto3 client and resource api are not completely aligned
# boto3 client and resource api are not completely aligned
# s3.Bucket("xyz").create() returns dict instead of s3.Bucket object
# s3.Bucket("xyz").create() returns dict instead of s3.Bucket object
...
@@ -148,7 +148,7 @@ class StorageManager:
...
@@ -148,7 +148,7 @@ class StorageManager:
if
(
not
bucket_name
)
or
(
not
target_username
):
if
(
not
bucket_name
)
or
(
not
target_username
):
raise
ValueError
(
"
must specify value for bucket and user
"
)
raise
ValueError
(
"
must specify value for bucket and user
"
)
bucket
=
self
.
create_bucket
(
bucket_name
)
bucket
=
self
.
bucket_create
(
bucket_name
)
tenant
,
manager_username
=
bucket
.
Acl
().
owner
[
"
ID
"
].
split
(
"
$
"
)
tenant
,
manager_username
=
bucket
.
Acl
().
owner
[
"
ID
"
].
split
(
"
$
"
)
# grants basic RW access to user in same tenant
# grants basic RW access to user in same tenant
...
@@ -284,15 +284,7 @@ class RWM:
...
@@ -284,15 +284,7 @@ class RWM:
}
}
return
run_command
([
"
restic
"
]
+
args
,
env
=
env
)
return
run_command
([
"
restic
"
]
+
args
,
env
=
env
)
def
restic_autoinit
(
self
)
->
subprocess
.
CompletedProcess
:
def
_restic_backup
(
self
,
name
)
->
subprocess
.
CompletedProcess
:
"""
runs restic init
"""
logger
.
info
(
"
run restic_autoinit
"
)
if
(
proc
:
=
self
.
restic_cmd
([
"
cat
"
,
"
config
"
])).
returncode
!=
0
:
proc
=
self
.
restic_cmd
([
"
init
"
])
return
proc
def
restic_backup
(
self
,
name
)
->
subprocess
.
CompletedProcess
:
"""
runs restic backup by name
"""
"""
runs restic backup by name
"""
logger
.
info
(
f
"
run restic_backup
{
name
}
"
)
logger
.
info
(
f
"
run restic_backup
{
name
}
"
)
...
@@ -305,7 +297,7 @@ class RWM:
...
@@ -305,7 +297,7 @@ class RWM:
return
self
.
restic_cmd
(
cmd_args
)
return
self
.
restic_cmd
(
cmd_args
)
def
restic_forget_prune
(
self
)
->
subprocess
.
CompletedProcess
:
def
_
restic_forget_prune
(
self
)
->
subprocess
.
CompletedProcess
:
"""
runs forget prune
"""
"""
runs forget prune
"""
logger
.
info
(
"
run restic_forget_prune
"
)
logger
.
info
(
"
run restic_forget_prune
"
)
...
@@ -322,20 +314,14 @@ class RWM:
...
@@ -322,20 +314,14 @@ class RWM:
# TODO: check target backup policy, restic automatically creates
# TODO: check target backup policy, restic automatically creates
# bucket if ot does not exist with null-policy
# bucket if ot does not exist with null-policy
autoinit_proc
=
self
.
restic_autoinit
()
wrap_output
(
backup_proc
:
=
self
.
_restic_backup
(
name
))
if
autoinit_proc
.
returncode
!=
0
:
logger
.
error
(
"
restic autoinit failed
"
)
wrap_output
(
autoinit_proc
)
return
autoinit_proc
wrap_output
(
backup_proc
:
=
self
.
restic_backup
(
name
))
if
backup_proc
.
returncode
!=
0
:
if
backup_proc
.
returncode
!=
0
:
logger
.
error
(
"
restic_backup failed
"
)
logger
.
error
(
"
rwm _
restic_backup failed
"
)
return
backup_proc
return
backup_proc
wrap_output
(
forget_proc
:
=
self
.
restic_forget_prune
())
wrap_output
(
forget_proc
:
=
self
.
_
restic_forget_prune
())
if
forget_proc
.
returncode
!=
0
:
if
forget_proc
.
returncode
!=
0
:
logger
.
error
(
"
restic_forget_prune failed
"
)
logger
.
error
(
"
rwm _
restic_forget_prune failed
"
)
return
forget_proc
return
forget_proc
return
backup_proc
return
backup_proc
...
@@ -346,25 +332,16 @@ class RWM:
...
@@ -346,25 +332,16 @@ class RWM:
stats
=
{}
stats
=
{}
ret
=
0
ret
=
0
time_start
=
datetime
.
now
()
autoinit_proc
=
self
.
restic_autoinit
()
time_end
=
datetime
.
now
()
if
autoinit_proc
.
returncode
!=
0
:
logger
.
error
(
"
restic autoinit failed
"
)
wrap_output
(
autoinit_proc
)
return
autoinit_proc
.
returncode
stats
[
"
_autoinit
"
]
=
BackupResult
(
"
_autoinit
"
,
autoinit_proc
.
returncode
,
time_start
,
time_end
)
for
name
in
self
.
config
[
"
rwm_backups
"
].
keys
():
for
name
in
self
.
config
[
"
rwm_backups
"
].
keys
():
time_start
=
datetime
.
now
()
time_start
=
datetime
.
now
()
wrap_output
(
backup_proc
:
=
self
.
restic_backup
(
name
))
wrap_output
(
backup_proc
:
=
self
.
_
restic_backup
(
name
))
time_end
=
datetime
.
now
()
time_end
=
datetime
.
now
()
ret
|=
backup_proc
.
returncode
ret
|=
backup_proc
.
returncode
stats
[
name
]
=
BackupResult
(
name
,
backup_proc
.
returncode
,
time_start
,
time_end
)
stats
[
name
]
=
BackupResult
(
name
,
backup_proc
.
returncode
,
time_start
,
time_end
)
if
ret
==
0
:
if
ret
==
0
:
time_start
=
datetime
.
now
()
time_start
=
datetime
.
now
()
wrap_output
(
forget_proc
:
=
self
.
restic_forget_prune
())
wrap_output
(
forget_proc
:
=
self
.
_
restic_forget_prune
())
time_end
=
datetime
.
now
()
time_end
=
datetime
.
now
()
ret
|=
forget_proc
.
returncode
ret
|=
forget_proc
.
returncode
stats
[
"
_forget_prune
"
]
=
BackupResult
(
"
_forget_prune
"
,
forget_proc
.
returncode
,
time_start
,
time_end
)
stats
[
"
_forget_prune
"
]
=
BackupResult
(
"
_forget_prune
"
,
forget_proc
.
returncode
,
time_start
,
time_end
)
...
@@ -400,11 +377,7 @@ class RWM:
...
@@ -400,11 +377,7 @@ class RWM:
def
storage_check_policy_cmd
(
self
,
bucket_name
)
->
int
:
def
storage_check_policy_cmd
(
self
,
bucket_name
)
->
int
:
"""
storage check policy command
"""
"""
storage check policy command
"""
ret
,
msg
=
(
ret
,
msg
=
(
0
,
"
OK
"
)
if
self
.
storage_manager
.
storage_check_policy
(
bucket_name
)
else
(
1
,
"
FAILED
"
)
(
0
,
"
OK
"
)
if
self
.
storage_manager
.
storage_check_policy
(
bucket_name
)
==
True
else
(
1
,
"
FAILED
"
)
)
logger
.
debug
(
"
bucket policy: %s
"
,
json
.
dumps
(
self
.
storage_manager
.
bucket_policy
(
bucket_name
),
indent
=
4
))
logger
.
debug
(
"
bucket policy: %s
"
,
json
.
dumps
(
self
.
storage_manager
.
bucket_policy
(
bucket_name
),
indent
=
4
))
print
(
msg
)
print
(
msg
)
return
ret
return
ret
...
@@ -498,6 +471,7 @@ def main(argv=None):
...
@@ -498,6 +471,7 @@ def main(argv=None):
if
args
.
command
==
"
backup_all
"
:
if
args
.
command
==
"
backup_all
"
:
ret
=
rwmi
.
backup_all_cmd
()
ret
=
rwmi
.
backup_all_cmd
()
logger
.
info
(
"
rwm backup_all finished with %s (ret %d)
"
,
"
success
"
if
ret
==
0
else
"
errors
"
,
ret
)
logger
.
info
(
"
rwm backup_all finished with %s (ret %d)
"
,
"
success
"
if
ret
==
0
else
"
errors
"
,
ret
)
if
args
.
command
==
"
storage_create
"
:
if
args
.
command
==
"
storage_create
"
:
ret
=
rwmi
.
storage_create_cmd
(
args
.
bucket_name
,
args
.
target_username
)
ret
=
rwmi
.
storage_create_cmd
(
args
.
bucket_name
,
args
.
target_username
)
if
args
.
command
==
"
storage_delete
"
:
if
args
.
command
==
"
storage_delete
"
:
...
...
This diff is collapsed.
Click to expand it.
tests/test_default.py
+
21
−
344
View file @
8b5f52e6
"""
default tests
"""
"""
default tests
"""
import
json
from
pathlib
import
Path
from
pathlib
import
Path
from
subprocess
import
CompletedProcess
from
subprocess
import
CompletedProcess
from
unittest.mock
import
Mock
,
patch
from
unittest.mock
import
Mock
,
patch
import
boto3
import
rwm
import
rwm
from
rwm
import
is_sublist
,
main
as
rwm_main
,
rclone_obscure_password
,
RWM
,
wrap_output
from
rwm
import
is_sublist
,
main
as
rwm_main
,
wrap_output
def
buckets_plain_list
(
full_response
):
"""
boto3 helper
"""
return
[
x
[
"
Name
"
]
for
x
in
full_response
[
"
Buckets
"
]]
def
objects_plain_list
(
full_response
):
"""
boto3 helper
"""
return
[
x
[
"
Key
"
]
for
x
in
full_response
[
"
Contents
"
]]
def
test_sublist
():
def
test_sublist
():
...
@@ -45,335 +31,26 @@ def test_main(tmpworkdir: str): # pylint: disable=unused-argument
...
@@ -45,335 +31,26 @@ def test_main(tmpworkdir: str): # pylint: disable=unused-argument
assert
rwm_main
([
"
version
"
])
==
0
assert
rwm_main
([
"
version
"
])
==
0
# command branches
# command branches
mock
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
0
))
mock_proc
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
0
))
for
item
in
[
"
aws
"
,
"
rclone
"
,
"
rclone_crypt
"
,
"
restic
"
,
"
backup
"
]:
mock_ok
=
Mock
(
return_value
=
0
)
with
patch
.
object
(
rwm
.
RWM
,
f
"
{
item
}
_cmd
"
,
mock
):
assert
rwm_main
([
item
,
"
dummy
"
])
==
0
with
patch
.
object
(
rwm
.
RWM
,
f
"
aws_cmd
"
,
mock_proc
):
assert
rwm_main
([
"
aws
"
,
"
dummy
"
])
==
0
mock
=
Mock
(
return_value
=
0
)
with
patch
.
object
(
rwm
.
RWM
,
f
"
rclone_cmd
"
,
mock_proc
):
assert
rwm_main
([
"
rclone
"
,
"
dummy
"
])
==
0
with
patch
.
object
(
rwm
.
RWM
,
"
backup_all_cmd
"
,
mock
):
with
patch
.
object
(
rwm
.
RWM
,
f
"
rclone_crypt_cmd
"
,
mock_proc
):
assert
rwm_main
([
"
rclone_crypt
"
,
"
dummy
"
])
==
0
with
patch
.
object
(
rwm
.
RWM
,
f
"
restic_cmd
"
,
mock_proc
):
assert
rwm_main
([
"
restic
"
,
"
dummy
"
])
==
0
with
patch
.
object
(
rwm
.
RWM
,
f
"
backup_cmd
"
,
mock_proc
):
assert
rwm_main
([
"
backup
"
,
"
dummy
"
])
==
0
with
patch
.
object
(
rwm
.
RWM
,
"
backup_all_cmd
"
,
mock_ok
):
assert
rwm_main
([
"
backup_all
"
])
==
0
assert
rwm_main
([
"
backup_all
"
])
==
0
with
patch
.
object
(
rwm
.
RWM
,
"
storage_create_cmd
"
,
mock
):
with
patch
.
object
(
rwm
.
RWM
,
"
storage_create_cmd
"
,
mock
_ok
):
assert
rwm_main
([
"
storage_create
"
,
"
bucket
"
,
"
user
"
])
==
0
assert
rwm_main
([
"
storage_create
"
,
"
bucket
"
,
"
user
"
])
==
0
with
patch
.
object
(
rwm
.
RWM
,
"
storage_delete_cmd
"
,
mock_ok
):
for
item
in
[
"
storage_delete
"
,
"
storage_check_policy
"
]:
assert
rwm_main
([
"
storage_delete
"
,
"
bucket
"
])
==
0
with
patch
.
object
(
rwm
.
RWM
,
f
"
{
item
}
_cmd
"
,
mock
):
with
patch
.
object
(
rwm
.
RWM
,
"
storage_check_policy_cmd
"
,
mock_ok
):
assert
rwm_main
([
item
,
"
bucket
"
])
==
0
assert
rwm_main
([
"
storage_check_policy
"
,
"
bucket
"
])
==
0
def
test_aws_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test aws command
"""
trwm
=
RWM
({
"
rwm_s3_endpoint_url
"
:
motoserver
,
"
rwm_s3_access_key
"
:
"
dummy
"
,
"
rwm_s3_secret_key
"
:
"
dummy
"
,
})
s3
=
boto3
.
client
(
'
s3
'
,
endpoint_url
=
motoserver
,
aws_access_key_id
=
"
dummy
"
,
aws_secret_access_key
=
"
dummy
"
)
test_bucket
=
"
testbucket
"
assert
test_bucket
not
in
buckets_plain_list
(
s3
.
list_buckets
())
trwm
.
aws_cmd
([
"
s3
"
,
"
mb
"
,
f
"
s3://
{
test_bucket
}
"
])
assert
test_bucket
in
buckets_plain_list
(
s3
.
list_buckets
())
trwm
.
aws_cmd
([
"
s3
"
,
"
rb
"
,
f
"
s3://
{
test_bucket
}
"
])
assert
test_bucket
not
in
buckets_plain_list
(
s3
.
list_buckets
())
def
test_rclone_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test rclone command
"""
trwm
=
RWM
({
"
rwm_s3_endpoint_url
"
:
motoserver
,
"
rwm_s3_access_key
"
:
"
dummy
"
,
"
rwm_s3_secret_key
"
:
"
dummy
"
,
})
s3
=
boto3
.
client
(
'
s3
'
,
endpoint_url
=
motoserver
,
aws_access_key_id
=
"
dummy
"
,
aws_secret_access_key
=
"
dummy
"
)
test_bucket
=
"
testbucket
"
test_file
=
"
testfile.txt
"
Path
(
test_file
).
write_text
(
'
1234
'
,
encoding
=
'
utf-8
'
)
trwm
.
rclone_cmd
([
"
mkdir
"
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
trwm
.
rclone_cmd
([
"
copy
"
,
test_file
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
test_bucket
in
buckets_plain_list
(
s3
.
list_buckets
())
assert
test_file
in
objects_plain_list
(
s3
.
list_objects_v2
(
Bucket
=
test_bucket
))
def
test_rclone_crypt_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test rclone with crypt overlay
"""
trwm
=
RWM
({
"
rwm_s3_endpoint_url
"
:
motoserver
,
"
rwm_s3_access_key
"
:
"
dummy
"
,
"
rwm_s3_secret_key
"
:
"
dummy
"
,
"
rwm_rclone_crypt_bucket
"
:
"
cryptdata_test
"
,
"
rwm_rclone_crypt_password
"
:
rclone_obscure_password
(
"
dummydummydummydummy
"
),
})
s3
=
boto3
.
client
(
'
s3
'
,
endpoint_url
=
motoserver
,
aws_access_key_id
=
"
dummy
"
,
aws_secret_access_key
=
"
dummy
"
)
test_bucket
=
"
testbucket
"
test_file
=
"
testfile.txt
"
Path
(
test_file
).
write_text
(
'
1234
'
,
encoding
=
'
utf-8
'
)
trwm
.
rclone_crypt_cmd
([
"
copy
"
,
test_file
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
len
(
objects_plain_list
(
s3
.
list_objects_v2
(
Bucket
=
trwm
.
config
[
"
rwm_rclone_crypt_bucket
"
])))
==
1
trwm
.
rclone_crypt_cmd
([
"
delete
"
,
f
"
rwmbe:/
{
test_bucket
}
/
{
test_file
}
"
])
assert
s3
.
list_objects_v2
(
Bucket
=
trwm
.
config
[
"
rwm_rclone_crypt_bucket
"
])[
"
KeyCount
"
]
==
0
test_file1
=
"
testfile1.txt
"
Path
(
test_file1
).
write_text
(
'
4321
'
,
encoding
=
'
utf-8
'
)
trwm
.
rclone_crypt_cmd
([
"
sync
"
,
"
.
"
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
s3
.
list_objects_v2
(
Bucket
=
trwm
.
config
[
"
rwm_rclone_crypt_bucket
"
])[
"
KeyCount
"
]
==
2
Path
(
test_file1
).
unlink
()
trwm
.
rclone_crypt_cmd
([
"
sync
"
,
"
.
"
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
s3
.
list_objects_v2
(
Bucket
=
trwm
.
config
[
"
rwm_rclone_crypt_bucket
"
])[
"
KeyCount
"
]
==
1
def
test_restic_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test restic command
"""
trwm
=
RWM
({
"
rwm_s3_endpoint_url
"
:
motoserver
,
"
rwm_s3_access_key
"
:
"
dummy
"
,
"
rwm_s3_secret_key
"
:
"
dummy
"
,
"
rwm_restic_bucket
"
:
"
restictest
"
,
"
rwm_restic_password
"
:
"
dummydummydummydummy
"
,
})
assert
trwm
.
restic_cmd
([
"
init
"
]).
returncode
==
0
proc
=
trwm
.
restic_cmd
([
"
cat
"
,
"
config
"
])
assert
"
id
"
in
json
.
loads
(
proc
.
stdout
)
def
_list_snapshots
(
trwm
):
"""
test helper
"""
return
json
.
loads
(
trwm
.
restic_cmd
([
"
snapshots
"
,
"
--json
"
]).
stdout
)
def
_list_files
(
trwm
,
snapshot_id
):
"""
test helper
"""
snapshot_ls
=
[
json
.
loads
(
x
)
for
x
in
trwm
.
restic_cmd
([
"
ls
"
,
snapshot_id
,
"
--json
"
]).
stdout
.
splitlines
()
]
return
[
x
[
"
path
"
]
for
x
in
snapshot_ls
if
(
x
[
"
struct_type
"
]
==
"
node
"
and
x
[
"
type
"
]
==
"
file
"
)
]
def
test_backup_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test backup_cmd command
"""
trwm
=
RWM
({
"
rwm_s3_endpoint_url
"
:
motoserver
,
"
rwm_s3_access_key
"
:
"
dummy
"
,
"
rwm_s3_secret_key
"
:
"
dummy
"
,
"
rwm_restic_bucket
"
:
"
restictest
"
,
"
rwm_restic_password
"
:
"
dummydummydummydummy
"
,
"
rwm_backups
"
:
{
"
testcfg
"
:
{
"
filesdirs
"
:
[
"
testdatadir/
"
],
"
excludes
"
:
[
"
testfile_to_be_ignored
"
],
"
extras
"
:
[
"
--tag
"
,
"
dummytag
"
],
}
},
"
rwm_retention
"
:
{
"
keep-daily
"
:
"
1
"
}
})
Path
(
"
testdatadir
"
).
mkdir
()
Path
(
"
testdatadir/testdata1.txt
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/testfile_to_be_ignored
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
assert
trwm
.
backup_cmd
(
"
testcfg
"
).
returncode
==
0
snapshots
=
_list_snapshots
(
trwm
)
assert
len
(
snapshots
)
==
1
snapshot_files
=
_list_files
(
trwm
,
snapshots
[
0
][
"
id
"
])
assert
"
/testdatadir/testdata1.txt
"
in
snapshot_files
def
test_backup_cmd_excludes
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test backup command
"""
trwm
=
RWM
({
"
rwm_s3_endpoint_url
"
:
motoserver
,
"
rwm_s3_access_key
"
:
"
dummy
"
,
"
rwm_s3_secret_key
"
:
"
dummy
"
,
"
rwm_restic_bucket
"
:
"
restictest
"
,
"
rwm_restic_password
"
:
"
dummydummydummydummy
"
,
"
rwm_backups
"
:
{
"
testcfg
"
:
{
"
filesdirs
"
:
[
"
testdatadir
"
],
"
excludes
"
:
[
"
proc/*
"
,
"
*.ignored
"
],
"
extras
"
:
[
"
--tag
"
,
"
dummytag
"
],
}
}
})
Path
(
"
testdatadir
"
).
mkdir
()
Path
(
"
testdatadir/etc
"
).
mkdir
()
Path
(
"
testdatadir/etc/config
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/etc/config2
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/etc/config3.ignored
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/etc/proc
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/etc/processor
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/proc
"
).
mkdir
()
Path
(
"
testdatadir/proc/to_be_also_excluded
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/processor
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/some_other_proc_essor
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/var
"
).
mkdir
()
Path
(
"
testdatadir/var/proc
"
).
mkdir
()
Path
(
"
testdatadir/var/proc/data
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
assert
trwm
.
backup_cmd
(
"
testcfg
"
).
returncode
==
0
snapshots
=
_list_snapshots
(
trwm
)
assert
len
(
snapshots
)
==
1
snapshot_files
=
_list_files
(
trwm
,
snapshots
[
0
][
"
id
"
])
assert
"
/testdatadir/etc/config
"
in
snapshot_files
assert
"
/testdatadir/etc/config2
"
in
snapshot_files
assert
"
/testdatadir/etc/config3.ignored
"
not
in
snapshot_files
assert
"
/testdatadir/etc/proc
"
in
snapshot_files
assert
"
/testdatadir/etc/processor
"
in
snapshot_files
assert
"
/testdatadir/proc
"
not
in
snapshot_files
assert
"
/testdatadir/proc/to_be_also_excluded
"
not
in
snapshot_files
assert
"
/testdatadir/processor
"
in
snapshot_files
assert
"
/testdatadir/some_other_proc_essor
"
in
snapshot_files
# following expected result does not work, because test config uses root-unanchored exclude path "proc/*"
# assert "/testdatadir/var/proc/data" in snapshot_files
def
test_backup_cmd_error_handling
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test backup command err cases
"""
rwm_conf
=
{
"
rwm_backups
"
:
{
"
dummycfg
"
:
{
"
filesdirs
"
:
[
"
dummydir
"
]}
}
}
mock_ok
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
0
))
mock_fail
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
11
))
with
patch
.
object
(
rwm
.
RWM
,
"
restic_autoinit
"
,
mock_fail
):
assert
RWM
(
rwm_conf
).
backup_cmd
(
"
dummycfg
"
).
returncode
==
11
with
(
patch
.
object
(
rwm
.
RWM
,
"
restic_autoinit
"
,
mock_ok
),
patch
.
object
(
rwm
.
RWM
,
"
restic_backup
"
,
mock_fail
)
):
assert
RWM
(
rwm_conf
).
backup_cmd
(
"
dummycfg
"
).
returncode
==
11
with
(
patch
.
object
(
rwm
.
RWM
,
"
restic_autoinit
"
,
mock_ok
),
patch
.
object
(
rwm
.
RWM
,
"
restic_backup
"
,
mock_ok
),
patch
.
object
(
rwm
.
RWM
,
"
restic_forget_prune
"
,
mock_fail
)
):
assert
RWM
(
rwm_conf
).
backup_cmd
(
"
dummycfg
"
).
returncode
==
11
def
test_backup_all_cmd
(
tmpworkdir
:
str
):
# pylint: disable=unused-argument
"""
test backup command err cases
"""
rwm_conf
=
{
"
rwm_backups
"
:
{
"
dummycfg
"
:
{
"
filesdirs
"
:
[
"
dummydir
"
]}
}
}
mock
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
0
))
with
(
patch
.
object
(
rwm
.
RWM
,
"
restic_autoinit
"
,
mock
),
patch
.
object
(
rwm
.
RWM
,
"
restic_backup
"
,
mock
),
patch
.
object
(
rwm
.
RWM
,
"
restic_forget_prune
"
,
mock
)
):
assert
RWM
(
rwm_conf
).
backup_all_cmd
()
==
0
def
test_backup_all_cmd_error_handling
(
tmpworkdir
:
str
):
# pylint: disable=unused-argument
"""
test backup command err cases
"""
rwm_conf
=
{
"
rwm_backups
"
:
{
"
dummycfg
"
:
{
"
filesdirs
"
:
[
"
dummydir
"
]}
}
}
mock_fail
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
11
))
with
patch
.
object
(
rwm
.
RWM
,
"
restic_autoinit
"
,
mock_fail
):
assert
RWM
(
rwm_conf
).
backup_all_cmd
()
==
11
def
test_storage_create_cmd
(
tmpworkdir
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
):
# pylint: disable=unused-argument
"""
test_storage_create_cmd
"""
trwm
=
rwm
.
RWM
({
"
rwm_s3_endpoint_url
"
:
radosuser_admin
.
url
,
"
rwm_s3_access_key
"
:
radosuser_admin
.
access_key
,
"
rwm_s3_secret_key
"
:
radosuser_admin
.
secret_key
,
})
bucket_name
=
"
testbuck
"
assert
trwm
.
storage_create_cmd
(
bucket_name
,
"
testnx
"
)
==
0
assert
trwm
.
storage_create_cmd
(
"
!invalid
"
,
"
testnx
"
)
==
1
assert
trwm
.
storage_create_cmd
(
""
,
"
testnx
"
)
==
1
def
test_storage_delete_cmd
(
tmpworkdir
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
):
# pylint: disable=unused-argument
"""
test_storage_create_cmd
"""
trwm
=
rwm
.
RWM
({
"
rwm_s3_endpoint_url
"
:
radosuser_admin
.
url
,
"
rwm_s3_access_key
"
:
radosuser_admin
.
access_key
,
"
rwm_s3_secret_key
"
:
radosuser_admin
.
secret_key
,
"
rwm_restic_bucket
"
:
"
testbuck
"
,
"
rwm_restic_password
"
:
"
dummydummydummydummy
"
,
"
rwm_backups
"
:
{
"
testcfg
"
:
{
"
filesdirs
"
:
[
"
testdatadir/
"
]}
}
})
bucket_name
=
trwm
.
config
[
"
rwm_restic_bucket
"
]
Path
(
"
testdatadir
"
).
mkdir
()
Path
(
"
testdatadir/testdata1.txt
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
bucket
=
trwm
.
storage_manager
.
storage_create
(
bucket_name
,
"
admin
"
)
assert
trwm
.
storage_manager
.
bucket_exist
(
bucket_name
)
assert
len
(
trwm
.
storage_manager
.
list_objects
(
bucket_name
))
==
0
assert
trwm
.
backup_cmd
(
"
testcfg
"
).
returncode
==
0
assert
len
(
trwm
.
storage_manager
.
list_objects
(
bucket_name
))
!=
0
object_versions
=
radosuser_admin
.
s3
.
meta
.
client
.
list_object_versions
(
Bucket
=
bucket
.
name
)
assert
len
(
object_versions
[
"
Versions
"
])
>
0
assert
len
(
object_versions
[
"
DeleteMarkers
"
])
>
0
assert
trwm
.
storage_delete_cmd
(
bucket_name
)
==
0
assert
not
trwm
.
storage_manager
.
bucket_exist
(
bucket_name
)
assert
trwm
.
storage_delete_cmd
(
bucket_name
)
==
1
def
test_storage_check_policy_cmd
(
tmpworkdir
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
):
# pylint: disable=unused-argument
"""
test storage check policy command
"""
trwm
=
rwm
.
RWM
({
"
rwm_s3_endpoint_url
"
:
radosuser_admin
.
url
,
"
rwm_s3_access_key
"
:
radosuser_admin
.
access_key
,
"
rwm_s3_secret_key
"
:
radosuser_admin
.
secret_key
,
})
mock
=
Mock
(
return_value
=
False
)
with
patch
.
object
(
rwm
.
StorageManager
,
"
storage_check_policy
"
,
mock
):
assert
trwm
.
storage_check_policy_cmd
(
"
dummy
"
)
==
1
This diff is collapsed.
Click to expand it.
tests/test_rwm.py
0 → 100644
+
287
−
0
View file @
8b5f52e6
"""
rwm tests
"""
import
json
from
pathlib
import
Path
from
subprocess
import
CompletedProcess
from
unittest.mock
import
Mock
,
patch
import
rwm
def
test_aws_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test aws command
"""
trwm
=
rwm
.
RWM
({
"
rwm_s3_endpoint_url
"
:
motoserver
,
"
rwm_s3_access_key
"
:
"
dummy
"
,
"
rwm_s3_secret_key
"
:
"
dummy
"
,
})
test_bucket
=
"
testbucket
"
assert
not
trwm
.
storage_manager
.
bucket_exist
(
test_bucket
)
trwm
.
aws_cmd
([
"
s3
"
,
"
mb
"
,
f
"
s3://
{
test_bucket
}
"
])
assert
trwm
.
storage_manager
.
bucket_exist
(
test_bucket
)
trwm
.
aws_cmd
([
"
s3
"
,
"
rb
"
,
f
"
s3://
{
test_bucket
}
"
])
assert
not
trwm
.
storage_manager
.
bucket_exist
(
test_bucket
)
def
test_rclone_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test rclone command
"""
trwm
=
rwm
.
RWM
({
"
rwm_s3_endpoint_url
"
:
motoserver
,
"
rwm_s3_access_key
"
:
"
dummy
"
,
"
rwm_s3_secret_key
"
:
"
dummy
"
,
})
test_bucket
=
"
testbucket
"
test_file
=
"
testfile.txt
"
Path
(
test_file
).
write_text
(
'
1234
'
,
encoding
=
'
utf-8
'
)
trwm
.
rclone_cmd
([
"
mkdir
"
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
trwm
.
storage_manager
.
bucket_exist
(
test_bucket
)
trwm
.
rclone_cmd
([
"
copy
"
,
test_file
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
len
(
trwm
.
storage_manager
.
list_objects
(
test_bucket
))
==
1
def
test_rclone_crypt_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test rclone with crypt overlay
"""
trwm
=
rwm
.
RWM
({
"
rwm_s3_endpoint_url
"
:
motoserver
,
"
rwm_s3_access_key
"
:
"
dummy
"
,
"
rwm_s3_secret_key
"
:
"
dummy
"
,
"
rwm_rclone_crypt_bucket
"
:
"
cryptdata_test
"
,
"
rwm_rclone_crypt_password
"
:
rwm
.
rclone_obscure_password
(
"
dummydummydummydummy
"
),
})
test_bucket
=
"
testbucket
"
test_file
=
"
testfile.txt
"
Path
(
test_file
).
write_text
(
'
1234
'
,
encoding
=
'
utf-8
'
)
trwm
.
rclone_crypt_cmd
([
"
copy
"
,
test_file
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
len
(
trwm
.
storage_manager
.
list_objects
(
trwm
.
config
[
"
rwm_rclone_crypt_bucket
"
]))
==
1
trwm
.
rclone_crypt_cmd
([
"
delete
"
,
f
"
rwmbe:/
{
test_bucket
}
/
{
test_file
}
"
])
assert
len
(
trwm
.
storage_manager
.
list_objects
(
trwm
.
config
[
"
rwm_rclone_crypt_bucket
"
]))
==
0
test_file1
=
"
testfile1.txt
"
Path
(
test_file1
).
write_text
(
'
4321
'
,
encoding
=
'
utf-8
'
)
trwm
.
rclone_crypt_cmd
([
"
sync
"
,
"
.
"
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
len
(
trwm
.
storage_manager
.
list_objects
(
trwm
.
config
[
"
rwm_rclone_crypt_bucket
"
]))
==
2
Path
(
test_file1
).
unlink
()
trwm
.
rclone_crypt_cmd
([
"
sync
"
,
"
.
"
,
f
"
rwmbe:/
{
test_bucket
}
/
"
])
assert
len
(
trwm
.
storage_manager
.
list_objects
(
trwm
.
config
[
"
rwm_rclone_crypt_bucket
"
]))
==
1
def
test_restic_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test restic command
"""
trwm
=
rwm
.
RWM
({
"
rwm_s3_endpoint_url
"
:
motoserver
,
"
rwm_s3_access_key
"
:
"
dummy
"
,
"
rwm_s3_secret_key
"
:
"
dummy
"
,
"
rwm_restic_bucket
"
:
"
restictest
"
,
"
rwm_restic_password
"
:
"
dummydummydummydummy
"
,
})
assert
trwm
.
restic_cmd
([
"
init
"
]).
returncode
==
0
proc
=
trwm
.
restic_cmd
([
"
cat
"
,
"
config
"
])
assert
"
id
"
in
json
.
loads
(
proc
.
stdout
)
def
_restic_list_snapshots
(
trwm
):
"""
test helper
"""
return
json
.
loads
(
trwm
.
restic_cmd
([
"
snapshots
"
,
"
--json
"
]).
stdout
)
def
_restic_list_snapshot_files
(
trwm
,
snapshot_id
):
"""
test helper
"""
snapshot_ls
=
[
json
.
loads
(
x
)
for
x
in
trwm
.
restic_cmd
([
"
ls
"
,
snapshot_id
,
"
--json
"
]).
stdout
.
splitlines
()]
return
[
x
[
"
path
"
]
for
x
in
snapshot_ls
if
(
x
[
"
struct_type
"
]
==
"
node
"
)
and
(
x
[
"
type
"
]
==
"
file
"
)]
def
test_backup_cmd
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test backup_cmd command
"""
trwm
=
rwm
.
RWM
({
"
rwm_s3_endpoint_url
"
:
motoserver
,
"
rwm_s3_access_key
"
:
"
dummy
"
,
"
rwm_s3_secret_key
"
:
"
dummy
"
,
"
rwm_restic_bucket
"
:
"
restictest
"
,
"
rwm_restic_password
"
:
"
dummydummydummydummy
"
,
"
rwm_backups
"
:
{
"
testcfg
"
:
{
"
filesdirs
"
:
[
"
testdatadir/
"
],
"
excludes
"
:
[
"
testfile_to_be_ignored
"
],
"
extras
"
:
[
"
--tag
"
,
"
dummytag
"
],
}
},
"
rwm_retention
"
:
{
"
keep-daily
"
:
"
1
"
}
})
Path
(
"
testdatadir
"
).
mkdir
()
Path
(
"
testdatadir/testdata1.txt
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/testfile_to_be_ignored
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
assert
trwm
.
restic_cmd
([
"
init
"
]).
returncode
==
0
assert
trwm
.
backup_cmd
(
"
testcfg
"
).
returncode
==
0
snapshots
=
_restic_list_snapshots
(
trwm
)
assert
len
(
snapshots
)
==
1
snapshot_files
=
_restic_list_snapshot_files
(
trwm
,
snapshots
[
0
][
"
id
"
])
assert
"
/testdatadir/testdata1.txt
"
in
snapshot_files
def
test_backup_cmd_excludes
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test backup command
"""
import
os
trwm
=
rwm
.
RWM
({
"
rwm_s3_endpoint_url
"
:
motoserver
,
"
rwm_s3_access_key
"
:
"
dummy
"
,
"
rwm_s3_secret_key
"
:
"
dummy
"
,
"
rwm_restic_bucket
"
:
"
restictest
"
,
"
rwm_restic_password
"
:
"
dummydummydummydummy
"
,
"
rwm_backups
"
:
{
"
testcfg
"
:
{
"
filesdirs
"
:
[
"
testdatadir/
"
],
"
excludes
"
:
[
"
testdatadir/proc/*
"
,
"
*.ignored
"
],
"
extras
"
:
[
"
--tag
"
,
"
dummytag
"
],
}
}
})
Path
(
"
testdatadir
"
).
mkdir
()
Path
(
"
testdatadir/etc
"
).
mkdir
()
Path
(
"
testdatadir/etc/config
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/etc/config3.ignored
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/etc/proc
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/proc
"
).
mkdir
()
Path
(
"
testdatadir/proc/to_be_also_excluded
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
Path
(
"
testdatadir/var
"
).
mkdir
()
Path
(
"
testdatadir/var/proc
"
).
mkdir
()
Path
(
"
testdatadir/var/proc/data
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
assert
trwm
.
restic_cmd
([
"
init
"
]).
returncode
==
0
assert
trwm
.
backup_cmd
(
"
testcfg
"
).
returncode
==
0
snapshots
=
_restic_list_snapshots
(
trwm
)
assert
len
(
snapshots
)
==
1
snapshot_files
=
_restic_list_snapshot_files
(
trwm
,
snapshots
[
0
][
"
id
"
])
assert
"
/testdatadir/etc/config
"
in
snapshot_files
assert
"
/testdatadir/etc/config3.ignored
"
not
in
snapshot_files
assert
"
/testdatadir/etc/proc
"
in
snapshot_files
assert
"
/testdatadir/proc
"
not
in
snapshot_files
assert
"
/testdatadir/proc/to_be_also_excluded
"
not
in
snapshot_files
assert
"
/testdatadir/var/proc/data
"
in
snapshot_files
def
test_backup_cmd_error_handling
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
"""
test backup command err cases
"""
rwm_conf
=
{
"
rwm_backups
"
:
{
"
dummycfg
"
:
{
"
filesdirs
"
:
[
"
dummydir
"
]}
}
}
mock_ok
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
0
))
mock_fail
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
11
))
with
(
patch
.
object
(
rwm
.
RWM
,
"
_restic_backup
"
,
mock_fail
)
):
assert
rwm
.
RWM
(
rwm_conf
).
backup_cmd
(
"
dummycfg
"
).
returncode
==
11
with
(
patch
.
object
(
rwm
.
RWM
,
"
_restic_backup
"
,
mock_ok
),
patch
.
object
(
rwm
.
RWM
,
"
_restic_forget_prune
"
,
mock_fail
)
):
assert
rwm
.
RWM
(
rwm_conf
).
backup_cmd
(
"
dummycfg
"
).
returncode
==
11
def
test_backup_all_cmd
(
tmpworkdir
:
str
):
# pylint: disable=unused-argument
"""
test backup command err cases
"""
rwm_conf
=
{
"
rwm_backups
"
:
{
"
dummycfg
"
:
{
"
filesdirs
"
:
[
"
dummydir
"
]}
}
}
mock
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
0
))
with
(
patch
.
object
(
rwm
.
RWM
,
"
_restic_backup
"
,
mock
),
patch
.
object
(
rwm
.
RWM
,
"
_restic_forget_prune
"
,
mock
)
):
assert
rwm
.
RWM
(
rwm_conf
).
backup_all_cmd
()
==
0
def
test_storage_create_cmd
(
tmpworkdir
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
):
# pylint: disable=unused-argument
"""
test_storage_create_cmd
"""
trwm
=
rwm
.
RWM
({
"
rwm_s3_endpoint_url
"
:
radosuser_admin
.
url
,
"
rwm_s3_access_key
"
:
radosuser_admin
.
access_key
,
"
rwm_s3_secret_key
"
:
radosuser_admin
.
secret_key
,
})
bucket_name
=
"
testbuck
"
assert
trwm
.
storage_create_cmd
(
bucket_name
,
"
testnx
"
)
==
0
assert
trwm
.
storage_create_cmd
(
"
!invalid
"
,
"
testnx
"
)
==
1
assert
trwm
.
storage_create_cmd
(
bucket_name
,
""
)
==
1
def
test_storage_delete_cmd
(
tmpworkdir
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
):
# pylint: disable=unused-argument
"""
test_storage_create_cmd
"""
trwm
=
rwm
.
RWM
({
"
rwm_s3_endpoint_url
"
:
radosuser_admin
.
url
,
"
rwm_s3_access_key
"
:
radosuser_admin
.
access_key
,
"
rwm_s3_secret_key
"
:
radosuser_admin
.
secret_key
,
"
rwm_restic_bucket
"
:
"
testbuck
"
,
"
rwm_restic_password
"
:
"
dummydummydummydummy
"
,
"
rwm_backups
"
:
{
"
testcfg
"
:
{
"
filesdirs
"
:
[
"
testdatadir/
"
]}
}
})
bucket_name
=
trwm
.
config
[
"
rwm_restic_bucket
"
]
Path
(
"
testdatadir
"
).
mkdir
()
Path
(
"
testdatadir/testdata1.txt
"
).
write_text
(
"
dummydata
"
,
encoding
=
"
utf-8
"
)
bucket
=
trwm
.
storage_manager
.
storage_create
(
bucket_name
,
"
admin
"
)
assert
len
(
trwm
.
storage_manager
.
list_objects
(
bucket_name
))
==
0
assert
trwm
.
restic_cmd
([
"
init
"
]).
returncode
==
0
assert
trwm
.
backup_cmd
(
"
testcfg
"
).
returncode
==
0
assert
len
(
trwm
.
storage_manager
.
list_objects
(
bucket_name
))
!=
0
object_versions
=
radosuser_admin
.
s3
.
meta
.
client
.
list_object_versions
(
Bucket
=
bucket
.
name
)
assert
len
(
object_versions
[
"
Versions
"
])
>
0
assert
len
(
object_versions
[
"
DeleteMarkers
"
])
>
0
assert
trwm
.
storage_delete_cmd
(
bucket_name
)
==
0
assert
not
trwm
.
storage_manager
.
bucket_exist
(
bucket_name
)
assert
trwm
.
storage_delete_cmd
(
bucket_name
)
==
1
def
test_storage_check_policy_cmd
(
tmpworkdir
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
):
# pylint: disable=unused-argument
"""
test storage check policy command
"""
trwm
=
rwm
.
RWM
({
"
rwm_s3_endpoint_url
"
:
radosuser_admin
.
url
,
"
rwm_s3_access_key
"
:
radosuser_admin
.
access_key
,
"
rwm_s3_secret_key
"
:
radosuser_admin
.
secret_key
,
})
mock
=
Mock
(
return_value
=
False
)
with
patch
.
object
(
rwm
.
StorageManager
,
"
storage_check_policy
"
,
mock
):
assert
trwm
.
storage_check_policy_cmd
(
"
dummy
"
)
==
1
This diff is collapsed.
Click to expand it.
tests/test_
policies
.py
→
tests/test_
storage
.py
+
54
−
50
View file @
8b5f52e6
"""
rwm bucket polici
es
tests
"""
"""
rwm
storagemanager and
bucket polici
ng
tests
"""
import
json
import
json
from
io
import
BytesIO
from
io
import
BytesIO
...
@@ -19,63 +19,63 @@ def test_microceph_defaults(
...
@@ -19,63 +19,63 @@ def test_microceph_defaults(
bucket_name
=
"
testbuckx
"
bucket_name
=
"
testbuckx
"
# create bucket, check owner and default policy
# create bucket
assert
bucket_name
not
in
[
x
.
name
for
x
in
radosuser_test1
.
list_buckets
()]
assert
not
radosuser_test1
.
bucket_exist
(
bucket_name
)
radosuser_test1
.
create_bucket
(
bucket_name
)
radosuser_test1
.
bucket_create
(
bucket_name
)
assert
len
(
radosuser_test1
.
list_buckets
())
==
1
assert
radosuser_test1
.
list_objects
(
bucket_name
)
==
[]
assert
bucket_name
in
[
x
.
name
for
x
in
radosuser_test1
.
list_buckets
()]
# assert basic raw bucket behavior
assert
radosuser_test1
.
bucket_exist
(
bucket_name
)
assert
radosuser_test1
.
bucket_owner
(
bucket_name
).
endswith
(
"
$test1
"
)
assert
radosuser_test1
.
bucket_owner
(
bucket_name
).
endswith
(
"
$test1
"
)
assert
not
radosuser_test1
.
bucket_policy
(
bucket_name
)
assert
not
radosuser_test1
.
bucket_policy
(
bucket_name
)
# bucket must exist, but not be
not visible nor
accessible to others
# bucket must exist, but not be accessible to others
with
pytest
.
raises
(
radosuser_test2
.
s3
.
meta
.
client
.
exceptions
.
BucketAlreadyExists
):
with
pytest
.
raises
(
radosuser_test2
.
s3
.
meta
.
client
.
exceptions
.
BucketAlreadyExists
):
radosuser_test2
.
create_bucket
(
bucket_name
)
radosuser_test2
.
bucket_create
(
bucket_name
)
assert
bucket_name
not
in
[
x
.
name
for
x
in
radosuser_test2
.
list_buckets
()]
with
pytest
.
raises
(
radosuser_test2
.
s3
.
meta
.
client
.
exceptions
.
ClientError
,
match
=
r
"
AccessDenied
"
):
with
pytest
.
raises
(
radosuser_test2
.
s3
.
meta
.
client
.
exceptions
.
ClientError
,
match
=
r
"
AccessDenied
"
):
assert
radosuser_test2
.
list_objects
(
bucket_name
)
assert
radosuser_test2
.
list_objects
(
bucket_name
)
def
test_storage_
policy
(
def
test_storage_
create
(
tmpworkdir
:
str
,
tmpworkdir
:
str
,
microceph
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
,
radosuser_admin
:
rwm
.
StorageManager
,
radosuser_test1
:
rwm
.
StorageManager
,
radosuser_test1
:
rwm
.
StorageManager
,
radosuser_test2
:
rwm
.
StorageManager
radosuser_test2
:
rwm
.
StorageManager
):
# pylint: disable=unused-argument
):
# pylint: disable=unused-argument
"""
test manager
created bucket policy
"""
"""
test manager
storage_create
"""
bucket
=
radosuser_admin
.
storage_create
(
"
testbuckx
"
,
"
test1
"
)
bucket
=
radosuser_admin
.
storage_create
(
"
testbuckx
"
,
"
test1
"
)
assert
radosuser_admin
.
list_objects
(
bucket
.
name
)
==
[]
assert
radosuser_admin
.
list_objects
(
bucket
.
name
)
==
[]
assert
radosuser_test1
.
list_objects
(
bucket
.
name
)
==
[]
assert
radosuser_admin
.
storage_check_policy
(
bucket
.
name
)
assert
radosuser_admin
.
bucket_policy
(
bucket
.
name
)
assert
radosuser_test1
.
bucket_policy
(
bucket
.
name
)
assert
radosuser_test1
.
storage_check_policy
(
bucket
.
name
)
# storage must exist, but not be accessible to others
with
pytest
.
raises
(
radosuser_test2
.
s3
.
meta
.
client
.
exceptions
.
ClientError
,
match
=
r
"
AccessDenied
"
):
with
pytest
.
raises
(
radosuser_test2
.
s3
.
meta
.
client
.
exceptions
.
ClientError
,
match
=
r
"
AccessDenied
"
):
radosuser_test2
.
list_objects
(
bucket
.
name
)
radosuser_test2
.
list_objects
(
bucket
.
name
)
assert
bucket
.
Versioning
().
status
==
"
Enabled
"
def
test_storage_
versioning
(
def
test_storage_
delete
(
tmpworkdir
:
str
,
tmpworkdir
:
str
,
microceph
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
,
radosuser_admin
:
rwm
.
StorageManager
,
radosuser_test1
:
rwm
.
StorageManager
,
radosuser_test1
:
rwm
.
StorageManager
,
):
# pylint: disable=unused-argument
):
# pylint: disable=unused-argument
"""
test manager
created bucket policy
"""
"""
test manager
storage_delete
"""
bucket_name
=
"
testbuckx
"
bucket_name
=
"
testbuckx
"
target_username
=
"
test1
"
target_username
=
"
test1
"
bucket
=
radosuser_admin
.
storage_create
(
bucket_name
,
target_username
)
bucket
=
radosuser_admin
.
storage_create
(
bucket_name
,
target_username
)
assert
bucket
.
Versioning
().
status
==
"
Enabled
"
bucket
=
radosuser_test1
.
s3
.
Bucket
(
bucket_name
)
#
bucket
=
radosuser_test1
.
s3
.
Bucket
(
bucket
.
name
)
bucket
.
upload_fileobj
(
BytesIO
(
b
"
dummydata
"
),
"
dummykey
"
)
bucket
.
upload_fileobj
(
BytesIO
(
b
"
dummydata
"
),
"
dummykey
"
)
assert
len
(
radosuser_test1
.
list_objects
(
bucket
_
name
))
==
1
assert
len
(
radosuser_test1
.
list_objects
(
bucket
.
name
))
==
1
bucket
.
Object
(
"
dummykey
"
).
delete
()
bucket
.
Object
(
"
dummykey
"
).
delete
()
assert
len
(
radosuser_test1
.
list_objects
(
bucket
_
name
))
==
0
assert
len
(
radosuser_test1
.
list_objects
(
bucket
.
name
))
==
0
# there should be object and it's delete marker
# there should be object and it's delete marker
object_versions
=
list
(
bucket
.
object_versions
.
all
())
object_versions
=
list
(
bucket
.
object_versions
.
all
())
...
@@ -90,7 +90,29 @@ def test_storage_versioning(
...
@@ -90,7 +90,29 @@ def test_storage_versioning(
assert
len
(
object_versions
[
"
DeleteMarkers
"
])
==
1
assert
len
(
object_versions
[
"
DeleteMarkers
"
])
==
1
def
test_storage_backup
(
def
test_storage_check_policy
(
tmpworkdir
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
,
radosuser_test1
:
rwm
.
StorageManager
):
# pylint: disable=unused-argument
"""
test manager storage_check_policy
"""
bucket_name
=
"
rwmbackup-test1
"
target_username
=
"
test1
"
assert
radosuser_admin
.
bucket_create
(
bucket_name
)
assert
not
radosuser_admin
.
storage_check_policy
(
bucket_name
)
radosuser_admin
.
storage_delete
(
bucket_name
)
radosuser_admin
.
storage_create
(
bucket_name
,
target_username
)
assert
radosuser_test1
.
storage_check_policy
(
bucket_name
)
radosuser_admin
.
s3
.
Bucket
(
bucket_name
).
Versioning
().
suspend
()
assert
not
radosuser_test1
.
storage_check_policy
(
bucket_name
)
def
test_storage_backup_usage
(
tmpworkdir
:
str
,
tmpworkdir
:
str
,
microceph
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
,
radosuser_admin
:
rwm
.
StorageManager
,
...
@@ -115,29 +137,11 @@ def test_storage_backup(
...
@@ -115,29 +137,11 @@ def test_storage_backup(
"
dummy
"
:
{
"
filesdirs
"
:
[
"
testdir
"
]}
"
dummy
"
:
{
"
filesdirs
"
:
[
"
testdir
"
]}
}
}
})
})
assert
trwm
.
restic_cmd
([
"
init
"
]).
returncode
==
0
assert
trwm
.
backup_cmd
(
"
dummy
"
).
returncode
==
0
assert
trwm
.
backup_cmd
(
"
dummy
"
).
returncode
==
0
assert
radosuser_test1
.
list_objects
(
bucket_name
)
assert
radosuser_test1
.
list_objects
(
bucket_name
)
assert
len
(
json
.
loads
(
trwm
.
restic_cmd
([
"
snapshots
"
,
"
--json
"
]).
stdout
))
==
1
assert
len
(
json
.
loads
(
trwm
.
restic_cmd
([
"
snapshots
"
,
"
--json
"
]).
stdout
))
==
1
with
pytest
.
raises
(
radosuser_test1
.
s3
.
meta
.
client
.
exceptions
.
ClientError
,
match
=
r
"
AccessDenied
"
):
def
test_storage_check_policy
(
assert
radosuser_test1
.
storage_delete
(
bucket_name
)
tmpworkdir
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
,
radosuser_test1
:
rwm
.
StorageManager
):
# pylint: disable=unused-argument
"""
test backup to manager created bucket with policy
"""
bucket_name
=
"
rwmbackup-test1
"
target_username
=
"
test1
"
assert
radosuser_admin
.
create_bucket
(
bucket_name
)
assert
not
radosuser_admin
.
storage_check_policy
(
bucket_name
)
radosuser_admin
.
storage_delete
(
bucket_name
)
radosuser_admin
.
storage_create
(
bucket_name
,
"
test1
"
)
assert
radosuser_test1
.
storage_check_policy
(
bucket_name
)
radosuser_admin
.
s3
.
Bucket
(
bucket_name
).
Versioning
().
suspend
()
assert
not
radosuser_test1
.
storage_check_policy
(
bucket_name
)
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment