From 2df763e33ec3ecf5d72f053183515169a3779834 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Thu, 20 Jun 2024 17:10:12 +0100 Subject: [PATCH 1/2] kafka: fix timequery failing for empty topics This bug was introduced accidentally while trying to fix another off-by-one bug in commit ref 8f2de964c0e915f4f10ae8eb74400e6288c5680f. I forgot to account that for an empty topic the "start offset" is equal to "last offset" so calling `model::prev_offset` would result in an offset below the start which is invalid and throws an exception if passed downstream. To avoid the problem, we short-circuit with a "offset not found" response straight away if that's the case. https://github.com/redpanda-data/redpanda/pull/18112/commits/8f2de964c0e915f4f10ae8eb74400e6288c5680f (cherry picked from commit 1a080ee136d69764457cf8e5db1a80f32894050c) --- src/v/kafka/server/handlers/list_offsets.cc | 17 +++++++++++++++-- tests/rptest/tests/timequery_test.py | 19 ++++++++++++++++++- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/v/kafka/server/handlers/list_offsets.cc b/src/v/kafka/server/handlers/list_offsets.cc index 54c5a52cd1db..b301e60a928a 100644 --- a/src/v/kafka/server/handlers/list_offsets.cc +++ b/src/v/kafka/server/handlers/list_offsets.cc @@ -20,6 +20,7 @@ #include "kafka/server/replicated_partition.h" #include "kafka/server/request_context.h" #include "kafka/server/response.h" +#include "model/fundamental.h" #include "model/namespace.h" #include "resource_mgmt/io_priority.h" @@ -129,10 +130,22 @@ static ss::future list_offsets_partition( offset, kafka_partition->leader_epoch()); } + auto min_offset = kafka_partition->start_offset(); + auto max_offset = model::prev_offset(offset); + + // Empty partition. + if (max_offset < min_offset) { + co_return list_offsets_response::make_partition( + ktp.get_partition(), + model::timestamp(-1), + model::offset(-1), + kafka_partition->leader_epoch()); + } + auto res = co_await kafka_partition->timequery(storage::timequery_config{ - kafka_partition->start_offset(), + min_offset, timestamp, - model::prev_offset(offset), + max_offset, kafka_read_priority(), {model::record_batch_type::raft_data}, octx.rctx.abort_source().local()}); diff --git a/tests/rptest/tests/timequery_test.py b/tests/rptest/tests/timequery_test.py index c7e8c9297536..abc24c401ffb 100644 --- a/tests/rptest/tests/timequery_test.py +++ b/tests/rptest/tests/timequery_test.py @@ -103,6 +103,18 @@ def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool): base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size local_retention = self.log_segment_size * 4 + kcat = KafkaCat(cluster) + + # Test the base case with an empty topic. + empty_topic = TopicSpec(name="tq_empty_topic", + partition_count=1, + replication_factor=3) + self.client().create_topic(empty_topic) + offset = kcat.query_offset(empty_topic.name, 0, base_ts) + self.logger.info(f"Time query returned offset {offset}") + assert offset == -1, f"Expected -1, got {offset}" + + # Create a topic and produce a run of messages we will query. topic, timestamps = self._create_and_produce(cluster, cloud_storage, local_retention, base_ts, record_size, msg_count) @@ -163,7 +175,6 @@ def __init__(self, offset, ts=None, expect_read=True): # offset should cause cloud downloads. hit_offsets = set() - kcat = KafkaCat(cluster) cloud_metrics = None local_metrics = None @@ -515,6 +526,12 @@ def test_timequery_with_trim_prefix(self, cloud_storage: bool, offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000) assert offset == msg_count - 1, f"Expected {msg_count - 1}, got {offset}" + # Trim everything, leaving an empty log. + rpk.trim_prefix(topic.name, offset=p.high_watermark, partitions=[0]) + kcat = KafkaCat(self.redpanda) + offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000) + assert offset == -1, f"Expected -1, got {offset}" + @cluster( num_nodes=4, log_allow_list=["Failed to upload spillover manifest {timed_out}"]) From 1af52bb3782204c6e21a5b661efa41e29423e448 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Fri, 21 Jun 2024 15:05:53 +0100 Subject: [PATCH 2/2] rptest: enable additional timequery tests Forgot to re-enable after fixing a bunch of timequery bugs. (cherry picked from commit e7c0ddd27a69f163000e7eeaa5de9440139d8b61) --- tests/rptest/tests/timequery_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/rptest/tests/timequery_test.py b/tests/rptest/tests/timequery_test.py index abc24c401ffb..3f8cfe71f45c 100644 --- a/tests/rptest/tests/timequery_test.py +++ b/tests/rptest/tests/timequery_test.py @@ -467,8 +467,8 @@ def query_slices(tid): assert not any([e > 0 for e in errors]) @cluster(num_nodes=4) - # @parametrize(cloud_storage=True, spillover=False) - # @parametrize(cloud_storage=True, spillover=True) + @parametrize(cloud_storage=True, spillover=False) + @parametrize(cloud_storage=True, spillover=True) @parametrize(cloud_storage=False, spillover=False) def test_timequery_with_trim_prefix(self, cloud_storage: bool, spillover: bool):