Commit 8d000bb7 authored by Radoslav Bodó

rwm: use s3 client oriented api to bulk delete objects for storage-delete

parent e85168cd
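
The gist of the change, as a minimal sketch (endpoint, credentials and bucket name below are placeholders, not from the commit): the resource-oriented boto3 API deletes one object per HTTP round-trip, while the client-oriented delete_objects call accepts up to 1000 keys per request, which is where the new bulk_size default of 1000 comes from.

import boto3

s3 = boto3.resource('s3', endpoint_url='http://storage.example.com',
                    aws_access_key_id='KEY', aws_secret_access_key='SECRET')

# before: resource-oriented API, one round-trip per object
for obj in s3.Bucket('mybucket').objects.all():
    obj.delete()

# after: client-oriented API, up to 1000 keys per round-trip
client = s3.meta.client
keys = [{'Key': o.key} for o in s3.Bucket('mybucket').objects.all()]
for batch in (keys[i:i + 1000] for i in range(0, len(keys), 1000)):
    client.delete_objects(Bucket='mybucket',
                          Delete={'Objects': batch, 'Quiet': True})
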
@@ -69,6 +69,13 @@ def size_fmt(num):
     return f'{num:0.1f} YiB'
 
 
+def batched_iterator(lst, n):
+    """yield successive n-item batches from a list"""
+
+    for i in range(0, len(lst), n):
+        yield lst[i:i + n]
+
+
 class BackupConfig(BaseModel):
     """Configuration for backup operations.
......@@ -191,11 +198,13 @@ class StorageManager:
"s3:GetBucketVersioning"
]
def __init__(self, url, access_key, secret_key):
def __init__(self, url, access_key, secret_key, bulk_size=1000):
self.url = url
self.access_key = access_key
self.secret_key = secret_key
self.bulk_size = bulk_size
self.s3 = boto3.resource('s3', endpoint_url=url, aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key)
self.s3client = self.s3.meta.client
def bucket_create(self, name):
"""aws s3 resource api stub"""
@@ -274,34 +283,34 @@ class StorageManager:
         """storage delete"""
 
         # delete all objects
-        paginator = self.s3.meta.client.get_paginator('list_objects')
+        paginator = self.s3client.get_paginator('list_objects')
         objects = []
         for page in paginator.paginate(Bucket=bucket_name):
             for item in page.get("Contents", []):
-                objects.append((bucket_name, item["Key"]))
-        for item in objects:
-            self.s3.Object(*item).delete()
+                objects.append({"Key": item["Key"]})
+        for items in batched_iterator(objects, self.bulk_size):
+            self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})
 
-        paginator = self.s3.meta.client.get_paginator('list_object_versions')
+        paginator = self.s3client.get_paginator('list_object_versions')
 
         # delete all object versions
         objects = []
         for page in paginator.paginate(Bucket=bucket_name):
             for item in page.get("Versions", []):
-                objects.append((bucket_name, item["Key"], item["VersionId"]))
-        for item in objects:
-            self.s3.ObjectVersion(*item).delete()
+                objects.append({"Key": item["Key"], "VersionId": item["VersionId"]})
+        for items in batched_iterator(objects, self.bulk_size):
+            self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})
 
         # delete all delete markers
         objects = []
         for page in paginator.paginate(Bucket=bucket_name):
             for item in page.get("DeleteMarkers", []):
-                objects.append((bucket_name, item["Key"], item["VersionId"]))
-        for item in objects:
-            self.s3.ObjectVersion(*item).delete()
+                objects.append({"Key": item["Key"], "VersionId": item["VersionId"]})
+        for items in batched_iterator(objects, self.bulk_size):
+            self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})
 
         # delete bucket
-        self.s3.Bucket(bucket_name).delete()
+        self.s3client.delete_bucket(Bucket=bucket_name)
         return 0
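
A side note on the listing API (an alternative sketch, not what the commit does): each list_object_versions page carries both "Versions" and "DeleteMarkers", and entries of both kinds expose Key and VersionId, so the second and third passes could in principle share a single pagination:

paginator = self.s3client.get_paginator('list_object_versions')
objects = []
for page in paginator.paginate(Bucket=bucket_name):
    # versions and delete markers both expose Key and VersionId
    for item in page.get('Versions', []) + page.get('DeleteMarkers', []):
        objects.append({'Key': item['Key'], 'VersionId': item['VersionId']})
for items in batched_iterator(objects, self.bulk_size):
    self.s3client.delete_objects(Bucket=bucket_name, Delete={'Objects': items, 'Quiet': True})
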
@@ -242,10 +242,10 @@ def test_storage_delete(tmpworkdir: str, radosuser_admin: rwm.StorageManager):
     bucket = radosuser_admin.storage_create(bucket_name, "dummy")
     bucket.upload_fileobj(BytesIO(b"dummydata0"), "dummykey")
 
-    for idx in range(803):
+    for idx in range(int(os.environ.get("ROUNDS", 803))):
         bucket.Object("dummykey").delete()
-        bucket.upload_fileobj(BytesIO(f"dummydata{idx}".encode()), "dummykey")
+        bucket.upload_fileobj(BytesIO(f"dummydata{idx}".encode()), f"dummykey{idx}")
 
     assert radosuser_admin.storage_delete(bucket.name) == 0
     assert not radosuser_admin.bucket_exist(bucket.name)
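
The ROUNDS override makes this slow integration test tunable. At the 803 default, each round leaves a delete marker on "dummykey" plus a new object under a distinct key, so the versioned bucket accumulates well over 1000 versions and delete markers combined, which appears intended to push list_object_versions past a single 1000-entry page; a small value (e.g. ROUNDS=10, assuming the usual pytest invocation of this suite) keeps development runs short.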