tests: add topic_delete_unavailable_test, tweak tiered storage topic deletion order #7460

Merged: 4 commits, Jan 4, 2023
2 changes: 1 addition & 1 deletion src/v/cluster/partition_manager.cc
@@ -255,14 +255,14 @@ partition_manager::remove(const model::ntp& ntp, partition_removal_mode mode) {
.then([this, ntp] { _unmanage_watchers.notify(ntp, ntp.tp.partition); })
.then([partition] { return partition->stop(); })
.then([partition] { return partition->remove_persistent_state(); })
.then([this, ntp] { return _storage.log_mgr().remove(ntp); })
.then([partition, mode] {
if (mode == partition_removal_mode::global) {
return partition->remove_remote_persistent_state();
} else {
return ss::now();
}
})
.then([this, ntp] { return _storage.log_mgr().remove(ntp); })
.finally([partition] {}); // in the end remove partition
}

29 changes: 27 additions & 2 deletions tests/rptest/archival/s3_client.py
@@ -1,11 +1,12 @@
import boto3
import re
from botocore.config import Config
from botocore.exceptions import ClientError
from time import sleep
from functools import wraps
import time
import datetime
from typing import Iterator, NamedTuple, Union
from typing import Iterator, NamedTuple, Union, Optional


class SlowDown(Exception):
@@ -289,7 +290,24 @@ def _list_objects(self, bucket, token=None, limit=1000):
else:
raise

def list_objects(self, bucket) -> Iterator[S3ObjectMetadata]:
def _key_to_topic(self, key: str):
# Segment objects: <hash>/<ns>/<topic>/<partition>_<revision>/...
# Manifest objects: <hash>/meta/<ns>/<topic>/<partition>_<revision>/...
# Topic manifest objects: <hash>/meta/<ns>/<topic>/topic_manifest.json
m = re.search(".+/(.+)/(.+)/(\d+_\d+/|topic_manifest.json)", key)
if m is None:
return None
else:
return m.group(2)

def list_objects(
self,
bucket,
topic: Optional[str] = None) -> Iterator[S3ObjectMetadata]:
"""
:param bucket: S3 bucket name
:param topic: Optional, if set then only return objects belonging to this topic
"""
token = None
truncated = True
while truncated:
@@ -298,6 +316,13 @@ def list_objects(self, bucket) -> Iterator[S3ObjectMetadata]:
truncated = bool(res['IsTruncated'])
if 'Contents' in res:
for item in res['Contents']:

# Apply optional topic filtering
if topic is not None and self._key_to_topic(
item['Key']) != topic:
self.logger.debug(f"Skip {item['Key']} for {topic}")
continue

yield S3ObjectMetadata(Bucket=bucket,
Key=item['Key'],
ETag=item['ETag'][1:-1],
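As a quick, self-contained illustration of the key-to-topic mapping added above: the sketch below applies the same regular expression to sample keys that follow the three layouts described in the comments. The hash prefix and topic names are invented for the example, and key_to_topic simply mirrors S3Client._key_to_topic.

```python
import re


def key_to_topic(key: str):
    # Same pattern as S3Client._key_to_topic above, written as a raw string.
    m = re.search(r".+/(.+)/(.+)/(\d+_\d+/|topic_manifest.json)", key)
    return None if m is None else m.group(2)


# Segment object: <hash>/<ns>/<topic>/<partition>_<revision>/...
assert key_to_topic("a1b2c3d4/kafka/mytopic/0_42/100-199-1-v1.log") == "mytopic"
# Partition manifest object: <hash>/meta/<ns>/<topic>/<partition>_<revision>/...
assert key_to_topic("a1b2c3d4/meta/kafka/mytopic/0_42/manifest.json") == "mytopic"
# Topic manifest object: <hash>/meta/<ns>/<topic>/topic_manifest.json
assert key_to_topic("a1b2c3d4/meta/kafka/mytopic/topic_manifest.json") == "mytopic"
# Anything that matches neither layout is not attributed to a topic.
assert key_to_topic("some-unrelated-object") is None
```

With the new optional topic parameter, callers can then restrict a listing to a single topic's objects, as the tests below do with list_objects(bucket, topic=topic_name).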
135 changes: 101 additions & 34 deletions tests/rptest/tests/topic_delete_test.py
@@ -22,7 +22,7 @@
from rptest.services.rpk_producer import RpkProducer
from rptest.services.metrics_check import MetricCheck
from rptest.services.redpanda import SISettings
from rptest.util import wait_for_segments_removal
from rptest.util import wait_for_segments_removal, firewall_blocked
from rptest.services.admin import Admin
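The newly imported firewall_blocked is used below as a context manager that cuts the brokers off from the S3 endpoint port for the duration of the block. A minimal sketch of that kind of helper is shown here, assuming iptables is usable on the test nodes; firewall_blocked_sketch is a hypothetical name and this is not the actual rptest.util implementation.

```python
from contextlib import contextmanager


@contextmanager
def firewall_blocked_sketch(nodes, port):
    # Hypothetical stand-in for rptest.util.firewall_blocked: drop traffic
    # to and from `port` on each node, then restore connectivity on exit.
    rules = [
        f"OUTPUT -p tcp --dport {port} -j DROP",
        f"INPUT -p tcp --sport {port} -j DROP",
    ]
    for node in nodes:
        for rule in rules:
            node.account.ssh(f"iptables -A {rule}")
    try:
        yield
    finally:
        for node in nodes:
            for rule in rules:
                # Best-effort cleanup: ignore rules that were never installed.
                node.account.ssh(f"iptables -D {rule}", allow_fail=True)
```

In the test itself the real helper is used exactly as imported, e.g. `with firewall_blocked(self.redpanda.nodes, self._s3_port):`.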


@@ -171,7 +171,8 @@ def produce_until_partitions():


class TopicDeleteCloudStorageTest(RedpandaTest):
topics = (TopicSpec(partition_count=3,
partition_count = 3
topics = (TopicSpec(partition_count=partition_count,
cleanup_policy=TopicSpec.CLEANUP_DELETE), )

def __init__(self, test_context):
@@ -183,30 +184,113 @@ def __init__(self, test_context):
num_brokers=test_context.cluster.available().size(),
si_settings=self.si_settings)

self._s3_port = self.si_settings.cloud_storage_api_endpoint_port

self.kafka_tools = KafkaCliTools(self.redpanda)

def _populate_topic(self):
def _populate_topic(self, topic_name):
"""
Get system into state where there is data in both local
and remote storage for the topic.
"""
# Set retention to 5MB
self.kafka_tools.alter_topic_config(
self.topic, {'retention.local.target.bytes': 5 * 1024 * 1024})
topic_name, {'retention.local.target.bytes': 5 * 1024 * 1024})

# Write out 10MB
self.kafka_tools.produce(self.topic,
# Write out 10MB per partition
self.kafka_tools.produce(topic_name,
record_size=4096,
num_records=2560)
num_records=2560 * self.partition_count)

# Wait for segments evicted from local storage
for i in range(0, 3):
wait_for_segments_removal(self.redpanda, self.topic, i, 5)
for i in range(0, self.partition_count):
wait_for_segments_removal(self.redpanda, topic_name, i, 5)
Contributor:

Maybe wait_for_segments_removal should fail if the initial number of segments is less than the final desired count.

Contributor (Author):

Hmm, it would be nice, but I think it's going to be racy unless the test is very confident that the segment count cannot end up lower than the target. I have limited trust in most of the tests that assert segment counts as a proxy for data retention.
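A minimal sketch of the guard being suggested, with the caveat the author raises kept in mind: wait_for_segments_removal_strict and get_segment_count are hypothetical names rather than existing rptest helpers, and the up-front check can indeed be racy if eviction has already pushed the count below the target before the test looks.

```python
from ducktape.utils.util import wait_until


def wait_for_segments_removal_strict(redpanda, topic, partition, target_count,
                                     get_segment_count, timeout_sec=30):
    # get_segment_count stands in for however the framework counts the
    # on-disk segments of a single partition.
    initial = get_segment_count(redpanda, topic, partition)
    if initial < target_count:
        # Fail fast instead of "waiting" for a condition that already held
        # before the test produced anything.
        raise RuntimeError(
            f"only {initial} segments present, below target {target_count}")
    wait_until(
        lambda: get_segment_count(redpanda, topic, partition) <= target_count,
        timeout_sec=timeout_sec,
        backoff_sec=1)
```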


# Confirm objects in remote storage
before_objects = self.s3_client.list_objects(
self.si_settings.cloud_storage_bucket)
assert sum(1 for _ in before_objects) > 0
objects = self.s3_client.list_objects(
self.si_settings.cloud_storage_bucket, topic=topic_name)
assert sum(1 for _ in objects) > 0

@cluster(
num_nodes=3,
log_allow_list=[
'exception while executing partition operation: {type: deletion'
])
def topic_delete_unavailable_test(self):
"""
Test deleting while the S3 backend is unavailable: we should see
that local deletion proceeds, and remote deletion eventually
gives up.
"""
self._populate_topic(self.topic)
objects_before = set(
self.redpanda.s3_client.list_objects(
self.si_settings.cloud_storage_bucket, topic=self.topic))
assert len(objects_before) > 0

with firewall_blocked(self.redpanda.nodes, self._s3_port):
self.kafka_tools.delete_topic(self.topic)

# From user's point of view, deletion succeeds
assert self.topic not in self.kafka_tools.list_topics()

# Local storage deletion should proceed even if remote can't
wait_until(lambda: topic_storage_purged(self.redpanda, self.topic),
timeout_sec=30,
backoff_sec=1)

# Erase timeout is hardcoded 60 seconds, wait long enough
# for it to give up.
time.sleep(90)

# Confirm our firewall block is really working, nothing was deleted
objects_after = set(
self.redpanda.s3_client.list_objects(
self.si_settings.cloud_storage_bucket))
assert len(objects_after) >= len(objects_before)

# Check that after the controller backend experiences errors trying
# to execute partition deletion, it is still happily able to execute
# other operations on unrelated topics, i.e. has not stalled applying.
next_topic = "next_topic"
self.kafka_tools.create_topic(
TopicSpec(name=next_topic,
partition_count=self.partition_count,
cleanup_policy=TopicSpec.CLEANUP_DELETE))
self._populate_topic(next_topic)
after_keys = set(o.Key for o in self.redpanda.s3_client.list_objects(
self.si_settings.cloud_storage_bucket, topic=next_topic))
assert len(after_keys) > 0

self.kafka_tools.delete_topic(next_topic)
wait_until(lambda: topic_storage_purged(self.redpanda, next_topic),
timeout_sec=30,
backoff_sec=1)

wait_until(lambda: self._topic_remote_deleted(next_topic),
timeout_sec=30,
backoff_sec=1)

# The controller gave up on deleting the original topic, objects
# are left behind in the object store. This condition can be updated
# if we ever implement a mechanism for automatically GCing objects after
# a drop in the object storage backend.
final_objects = set(
self.s3_client.list_objects(self.si_settings.cloud_storage_bucket,
topic=self.topic))
assert len(final_objects) >= len(objects_before)

def _topic_remote_deleted(self, topic_name: str):
"""Return true if all objects removed from cloud storage"""
after_objects = self.s3_client.list_objects(
self.si_settings.cloud_storage_bucket, topic=topic_name)
self.logger.debug(f"Objects after topic {topic_name} deletion:")
empty = True
for i in after_objects:
self.logger.debug(f" {i}")
empty = False

return empty

@cluster(num_nodes=3)
@parametrize(disable_delete=False)
@@ -218,7 +302,7 @@ def topic_delete_cloud_storage_test(self, disable_delete):
self.kafka_tools.alter_topic_config(
self.topic, {'redpanda.remote.delete': 'false'})

self._populate_topic()
self._populate_topic(self.topic)

objects_before = set(
self.redpanda.s3_client.list_objects(
@@ -232,18 +316,6 @@ def topic_delete_cloud_storage_test(self, disable_delete):
timeout_sec=30,
backoff_sec=1)

def remote_empty():
"""Return true if all objects removed from cloud storage"""
after_objects = self.s3_client.list_objects(
self.si_settings.cloud_storage_bucket)
self.logger.debug("Objects after topic deletion:")
empty = True
for i in after_objects:
self.logger.debug(f" {i}")
empty = False

return empty

if disable_delete:
# Unfortunately there is no alternative to sleeping here:
# we need to confirm not only that objects aren't deleted
@@ -260,7 +332,9 @@ def remote_empty():
else:
# The counter-test that deletion _doesn't_ happen in read replicas
# is done as part of read_replica_e2e_test
wait_until(remote_empty, timeout_sec=30, backoff_sec=1)
wait_until(lambda: self._topic_remote_deleted(self.topic),
timeout_sec=30,
backoff_sec=1)

# TODO: include transactional data so that we verify that .txrange
# objects are deleted.
@@ -269,13 +343,6 @@ def remote_empty():
# catch the case where there are segments in S3 not reflected in the
# manifest.

# TODO: test making the S3 backend unavailable during the topic
# delete. The delete action should be acked, but internally
# redpanda should keep retrying the S3 part until it succeeds.
# - When we bring the S3 backend back it should succeed
# - If we restart redpanda before bringing the S3 backend back
# it should also succeed.

@cluster(num_nodes=4)
def partition_movement_test(self):
"""
@@ -290,7 +357,7 @@ def partition_movement_test(self):

admin = Admin(self.redpanda)

self._populate_topic()
self._populate_topic(self.topic)

objects_before = set(o.Key
for o in self.redpanda.s3_client.list_objects(