Merge pull request #10652 from vbotbuildovich/backport-fixes-to-v23.1.x-727

[v23.1.x] Reproduce the OOM on fetching from many partitions
dlex authored May 10, 2023
2 parents 8d0082c + a49eb85 commit 4059e26
Showing 5 changed files with 147 additions and 6 deletions.
4 changes: 3 additions & 1 deletion tests/rptest/clients/kafka_cli_tools.py
@@ -62,7 +62,7 @@ def make_factory(version):

return list(map(make_factory, cls.VERSIONS))

-    def create_topic(self, spec):
+    def create_topic(self, spec: TopicSpec):
self._redpanda.logger.debug("Creating topic: %s", spec.name)
args = ["--create"]
args += ["--topic", spec.name]
@@ -78,6 +78,8 @@ def create_topic(self, spec):
args += ["--config", f"retention.bytes={spec.retention_bytes}"]
if spec.retention_ms:
args += ["--config", f"retention.ms={spec.retention_ms}"]
+        if spec.max_message_bytes:
+            args += ["--config", f"max.message.bytes={spec.max_message_bytes}"]
return self._run("kafka-topics.sh", args)

def create_topic_partitions(self, topic, partitions):
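
Note: a standalone sketch (not part of the commit) of the argument construction
the new branch performs; the topic name and sizes are illustrative:

    # mirrors the max.message.bytes handling added to create_topic()
    msg_size = 1024 * 1024
    max_message_bytes = msg_size * 2  # stands in for spec.max_message_bytes
    args = ["--create", "--topic", "topic-oom-repro"]
    if max_message_bytes:
        args += ["--config", f"max.message.bytes={max_message_bytes}"]
    print(args)  # [..., '--config', 'max.message.bytes=2097152']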
4 changes: 3 additions & 1 deletion tests/rptest/clients/types.py
@@ -61,7 +61,8 @@ def __init__(self,
redpanda_remote_read=None,
redpanda_remote_write=None,
redpanda_remote_delete=None,
-                 segment_ms=None):
+                 segment_ms=None,
+                 max_message_bytes=None):
self.name = name or f"topic-{self._random_topic_suffix()}"
self.partition_count = partition_count
self.replication_factor = replication_factor
@@ -76,6 +77,7 @@ def __init__(self,
self.redpanda_remote_write = redpanda_remote_write
self.redpanda_remote_delete = redpanda_remote_delete
self.segment_ms = segment_ms
+        self.max_message_bytes = max_message_bytes

def __str__(self):
return self.name
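
Note: with this change a TopicSpec can carry a per-topic message size cap. A
hedged usage sketch (values mirror the test added below):

    from rptest.clients.types import TopicSpec  # repo-local import

    spec = TopicSpec(partition_count=400, max_message_bytes=2 * 1024 * 1024)
    assert spec.max_message_bytes == 2097152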
8 changes: 5 additions & 3 deletions tests/rptest/services/kaf_consumer.py
@@ -17,7 +17,7 @@ def __init__(self,
context,
redpanda,
topic,
-                 num_records=1,
+                 num_records=None,
offset_for_read="newest"):
super(KafConsumer, self).__init__(context, num_nodes=1)
self._redpanda = redpanda
@@ -33,8 +33,10 @@ def _worker(self, _, node):
self._stopping.clear()
try:
partition = None
cmd = "echo $$ ; kaf consume -b %s -f --offset %s %s" % (
self._redpanda.brokers(), self._offset_for_read, self._topic)
cmd = "echo $$ ; kaf consume -b %s %s --offset %s %s" % (
self._redpanda.brokers(), "--follow" if self._num_records is
None else f"--limit-messages {self._num_records}",
self._offset_for_read, self._topic)
for line in node.account.ssh_capture(cmd):
if self._pid is None:
self._pid = line.strip()
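
Note: a standalone sketch of the two command shapes the updated _worker now
builds; broker list, topic, and offset values are placeholders:

    def kaf_cmd(brokers, topic, offset, num_records=None):
        # --follow tails indefinitely; --limit-messages stops after N records
        limit = ("--follow" if num_records is None else
                 f"--limit-messages {num_records}")
        return "echo $$ ; kaf consume -b %s %s --offset %s %s" % (
            brokers, limit, offset, topic)

    print(kaf_cmd("localhost:9092", "topic-oom-repro", "oldest"))       # follow
    print(kaf_cmd("localhost:9092", "topic-oom-repro", "oldest", 400))  # bounded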
7 changes: 6 additions & 1 deletion tests/rptest/services/rpk_producer.py
@@ -21,7 +21,8 @@ def __init__(self,
quiet: bool = False,
produce_timeout: Optional[int] = None,
*,
-                 partition: Optional[int] = None):
+                 partition: Optional[int] = None,
+                 max_message_bytes: Optional[int] = None):
super(RpkProducer, self).__init__(context, num_nodes=1)
self._redpanda = redpanda
self._topic = topic
@@ -33,6 +34,7 @@ def __init__(self,
self._quiet = quiet
self._output_line_count = 0
self._partition = partition
+        self._max_message_bytes = max_message_bytes

if produce_timeout is None:
produce_timeout = 10
@@ -63,6 +65,9 @@ def _worker(self, _idx, node):
if self._partition is not None:
cmd += f" -p {self._partition}"

+        if self._max_message_bytes is not None:
+            cmd += f" --max-message-bytes {self._max_message_bytes}"

self._stopping.clear()
try:
for line in node.account.ssh_capture(
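
Note: the diff shows only the appended flag; the base rpk produce command below
is an assumption for illustration, not taken from this file:

    max_message_bytes = 2 * 1024 * 1024  # illustrative
    cmd = "rpk topic produce topic-oom-repro"  # assumed base command
    if max_message_bytes is not None:
        cmd += f" --max-message-bytes {max_message_bytes}"
    print(cmd)  # rpk topic produce topic-oom-repro --max-message-bytes 2097152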
130 changes: 130 additions & 0 deletions tests/rptest/tests/memory_stress_test.py
@@ -0,0 +1,130 @@
# Copyright 2023 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from enum import Enum

from ducktape.mark import ok_to_fail
from rptest.clients.types import TopicSpec
from rptest.services.cluster import cluster
from rptest.services.kaf_consumer import KafConsumer
from rptest.services.redpanda import ResourceSettings
from rptest.services.rpk_consumer import RpkConsumer
from rptest.services.rpk_producer import RpkProducer
from rptest.services.verifiable_consumer import VerifiableConsumer
from rptest.tests.redpanda_test import RedpandaTest
from rptest.utils.mode_checks import skip_debug_mode


class Consumers(Enum):
RPK = 1
KAF = 2
VERIFIABLE = 3


class MemoryStressTest(RedpandaTest):
"""
Try various ways to reach out-of-memory condition in a broker via Kafka API
"""
def setUp(self):
# Override parent setUp so that we don't start redpanda until each test,
# enabling each test case to customize its ResourceSettings
pass

@ok_to_fail # until the fix is delivered
@cluster(num_nodes=5)
@skip_debug_mode
def test_fetch_with_many_partitions(self):
"""
Exhaust memory by consuming from too many partitions in a single Fetch
API request.
"""
# memory_mb does not work with a debug redpanda build, so the test only
# makes sense with a release build, hence @skip_debug_mode
self.redpanda.set_resource_settings(
ResourceSettings(memory_mb=256, num_cpus=1))
self.redpanda.set_seed_servers(self.redpanda.nodes)
self.redpanda.start(omit_seeds_on_idx_one=False)
self.redpanda.set_cluster_config(
{"kafka_batch_max_bytes": 10 * 1024 * 1024})

# with all the other params as they are, the largest message size that
# does not make redpanda OOM is 64 MiB
msg_size = 1024 * 1024
partition_count = 400
self.topics = [
TopicSpec(partition_count=partition_count,
max_message_bytes=msg_size * 2)
]
self._create_initial_topics()

msg_count = partition_count
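# These timeout formulas appear to scale with empirical throughput estimates
# (divisors of 75,000,000 and 2,184,533 bytes/s); with msg_size=1 MiB and
# msg_count=400 they evaluate to roughly 55 s (rpk_response_timeout) and
# 192 s (produce_timeout).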
rpk_response_timeout = 10 + partition_count // 10 + msg_count * msg_size // (
150 * 500_000)
produce_timeout = msg_count * msg_size // 2184533
self.logger.info(
f"Starting producer. msg_size={msg_size}, msg_count={msg_count}, partiton_count={partition_count}, rpk_response_timeout={rpk_response_timeout}, produce_timeout={produce_timeout}"
)

producer = RpkProducer(self.test_context,
self.redpanda,
self.topic,
msg_size=msg_size,
msg_count=msg_count,
printable=True,
produce_timeout=rpk_response_timeout,
max_message_bytes=msg_size * 2)
producer.start()
producer.wait(produce_timeout)
producer.stop()
producer.free()

for consumer_type in Consumers:
self.logger.info(f"Starting consumer {consumer_type.name}")

consumers = []
for k in range(2):
if consumer_type == Consumers.RPK:
consumers.append(
RpkConsumer(self.test_context,
self.redpanda,
self.topic,
fetch_max_bytes=1024 * 1024 * 3,
num_msgs=msg_count))
elif consumer_type == Consumers.KAF:
consumers.append(
KafConsumer(self.test_context,
self.redpanda,
self.topic,
offset_for_read="oldest",
num_records=msg_count))
elif consumer_type == Consumers.VERIFIABLE:
if k == 0:  # more than one is not stable enough
consumers.append(
VerifiableConsumer(self.test_context,
1,
self.redpanda,
self.topic,
"verifiable-group",
max_messages=msg_count))
else:
assert False, "unsupported consumer type"
for consumer in consumers:
consumer.start()
for consumer in consumers:
consumer.wait()
for consumer in consumers:
consumer.stop()
for consumer in consumers:
consumer.free()

# TBD: for faster detection of a redpanda crash, have a check on all
# redpanda nodes that the process is still alive, and abort the test
# if it is not (a hedged sketch follows this file)

# TBD: add a librdkafka-based consumer

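Note: a minimal sketch of the liveness check the first TBD describes, assuming
ducktape's node.account.ssh_output and a pgrep on the redpanda binary; the real
service may expose a dedicated helper:

    def assert_redpanda_alive(redpanda):
        # Abort early if any broker process has died mid-test.
        for node in redpanda.nodes:
            count = node.account.ssh_output("pgrep -c redpanda || true").strip()
            assert count not in (b"0", b""), \
                f"redpanda is not running on {node.account.hostname}"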