Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
R
rwm
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Radoslav Bodó
rwm
Commits
7efded47
Commit
7efded47
authored
1 year ago
by
Radoslav Bodó
Browse files
Options
Downloads
Patches
Plain Diff
rwm: fix postruns for backup and backup_all
parent
7318905b
No related branches found
No related tags found
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
rwm.py
+74
-80
74 additions, 80 deletions
rwm.py
tests/test_default.py
+1
-1
1 addition, 1 deletion
tests/test_default.py
tests/test_rwm.py
+75
-90
75 additions, 90 deletions
tests/test_rwm.py
with
150 additions
and
171 deletions
rwm.py
+
74
−
80
View file @
7efded47
...
@@ -103,6 +103,28 @@ class RwmJSONEncoder(json.JSONEncoder):
...
@@ -103,6 +103,28 @@ class RwmJSONEncoder(json.JSONEncoder):
return
super
().
default
(
o
)
# pragma: nocover ; no other type in processeda data
return
super
().
default
(
o
)
# pragma: nocover ; no other type in processeda data
@dataclass
class
BackupResult
:
"""
backup results data container
"""
name
:
str
returncode
:
int
time_start
:
datetime
time_end
:
datetime
def
to_dict
(
self
):
"""
dict serializer
"""
return
{
"
ident
"
:
"
RESULT
"
,
"
name
"
:
self
.
name
,
"
status
"
:
"
OK
"
if
self
.
returncode
==
0
else
"
ERROR
"
,
"
returncode
"
:
self
.
returncode
,
"
backup_start
"
:
self
.
time_start
.
isoformat
(),
"
backup_time
"
:
str
(
self
.
time_end
-
self
.
time_start
),
}
class
StorageManager
:
class
StorageManager
:
"""
s3 policed bucket manager
"""
"""
s3 policed bucket manager
"""
...
@@ -340,7 +362,7 @@ class StorageManager:
...
@@ -340,7 +362,7 @@ class StorageManager:
return
state
return
state
def
storage_save_state
(
self
,
bucket_name
):
def
storage_save_state
(
self
,
bucket_name
)
->
int
:
"""
save storage state into itself
"""
"""
save storage state into itself
"""
# explicit error handling here, it's used during backup process
# explicit error handling here, it's used during backup process
...
@@ -371,28 +393,6 @@ class StorageManager:
...
@@ -371,28 +393,6 @@ class StorageManager:
return
0
return
0
@dataclass
class
BackupResult
:
"""
backup results data container
"""
name
:
str
returncode
:
int
time_start
:
datetime
time_end
:
datetime
def
to_dict
(
self
):
"""
dict serializer
"""
return
{
"
ident
"
:
"
RESULT
"
,
"
name
"
:
self
.
name
,
"
status
"
:
"
OK
"
if
self
.
returncode
==
0
else
"
ERROR
"
,
"
returncode
"
:
self
.
returncode
,
"
backup_start
"
:
self
.
time_start
.
isoformat
(),
"
backup_time
"
:
str
(
self
.
time_end
-
self
.
time_start
),
}
class
RWM
:
class
RWM
:
"""
rwm impl
"""
"""
rwm impl
"""
...
@@ -433,34 +433,36 @@ class RWM:
...
@@ -433,34 +433,36 @@ class RWM:
}
}
return
run_command
([
"
restic
"
]
+
args
,
env
=
env
)
return
run_command
([
"
restic
"
]
+
args
,
env
=
env
)
def
_restic_backup
(
self
,
name
)
->
subprocess
.
CompletedProcess
:
def
_restic_backup
(
self
,
name
)
->
int
:
"""
runs restic backup by name
"""
"""
runs restic backup by name
"""
logger
.
info
(
f
"
run
restic_backup
{
name
}
"
)
logger
.
info
(
f
"
_
restic_backup
{
name
}
"
)
conf
=
self
.
config
.
backups
[
name
]
conf
=
self
.
config
.
backups
[
name
]
excludes
=
[]
excludes
=
[]
for
item
in
conf
.
excludes
:
for
item
in
conf
.
excludes
:
excludes
+=
[
"
--exclude
"
,
item
]
excludes
+=
[
"
--exclude
"
,
item
]
cmd_args
=
[
"
backup
"
]
+
conf
.
extras
+
excludes
+
conf
.
filesdirs
cmd_args
=
[
"
backup
"
]
+
conf
.
extras
+
excludes
+
conf
.
filesdirs
return
self
.
restic_cmd
(
cmd_args
)
wrap_output
(
backup_proc
:
=
self
.
restic_cmd
(
cmd_args
))
return
backup_proc
.
returncode
def
_restic_forget_prune
(
self
)
->
subprocess
.
CompletedProcess
:
def
_restic_forget_prune
(
self
)
->
int
:
"""
runs forget prune
"""
"""
runs forget prune
"""
logger
.
info
(
"
run
restic_forget_prune
"
)
logger
.
info
(
"
_
restic_forget_prune
"
)
keeps
=
[]
keeps
=
[]
for
key
,
val
in
self
.
config
.
retention
.
items
():
for
key
,
val
in
self
.
config
.
retention
.
items
():
keeps
+=
[
f
"
--
{
key
}
"
,
val
]
keeps
+=
[
f
"
--
{
key
}
"
,
val
]
cmd_args
=
[
"
forget
"
,
"
--prune
"
]
+
keeps
cmd_args
=
[
"
forget
"
,
"
--prune
"
]
+
keeps
return
self
.
restic_cmd
(
cmd_args
)
wrap_output
(
forget_proc
:
=
self
.
restic_cmd
(
cmd_args
))
return
forget_proc
.
returncode
def
_runparts
(
self
,
parts
_name
,
parts
:
list
)
->
int
:
def
_runparts
(
self
,
backup
_name
,
parts
_name
)
->
int
:
"""
run all commands in parts in shell
"""
"""
run all commands in parts in shell
"""
for
part
in
parts
:
for
part
in
getattr
(
self
.
config
.
backups
[
backup_name
],
parts_name
)
:
logger
.
info
(
"
rwm
_runparts
%s shell command, %s
"
,
parts_name
,
json
.
dumps
(
part
))
logger
.
info
(
f
"
_runparts
{
parts_name
}
command,
{
json
.
dumps
(
part
)
}
"
)
wrap_output
(
part_proc
:
=
run_command
(
part
,
shell
=
True
))
wrap_output
(
part_proc
:
=
run_command
(
part
,
shell
=
True
))
if
part_proc
.
returncode
!=
0
:
if
part_proc
.
returncode
!=
0
:
logger
.
error
(
"
rwm _runparts failed
"
)
logger
.
error
(
"
rwm _runparts failed
"
)
...
@@ -468,63 +470,44 @@ class RWM:
...
@@ -468,63 +470,44 @@ class RWM:
return
0
return
0
def
_prerun
(
self
,
name
)
->
int
:
def
_backup_one
(
self
,
name
)
->
int
:
"""
prerun runparts stub
"""
"""
perform backup
"""
return
self
.
_runparts
(
"
prerun
"
,
self
.
config
.
backups
[
name
].
prerun
)
def
_postrun
(
self
,
name
)
->
int
:
"""
postrun runparts stub
"""
return
self
.
_runparts
(
"
postrun
"
,
self
.
config
.
backups
[
name
].
prerun
)
def
backup
(
self
,
name
)
->
int
:
"""
backup command
"""
if
not
self
.
storage_manager
.
storage_check_policy
(
self
.
config
.
restic_bucket
):
logger
.
warning
(
"
used bucket does not have expected policy
"
)
if
self
.
_prerun
(
name
)
!=
0
:
logger
.
error
(
"
rwm _prerun failed
"
)
return
1
wrap_output
(
backup_proc
:
=
self
.
_restic_backup
(
name
))
if
backup_proc
.
returncode
!=
0
:
logger
.
error
(
"
rwm _restic_backup failed
"
)
return
1
if
self
.
_postrun
(
name
)
!=
0
:
logger
.
error
(
"
rwm _postrun failed
"
)
return
1
wrap_output
(
forget_proc
:
=
self
.
_restic_forget_prune
())
if
forget_proc
.
returncode
!=
0
:
logger
.
error
(
"
rwm _restic_forget_prune failed
"
)
return
1
if
self
.
storage_manager
.
storage_save_state
(
self
.
config
.
restic_bucket
)
!=
0
:
logger
.
error
(
"
rwm storage_save_state failed
"
)
return
1
logger
.
info
(
f
"
_backup_one
{
name
}
"
)
if
ret
:
=
self
.
_runparts
(
name
,
"
prerun
"
):
return
ret
if
ret
:
=
self
.
_restic_backup
(
name
):
return
ret
if
ret
:
=
self
.
_runparts
(
name
,
"
postrun
"
):
return
ret
return
0
return
0
def
backup
_all
(
self
)
->
int
:
def
backup
(
self
,
backup_selector
:
str
|
list
)
->
int
:
"""
backup
all
command
"""
"""
backup command
. perform selected backup or all configured backups
"""
stats
=
[]
stats
=
[]
ret
=
0
ret
=
0
selected_backups
=
backup_selector
if
isinstance
(
backup_selector
,
list
)
else
[
backup_selector
]
if
any
(
name
not
in
self
.
config
.
backups
for
name
in
selected_backups
):
logger
.
error
(
"
invalid backup selector
"
)
return
1
if
not
self
.
storage_manager
.
storage_check_policy
(
self
.
config
.
restic_bucket
):
logger
.
warning
(
"
used bucket does not have expected policy
"
)
for
name
in
sel
f
.
config
.
backups
:
for
name
in
sel
ected_
backups
:
time_start
=
datetime
.
now
()
time_start
=
datetime
.
now
()
wrap_output
(
backup_
proc
:
=
self
.
_
restic_
backup
(
name
)
)
backup_
ret
=
self
.
_backup
_one
(
name
)
time_end
=
datetime
.
now
()
time_end
=
datetime
.
now
()
ret
|=
backup_
proc
.
returncode
ret
|=
backup_
ret
stats
.
append
(
BackupResult
(
name
,
backup_
proc
.
returncode
,
time_start
,
time_end
))
stats
.
append
(
BackupResult
(
name
,
backup_
ret
,
time_start
,
time_end
))
if
ret
==
0
:
if
ret
==
0
:
time_start
=
datetime
.
now
()
time_start
=
datetime
.
now
()
wrap_output
(
forget_
proc
:
=
self
.
_restic_forget_prune
()
)
forget_
ret
=
self
.
_restic_forget_prune
()
time_end
=
datetime
.
now
()
time_end
=
datetime
.
now
()
ret
|=
forget_
proc
.
returncode
ret
|=
forget_
ret
stats
.
append
(
BackupResult
(
"
_forget_prune
"
,
forget_
proc
.
returncode
,
time_start
,
time_end
))
stats
.
append
(
BackupResult
(
"
_forget_prune
"
,
forget_
ret
,
time_start
,
time_end
))
time_start
=
datetime
.
now
()
time_start
=
datetime
.
now
()
save_state_ret
=
self
.
storage_manager
.
storage_save_state
(
self
.
config
.
restic_bucket
)
save_state_ret
=
self
.
storage_manager
.
storage_save_state
(
self
.
config
.
restic_bucket
)
...
@@ -532,10 +515,14 @@ class RWM:
...
@@ -532,10 +515,14 @@ class RWM:
ret
|=
save_state_ret
ret
|=
save_state_ret
stats
.
append
(
BackupResult
(
"
_storage_save_state
"
,
save_state_ret
,
time_start
,
time_end
))
stats
.
append
(
BackupResult
(
"
_storage_save_state
"
,
save_state_ret
,
time_start
,
time_end
))
logger
.
info
(
"
rwm
backup
_all
results
"
)
logger
.
info
(
"
backup results
"
)
print
(
tabulate
([
item
.
to_dict
()
for
item
in
stats
],
headers
=
"
keys
"
,
numalign
=
"
left
"
))
print
(
tabulate
([
item
.
to_dict
()
for
item
in
stats
],
headers
=
"
keys
"
,
numalign
=
"
left
"
))
return
ret
return
ret
def
backup_all
(
self
)
->
int
:
"""
backup all stub
"""
return
self
.
backup
(
list
(
self
.
config
.
backups
.
keys
()))
def
storage_create
(
self
,
bucket_name
,
target_username
)
->
int
:
def
storage_create
(
self
,
bucket_name
,
target_username
)
->
int
:
"""
storage create command
"""
"""
storage create command
"""
...
@@ -678,30 +665,37 @@ def main(argv=None): # pylint: disable=too-many-branches
...
@@ -678,30 +665,37 @@ def main(argv=None): # pylint: disable=too-many-branches
if
args
.
command
==
"
aws
"
:
if
args
.
command
==
"
aws
"
:
ret
=
wrap_output
(
rwmi
.
aws_cmd
(
args
.
cmd_args
))
ret
=
wrap_output
(
rwmi
.
aws_cmd
(
args
.
cmd_args
))
if
args
.
command
==
"
restic
"
:
if
args
.
command
==
"
restic
"
:
ret
=
wrap_output
(
rwmi
.
restic_cmd
(
args
.
cmd_args
))
ret
=
wrap_output
(
rwmi
.
restic_cmd
(
args
.
cmd_args
))
if
args
.
command
==
"
backup
"
:
if
args
.
command
==
"
backup
"
:
ret
=
rwmi
.
backup
(
args
.
name
)
ret
=
rwmi
.
backup
(
args
.
name
)
logger
.
info
(
"
rwm backup finished with %s (ret %d)
"
,
"
success
"
if
ret
==
0
else
"
errors
"
,
ret
)
logger
.
info
(
"
backup finished with %s (ret %d)
"
,
"
success
"
if
ret
==
0
else
"
errors
"
,
ret
)
if
args
.
command
==
"
backup-all
"
:
if
args
.
command
==
"
backup-all
"
:
ret
=
rwmi
.
backup_all
()
ret
=
rwmi
.
backup_all
()
logger
.
info
(
"
rwm
backup_all finished with %s (ret %d)
"
,
"
success
"
if
ret
==
0
else
"
errors
"
,
ret
)
logger
.
info
(
"
backup_all finished with %s (ret %d)
"
,
"
success
"
if
ret
==
0
else
"
errors
"
,
ret
)
if
args
.
command
==
"
storage-create
"
:
if
args
.
command
==
"
storage-create
"
:
ret
=
rwmi
.
storage_create
(
args
.
bucket_name
,
args
.
target_username
)
ret
=
rwmi
.
storage_create
(
args
.
bucket_name
,
args
.
target_username
)
if
args
.
command
==
"
storage-delete
"
:
if
args
.
command
==
"
storage-delete
"
:
ret
=
rwmi
.
storage_delete
(
args
.
bucket_name
)
ret
=
rwmi
.
storage_delete
(
args
.
bucket_name
)
if
args
.
command
==
"
storage-list
"
:
if
args
.
command
==
"
storage-list
"
:
ret
=
rwmi
.
storage_list
()
ret
=
rwmi
.
storage_list
()
if
args
.
command
==
"
storage-info
"
:
if
args
.
command
==
"
storage-info
"
:
ret
=
rwmi
.
storage_info
(
args
.
bucket_name
)
ret
=
rwmi
.
storage_info
(
args
.
bucket_name
)
if
args
.
command
==
"
storage-drop-versions
"
:
if
args
.
command
==
"
storage-drop-versions
"
:
ret
=
rwmi
.
storage_drop_versions
(
args
.
bucket_name
)
ret
=
rwmi
.
storage_drop_versions
(
args
.
bucket_name
)
if
args
.
command
==
"
storage-restore-state
"
:
if
args
.
command
==
"
storage-restore-state
"
:
ret
=
rwmi
.
storage_restore_state
(
args
.
source_bucket
,
args
.
target_bucket
,
args
.
state
)
ret
=
rwmi
.
storage_restore_state
(
args
.
source_bucket
,
args
.
target_bucket
,
args
.
state
)
logger
.
debug
(
"
rwm
finished with %s (ret %d)
"
,
"
success
"
if
ret
==
0
else
"
errors
"
,
ret
)
logger
.
debug
(
"
finished with %s (ret %d)
"
,
"
success
"
if
ret
==
0
else
"
errors
"
,
ret
)
return
ret
return
ret
...
...
This diff is collapsed.
Click to expand it.
tests/test_default.py
+
1
−
1
View file @
7efded47
...
@@ -50,7 +50,7 @@ def test_main():
...
@@ -50,7 +50,7 @@ def test_main():
with
patch
.
object
(
rwm
.
RWM
,
"
backup
"
,
mock_ok
):
with
patch
.
object
(
rwm
.
RWM
,
"
backup
"
,
mock_ok
):
assert
_rwm_minconfig
([
"
backup
"
,
"
dummy
"
])
==
0
assert
_rwm_minconfig
([
"
backup
"
,
"
dummy
"
])
==
0
with
patch
.
object
(
rwm
.
RWM
,
"
backup
_all
"
,
mock_ok
):
with
patch
.
object
(
rwm
.
RWM
,
"
backup
"
,
mock_ok
):
assert
_rwm_minconfig
([
"
backup-all
"
])
==
0
assert
_rwm_minconfig
([
"
backup-all
"
])
==
0
with
patch
.
object
(
rwm
.
RWM
,
"
storage_create
"
,
mock_ok
):
with
patch
.
object
(
rwm
.
RWM
,
"
storage_create
"
,
mock_ok
):
...
...
This diff is collapsed.
Click to expand it.
tests/test_rwm.py
+
75
−
90
View file @
7efded47
...
@@ -2,7 +2,6 @@
...
@@ -2,7 +2,6 @@
import
json
import
json
from
pathlib
import
Path
from
pathlib
import
Path
from
subprocess
import
CompletedProcess
from
unittest.mock
import
Mock
,
patch
from
unittest.mock
import
Mock
,
patch
import
pytest
import
pytest
...
@@ -45,27 +44,75 @@ def test_restic_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused
...
@@ -45,27 +44,75 @@ def test_restic_cmd(tmpworkdir: str, motoserver: str): # pylint: disable=unused
assert
"
id
"
in
json
.
loads
(
proc
.
stdout
)
assert
"
id
"
in
json
.
loads
(
proc
.
stdout
)
def
_restic_list_snapshots
(
trwm
):
def
test_runparts
(
tmpworkdir
:
str
):
# pylint: disable=unused-argument
"""
test helper
"""
"""
test runparts
"""
return
json
.
loads
(
trwm
.
restic_cmd
([
"
snapshots
"
,
"
--json
"
]).
stdout
)
trwm
=
rwm
.
RWM
({
"
s3_endpoint_url
"
:
"
http://dummy
"
,
"
s3_access_key
"
:
"
dummy
"
,
"
s3_secret_key
"
:
"
dummy
"
,
"
backups
"
:
{
"
testcfg
"
:
{
"
filesdirs
"
:
[
"
testdatadir/
"
],
"
prerun
"
:
[
"
false || true
"
],
"
postrun
"
:
[
"
true && false
"
]
}
}
})
def
_restic_list_snapshot_files
(
trwm
,
snapshot_id
):
assert
trwm
.
_runparts
(
"
testcfg
"
,
"
prerun
"
)
==
0
# pylint:l disable=protected-access
"""
test helper
"""
assert
trwm
.
_runparts
(
"
testcfg
"
,
"
postrun
"
)
==
1
# pylint:l disable=protected-access
snapshot_ls
=
[
json
.
loads
(
x
)
for
x
in
trwm
.
restic_cmd
([
"
ls
"
,
snapshot_id
,
"
--json
"
]).
stdout
.
splitlines
()]
return
[
x
[
"
path
"
]
for
x
in
snapshot_ls
if
(
x
[
"
struct_type
"
]
==
"
node
"
)
and
(
x
[
"
type
"
]
==
"
file
"
)]
def
test_
runparts
(
tmpworkdir
:
str
):
# pylint: disable=unused-argument
def
test_
backup_one
(
tmpworkdir
:
str
):
# pylint: disable=unused-argument
"""
test
runparts
"""
"""
test
backup one error handling
"""
trwm
=
rwm
.
RWM
({
trwm
=
rwm
.
RWM
({
"
s3_endpoint_url
"
:
"
http://dummy
"
,
"
s3_endpoint_url
"
:
"
http://dummy
"
,
"
s3_access_key
"
:
"
dummy
"
,
"
s3_access_key
"
:
"
dummy
"
,
"
s3_secret_key
"
:
"
dummy
"
,
"
s3_secret_key
"
:
"
dummy
"
,
"
restic_bucket
"
:
"
restictest
"
,
"
restic_password
"
:
"
dummydummydummydummy
"
,
"
backups
"
:
{
"
prefail
"
:
{
"
filesdirs
"
:
[],
"
prerun
"
:
[
"
exit 11
"
]
},
"
backupfail
"
:
{
"
filesdirs
"
:
[]
},
"
postfail
"
:
{
"
filesdirs
"
:
[],
"
postrun
"
:
[
"
exit 13
"
]
}
}
})
})
assert
trwm
.
_runparts
(
"
tests
"
,
[
"
false
"
])
==
1
# pylint: disable=protected-access
mock_ok
=
Mock
(
return_value
=
0
)
mock_fail
=
Mock
(
return_value
=
12
)
assert
trwm
.
_backup_one
(
"
prefail
"
)
==
11
# pylint:l disable=protected-access
with
(
patch
.
object
(
rwm
.
RWM
,
"
_restic_backup
"
,
mock_fail
),
):
assert
trwm
.
_backup_one
(
"
backupfail
"
)
==
12
# pylint:l disable=protected-access
with
(
patch
.
object
(
rwm
.
RWM
,
"
_restic_backup
"
,
mock_ok
),
):
assert
trwm
.
_backup_one
(
"
postfail
"
)
==
13
# pylint:l disable=protected-access
def
_restic_list_snapshots
(
trwm
):
"""
test helper
"""
return
json
.
loads
(
trwm
.
restic_cmd
([
"
snapshots
"
,
"
--json
"
]).
stdout
)
def
_restic_list_snapshot_files
(
trwm
,
snapshot_id
):
"""
test helper
"""
snapshot_ls
=
[
json
.
loads
(
x
)
for
x
in
trwm
.
restic_cmd
([
"
ls
"
,
snapshot_id
,
"
--json
"
]).
stdout
.
splitlines
()]
return
[
x
[
"
path
"
]
for
x
in
snapshot_ls
if
(
x
[
"
struct_type
"
]
==
"
node
"
)
and
(
x
[
"
type
"
]
==
"
file
"
)]
def
test_backup
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
def
test_backup
(
tmpworkdir
:
str
,
motoserver
:
str
):
# pylint: disable=unused-argument
...
@@ -145,36 +192,6 @@ def test_backup_excludes(tmpworkdir: str, motoserver: str): # pylint: disable=u
...
@@ -145,36 +192,6 @@ def test_backup_excludes(tmpworkdir: str, motoserver: str): # pylint: disable=u
assert
"
/testdatadir/var/proc/data
"
in
snapshot_files
assert
"
/testdatadir/var/proc/data
"
in
snapshot_files
def
test_backup_runparts
(
tmpworkdir
:
str
):
# pylint: disable=unused-argument
"""
test backup
"""
trwm
=
rwm
.
RWM
({
"
s3_endpoint_url
"
:
"
http://dummy
"
,
"
s3_access_key
"
:
"
dummy
"
,
"
s3_secret_key
"
:
"
dummy
"
,
"
restic_bucket
"
:
"
restictest
"
,
"
restic_password
"
:
"
dummydummydummydummy
"
,
"
backups
"
:
{
"
testcfg
"
:
{
"
filesdirs
"
:
[
"
testdatadir/
"
],
"
prerun
"
:
[
"
false || true
"
],
"
postrun
"
:
[
"
true || false
"
]
}
}
})
mock_ok
=
Mock
(
return_value
=
0
)
mock_proc_ok
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
0
))
with
(
patch
.
object
(
rwm
.
StorageManager
,
"
storage_check_policy
"
,
mock_ok
),
patch
.
object
(
rwm
.
RWM
,
"
_restic_backup
"
,
mock_proc_ok
),
patch
.
object
(
rwm
.
RWM
,
"
_restic_forget_prune
"
,
mock_proc_ok
),
patch
.
object
(
rwm
.
StorageManager
,
"
storage_save_state
"
,
mock_ok
)
):
assert
trwm
.
backup
(
"
testcfg
"
)
==
0
def
test_backup_error_handling
(
tmpworkdir
:
str
):
# pylint: disable=unused-argument
def
test_backup_error_handling
(
tmpworkdir
:
str
):
# pylint: disable=unused-argument
"""
test backup command err cases
"""
"""
test backup command err cases
"""
...
@@ -187,68 +204,36 @@ def test_backup_error_handling(tmpworkdir: str): # pylint: disable=unused-argum
...
@@ -187,68 +204,36 @@ def test_backup_error_handling(tmpworkdir: str): # pylint: disable=unused-argum
"
dummycfg
"
:
{
"
filesdirs
"
:
[
"
dummydir
"
]}
"
dummycfg
"
:
{
"
filesdirs
"
:
[
"
dummydir
"
]}
}
}
}
}
mock_proc_ok
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
0
))
mock_proc_fail
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
2
))
mock_false
=
Mock
(
return_value
=
False
)
mock_true
=
Mock
(
return_value
=
True
)
mock_ok
=
Mock
(
return_value
=
0
)
mock_ok
=
Mock
(
return_value
=
0
)
mock_fail
=
Mock
(
return_value
=
11
)
mock_fail
=
Mock
(
return_value
=
11
)
with
(
assert
rwm
.
RWM
(
rwm_conf
).
backup
(
"
invalidselector
"
)
==
1
patch
.
object
(
rwm
.
RWM
,
"
_prerun
"
,
mock_fail
)
):
assert
rwm
.
RWM
(
rwm_conf
).
backup
(
"
dummycfg
"
)
==
1
with
(
patch
.
object
(
rwm
.
RWM
,
"
_prerun
"
,
mock_ok
),
patch
.
object
(
rwm
.
RWM
,
"
_restic_backup
"
,
mock_proc_fail
)
):
assert
rwm
.
RWM
(
rwm_conf
).
backup
(
"
dummycfg
"
)
==
1
with
(
with
(
patch
.
object
(
rwm
.
RWM
,
"
_prerun
"
,
mock_
ok
),
patch
.
object
(
rwm
.
StorageManager
,
"
storage_check_policy
"
,
mock_
false
),
patch
.
object
(
rwm
.
RWM
,
"
_
restic_
backup
"
,
mock_
proc_ok
),
patch
.
object
(
rwm
.
RWM
,
"
_backup
_one
"
,
mock_
fail
),
patch
.
object
(
rwm
.
RWM
,
"
_restic_f
orge
t_prune
"
,
mock_proc_fail
)
patch
.
object
(
rwm
.
StorageManager
,
"
st
or
a
ge
_save_state
"
,
mock_ok
)
):
):
assert
rwm
.
RWM
(
rwm_conf
).
backup
(
"
dummycfg
"
)
==
1
assert
rwm
.
RWM
(
rwm_conf
).
backup
(
"
dummycfg
"
)
==
1
1
with
(
with
(
patch
.
object
(
rwm
.
RWM
,
"
_prerun
"
,
mock_
ok
),
patch
.
object
(
rwm
.
StorageManager
,
"
storage_check_policy
"
,
mock_
true
),
patch
.
object
(
rwm
.
RWM
,
"
_
restic_
backup
"
,
mock_
proc_
ok
),
patch
.
object
(
rwm
.
RWM
,
"
_backup
_one
"
,
mock_ok
),
patch
.
object
(
rwm
.
RWM
,
"
_restic_forget_prune
"
,
mock_
proc_ok
),
patch
.
object
(
rwm
.
RWM
,
"
_restic_forget_prune
"
,
mock_
fail
),
patch
.
object
(
rwm
.
RWM
,
"
_postrun
"
,
mock_
fail
)
patch
.
object
(
rwm
.
StorageManager
,
"
storage_save_state
"
,
mock_
ok
)
):
):
assert
rwm
.
RWM
(
rwm_conf
).
backup
(
"
dummycfg
"
)
==
1
assert
rwm
.
RWM
(
rwm_conf
).
backup
(
"
dummycfg
"
)
==
1
1
with
(
with
(
patch
.
object
(
rwm
.
RWM
,
"
_prerun
"
,
mock_ok
),
patch
.
object
(
rwm
.
StorageManager
,
"
storage_check_policy
"
,
mock_true
),
patch
.
object
(
rwm
.
RWM
,
"
_restic_backup
"
,
mock_proc_ok
),
patch
.
object
(
rwm
.
RWM
,
"
_backup_one
"
,
mock_ok
),
patch
.
object
(
rwm
.
RWM
,
"
_restic_forget_prune
"
,
mock_proc_ok
),
patch
.
object
(
rwm
.
RWM
,
"
_restic_forget_prune
"
,
mock_ok
),
patch
.
object
(
rwm
.
RWM
,
"
_postrun
"
,
mock_ok
),
patch
.
object
(
rwm
.
StorageManager
,
"
storage_save_state
"
,
mock_fail
)
patch
.
object
(
rwm
.
StorageManager
,
"
storage_save_state
"
,
mock_fail
)
):
):
assert
rwm
.
RWM
(
rwm_conf
).
backup
(
"
dummycfg
"
)
==
1
assert
rwm
.
RWM
(
rwm_conf
).
backup
(
"
dummycfg
"
)
==
11
def
test_backup_all
(
tmpworkdir
:
str
):
# pylint: disable=unused-argument
"""
test backup_all
"""
rwm_conf
=
{
"
s3_endpoint_url
"
:
"
http://dummy
"
,
"
s3_access_key
"
:
"
dummy
"
,
"
s3_secret_key
"
:
"
dummy
"
,
"
restic_bucket
"
:
"
restictest
"
,
"
backups
"
:
{
"
dummycfg
"
:
{
"
filesdirs
"
:
[
"
dummydir
"
]}
}
}
mock_proc_ok
=
Mock
(
return_value
=
CompletedProcess
(
args
=
'
dummy
'
,
returncode
=
0
))
mock_ok
=
Mock
(
return_value
=
0
)
with
(
patch
.
object
(
rwm
.
RWM
,
"
_restic_backup
"
,
mock_proc_ok
),
patch
.
object
(
rwm
.
RWM
,
"
_restic_forget_prune
"
,
mock_proc_ok
),
patch
.
object
(
rwm
.
StorageManager
,
"
storage_save_state
"
,
mock_ok
)
):
assert
rwm
.
RWM
(
rwm_conf
).
backup_all
()
==
0
def
test_storage_create
(
tmpworkdir
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
):
# pylint: disable=unused-argument
def
test_storage_create
(
tmpworkdir
:
str
,
microceph
:
str
,
radosuser_admin
:
rwm
.
StorageManager
):
# pylint: disable=unused-argument
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment