
tests: add topic_delete_unavailable_test
This tests deleting a tiered storage topic while the
S3 backend is unavailable.

Related: #7433
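
The mechanism under test, as used in the diff below: the firewall_blocked
helper from rptest.util cuts the brokers off from the S3 endpoint for the
duration of a with-block (a usage sketch copied from the new test, not new
API):

    with firewall_blocked(self.redpanda.nodes, self._s3_port):
        # S3 is unreachable from every broker while in this block
        self.kafka_tools.delete_topic(self.topic)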
jcsp committed Nov 23, 2022
1 parent 455b456 commit 9f309c3
Showing 1 changed file with 97 additions and 29 deletions.
tests/rptest/tests/topic_delete_test.py
@@ -18,7 +18,7 @@
from rptest.services.rpk_producer import RpkProducer
from rptest.services.metrics_check import MetricCheck
from rptest.services.redpanda import SISettings
-from rptest.util import wait_for_segments_removal
+from rptest.util import wait_for_segments_removal, firewall_blocked
from ducktape.mark import parametrize


@@ -87,30 +87,115 @@ def __init__(self, test_context):
num_brokers=test_context.cluster.available().size(),
si_settings=self.si_settings)

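+        # Remember the object storage endpoint port so tests can firewall it off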
+        self._s3_port = self.si_settings.cloud_storage_api_endpoint_port

self.kafka_tools = KafkaCliTools(self.redpanda)

-    def _populate_topic(self):
+    def _populate_topic(self, topic_name):
"""
        Get the system into a state where there is data in both local
        and remote storage for the topic.
"""
# Set retention to 5MB
self.kafka_tools.alter_topic_config(
-            self.topic, {'retention.local.target.bytes': 5 * 1024 * 1024})
+            topic_name, {'retention.local.target.bytes': 5 * 1024 * 1024})

# Write out 10MB
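        # (2560 records * 4 KiB = 10 MiB, double the 5 MiB local retention target)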
-        self.kafka_tools.produce(self.topic,
+        self.kafka_tools.produce(topic_name,
record_size=4096,
num_records=2560)

# Wait for segments evicted from local storage
for i in range(0, 3):
-            wait_for_segments_removal(self.redpanda, self.topic, i, 5)
+            wait_for_segments_removal(self.redpanda, topic_name, i, 5)

# Confirm objects in remote storage
-        before_objects = self.s3_client.list_objects(
-            self.si_settings.cloud_storage_bucket)
-        assert sum(1 for _ in before_objects) > 0
+        objects = self.s3_client.list_objects(
+            self.si_settings.cloud_storage_bucket, topic=topic_name)
+        assert sum(1 for _ in objects) > 0

+    @cluster(
+        num_nodes=3,
+        log_allow_list=[
+            'exception while executing partition operation: {type: deletion'
+        ])
+    def topic_delete_unavailable_test(self):
+        """
+        Test deleting while the S3 backend is unavailable: we should see
+        that local deletion proceeds, and remote deletion eventually
+        gives up.
+        """
+        self._populate_topic(self.topic)
+        objects_before = set(
+            self.redpanda.s3_client.list_objects(
+                self.si_settings.cloud_storage_bucket, topic=self.topic))
+        assert (len(objects_before) > 0)
+
+        with firewall_blocked(self.redpanda.nodes, self._s3_port):
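+            # Issue the delete while the brokers cannot reach the S3 endpoint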
+            self.kafka_tools.delete_topic(self.topic)
+
+            # From the user's point of view, deletion succeeds
+            assert self.topic not in self.kafka_tools.list_topics()
+
+            # Local storage deletion should proceed even if remote can't
+            wait_until(lambda: topic_storage_purged(self.redpanda, self.topic),
+                       timeout_sec=30,
+                       backoff_sec=1)
+
+            # Erase timeout is hardcoded 60 seconds, wait long enough
+            # for it to give up.
+            time.sleep(90)
+
+            # Confirm our firewall block is really working: nothing was deleted
+            objects_after = set(
+                self.redpanda.s3_client.list_objects(
+                    self.si_settings.cloud_storage_bucket))
+            assert len(objects_after) >= len(objects_before)
+
+        # Check that after the controller backend experiences errors trying
+        # to execute partition deletion, it is still happily able to execute
+        # other operations on unrelated topics, i.e. has not stalled applying.
+        next_topic = "next_topic"
+        self.kafka_tools.create_topic(
+            TopicSpec(name=next_topic,
+                      partition_count=3,
+                      cleanup_policy=TopicSpec.CLEANUP_DELETE))
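+        # Populating the new topic exercises uploads to cloud storage, so
+        # this also shows tiered storage still works once the outage is over.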
+        self._populate_topic(next_topic)
+        after_keys = set([
+            o.Key for o in self.redpanda.s3_client.list_objects(
+                self.si_settings.cloud_storage_bucket, topic=next_topic)
+        ])
+        assert len(after_keys) > 0
+
+        self.kafka_tools.delete_topic(next_topic)
+        wait_until(lambda: topic_storage_purged(self.redpanda, next_topic),
+                   timeout_sec=30,
+                   backoff_sec=1)
+
+        wait_until(lambda: self._topic_remote_deleted(next_topic),
+                   timeout_sec=30,
+                   backoff_sec=1)
+
+        # The controller gave up on deleting the original topic, so objects
+        # are left behind in the object store. This condition can be updated
+        # if we ever implement a mechanism for automatically GCing objects
+        # after an outage of the object storage backend.
+        final_objects = set(
+            self.s3_client.list_objects(self.si_settings.cloud_storage_bucket,
+                                        topic=self.topic))
+        assert len(final_objects) >= len(objects_before)
+
+    def _topic_remote_deleted(self, topic_name: str):
+        """Return true if all objects have been removed from cloud storage"""
+        after_objects = self.s3_client.list_objects(
+            self.si_settings.cloud_storage_bucket, topic=topic_name)
+        self.logger.debug(f"Objects after topic {topic_name} deletion:")
+        empty = True
+        for i in after_objects:
+            self.logger.debug(f"  {i}")
+            empty = False
+
+        return empty

@cluster(num_nodes=3)
@parametrize(disable_delete=False)
@@ -122,7 +207,7 @@ def topic_delete_cloud_storage_test(self, disable_delete):
self.kafka_tools.alter_topic_config(
self.topic, {'redpanda.remote.delete': 'false'})

-        self._populate_topic()
+        self._populate_topic(self.topic)

objects_before = set(
self.redpanda.s3_client.list_objects(
@@ -136,18 +221,6 @@ def topic_delete_cloud_storage_test(self, disable_delete):
timeout_sec=30,
backoff_sec=1)

-        def remote_empty():
-            """Return true if all objects removed from cloud storage"""
-            after_objects = self.s3_client.list_objects(
-                self.si_settings.cloud_storage_bucket)
-            self.logger.debug("Objects after topic deletion:")
-            empty = True
-            for i in after_objects:
-                self.logger.debug(f"  {i}")
-                empty = False
-
-            return empty

if disable_delete:
            # Unfortunately there is no alternative to sleeping here:
# we need to confirm not only that objects aren't deleted
@@ -164,7 +237,9 @@ def remote_empty():
else:
# The counter-test that deletion _doesn't_ happen in read replicas
# is done as part of read_replica_e2e_test
-            wait_until(remote_empty, timeout_sec=30, backoff_sec=1)
+            wait_until(lambda: self._topic_remote_deleted(self.topic),
+                       timeout_sec=30,
+                       backoff_sec=1)

# TODO: include transactional data so that we verify that .txrange
# objects are deleted.
@@ -173,13 +248,6 @@ def remote_empty():
# catch the case where there are segments in S3 not reflected in the
# manifest.

-    # TODO: test making the S3 backend unavailable during the topic
-    # delete. The delete action should be acked, but internally
-    # redpanda should keep retrying the S3 part until it succeeds.
-    # - When we bring the S3 backend back it should succeed
-    # - If we restart redpanda before bringing the S3 backend back
-    #   it should also succeed.

@cluster(num_nodes=4)
def partition_movement_test(self):
"""
...
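For readers unfamiliar with the helper: firewall_blocked lives in rptest.util.
A minimal sketch of such a context manager, assuming ducktape's
RemoteAccount.ssh and iptables on the nodes (illustrative only, not the real
implementation; the name firewall_blocked_sketch is hypothetical):

    from contextlib import contextmanager

    @contextmanager
    def firewall_blocked_sketch(nodes, port):
        # Drop outbound traffic to `port` on every node, restore on exit.
        block = f"iptables -A OUTPUT -p tcp --dport {port} -j DROP"
        unblock = f"iptables -D OUTPUT -p tcp --dport {port} -j DROP"
        for node in nodes:
            node.account.ssh(block)
        try:
            yield
        finally:
            for node in nodes:
                node.account.ssh(unblock, allow_fail=True)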
