Updated Magnus' "Add 'fetch.queue.backoff.ms' to the consumer (#2879)" patch (#4284)

This property allows trading CPU for memory by reducing the fetch backoff
when `queued.max.messages.kbytes` and `queued.min.messages` would otherwise
have to be set too high to hold 1 s of data.
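As an illustration of the intended use, a minimal sketch (not part of this patch; the broker address, group id, and the chosen values are placeholder assumptions):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Placeholder connection settings. */
        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", "example-group",
                          errstr, sizeof(errstr));

        /* Keep the pre-fetch queue small... */
        rd_kafka_conf_set(conf, "queued.min.messages", "1000",
                          errstr, sizeof(errstr));
        /* ...and retry the fetch after 100 ms instead of the default
         * 1000 ms, trading some extra CPU/network for lower memory use. */
        if (rd_kafka_conf_set(conf, "fetch.queue.backoff.ms", "100",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "conf_set failed: %s\n", errstr);
                return 1;
        }

        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "consumer creation failed: %s\n", errstr);
                return 1;
        }
        rd_kafka_destroy(rk);
        return 0;
}
```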

---------

Co-authored-by: Chris A. <cma@bitemyapp.com>
emasab and bitemyapp committed Jun 15, 2023
1 parent 1d6fda8 commit e52aa3b
Showing 9 changed files with 229 additions and 16 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,19 @@ librdkafka v2.2.0 is a feature release:
(#4301, started by @vctoriawu).
* Avoid treating an OpenSSL error as a permanent error and treat unclean SSL
closes as normal ones (#4294).
* Added `fetch.queue.backoff.ms` to the consumer to control how long
  the consumer backs off the next fetch attempt. (@bitemyapp, @edenhill, #2879)


## Enhancements

* Added `fetch.queue.backoff.ms` to the consumer to control how long
  the consumer backs off the next fetch attempt. When the pre-fetch queue
  has exceeded either of its queuing thresholds, `queued.min.messages` or
  `queued.max.messages.kbytes`, it backs off for 1 second.
  If those parameters have to be set too high to hold 1 s of data,
  this new parameter allows backing off the fetch earlier, reducing memory
  requirements.
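To make the trade-off concrete, a back-of-envelope sizing sketch (the throughput and message-size figures below are illustrative assumptions, not from this release): the queue thresholds only need to cover roughly one backoff period of consumption.

```c
#include <stdio.h>

/* Rough queue sizing for a few backoff settings, assuming a consumer
 * that must absorb 50,000 messages/s of ~1 KiB each (made-up numbers). */
int main(void) {
        const long msgs_per_sec  = 50000;
        const long avg_msg_bytes = 1024;
        const int backoffs_ms[]  = {1000 /* default */, 100, 10};

        for (int i = 0; i < 3; i++) {
                long msgs   = msgs_per_sec * backoffs_ms[i] / 1000;
                long kbytes = msgs * avg_msg_bytes / 1024;
                printf("fetch.queue.backoff.ms=%4d -> queue must hold "
                       "~%5ld msgs / ~%5ld KiB\n",
                       backoffs_ms[i], msgs, kbytes);
        }
        return 0;
}
```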


## Fixes
@@ -52,6 +65,7 @@ librdkafka v2.2.0 is a feature release:
assignment completely.



# librdkafka v2.1.1

librdkafka v2.1.1 is a maintenance release:
@@ -91,6 +105,7 @@ librdkafka v2.1.1 is a maintenance release:
in a queue itself that, when polled, resets the timer.



# librdkafka v2.1.0

librdkafka v2.1.0 is a feature release:
1 change: 1 addition & 0 deletions CONFIGURATION.md
@@ -118,6 +118,7 @@ enable.auto.offset.store | C | true, false | true
queued.min.messages | C | 1 .. 10000000 | 100000 | medium | Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue. <br>*Type: integer*
queued.max.messages.kbytes | C | 1 .. 2097151 | 65536 | medium | Maximum number of kilobytes of queued pre-fetched messages in the local consumer queue. If using the high-level consumer this setting applies to the single consumer queue, regardless of the number of partitions. When using the legacy simple consumer or when separate partition queues are used this setting applies per partition. This value may be overshot by fetch.message.max.bytes. This property has higher priority than queued.min.messages. <br>*Type: integer*
fetch.wait.max.ms | C | 0 .. 300000 | 500 | low | Maximum time the broker may wait to fill the Fetch response with fetch.min.bytes of messages. <br>*Type: integer*
fetch.queue.backoff.ms | C | 0 .. 300000 | 1000 | medium | How long to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (queued.min.messages or queued.max.messages.kbytes) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s) delays between messages. Low values may increase CPU utilization. <br>*Type: integer*
fetch.message.max.bytes | C | 1 .. 1000000000 | 1048576 | medium | Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched. <br>*Type: integer*
max.partition.fetch.bytes | C | 1 .. 1000000000 | 1048576 | medium | Alias for `fetch.message.max.bytes`: Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched. <br>*Type: integer*
fetch.max.bytes | C | 0 .. 2147483135 | 52428800 | medium | Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the Fetch request is larger than this value, then the message batch will still be returned to ensure the consumer can make progress. The maximum message batch size accepted by the broker is defined via `message.max.bytes` (broker config) or `max.message.bytes` (broker topic config). `fetch.max.bytes` is automatically adjusted upwards to be at least `message.max.bytes` (consumer config). <br>*Type: integer*
10 changes: 10 additions & 0 deletions src/rdkafka_conf.c
@@ -1197,6 +1197,16 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"Maximum time the broker may wait to fill the Fetch response "
"with fetch.min.bytes of messages.",
0, 300 * 1000, 500},
{_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "fetch.queue.backoff.ms", _RK_C_INT,
_RK(fetch_queue_backoff_ms),
"How long to postpone the next fetch request for a "
"topic+partition in case the current fetch queue thresholds "
"(queued.min.messages or queued.max.messages.kbytes) have "
"been exceded. "
"This property may need to be decreased if the queue thresholds are "
"set low and the application is experiencing long (~1s) delays "
"between messages. Low values may increase CPU utilization.",
0, 300 * 1000, 1000},
{_RK_GLOBAL | _RK_CONSUMER | _RK_MED, "fetch.message.max.bytes", _RK_C_INT,
_RK(fetch_msg_max_bytes),
"Initial maximum number of bytes per topic+partition to request when "
1 change: 1 addition & 0 deletions src/rdkafka_conf.h
@@ -355,6 +355,7 @@ struct rd_kafka_conf_s {
int fetch_msg_max_bytes;
int fetch_max_bytes;
int fetch_min_bytes;
int fetch_queue_backoff_ms;
int fetch_error_backoff_ms;
char *group_id_str;
char *group_instance_id;
49 changes: 33 additions & 16 deletions src/rdkafka_fetcher.c
@@ -51,15 +51,29 @@ static void rd_kafka_broker_fetch_backoff(rd_kafka_broker_t *rkb,

/**
* @brief Backoff the next Fetch for specific partition
*
* @returns the absolute backoff time (the current time for no backoff).
*/
static void rd_kafka_toppar_fetch_backoff(rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
rd_kafka_resp_err_t err) {
int backoff_ms = rkb->rkb_rk->rk_conf.fetch_error_backoff_ms;
static rd_ts_t rd_kafka_toppar_fetch_backoff(rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
rd_kafka_resp_err_t err) {
int backoff_ms;

/* Don't back off on reaching end of partition */
if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
return;
if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
rktp->rktp_ts_fetch_backoff = 0;
return rd_clock(); /* Immediate: No practical backoff */
}

if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
backoff_ms = rkb->rkb_rk->rk_conf.fetch_queue_backoff_ms;
else
backoff_ms = rkb->rkb_rk->rk_conf.fetch_error_backoff_ms;

if (unlikely(!backoff_ms)) {
rktp->rktp_ts_fetch_backoff = 0;
return rd_clock(); /* Immediate: No practical backoff */
}

/* Certain errors that may require manual intervention should have
* a longer backoff time. */
@@ -73,8 +87,9 @@ static void rd_kafka_toppar_fetch_backoff(rd_kafka_broker_t *rkb,
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
backoff_ms, err ? ": " : "",
err ? rd_kafka_err2str(err) : "");
}

return rktp->rktp_ts_fetch_backoff;
}

/**
* @brief Handle preferred replica in fetch response.
@@ -1021,7 +1036,7 @@ rd_ts_t rd_kafka_toppar_fetch_decide(rd_kafka_toppar_t *rktp,
rd_interval(&rktp->rktp_lease_intvl,
5 * 60 * 1000 * 1000 /*5 minutes*/, 0) > 0;
if (lease_expired) {
/* delete_to_leader() requires no locks to be held */
/* delegate_to_leader() requires no locks to be held */
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_delegate_to_leader(rktp);
rd_kafka_toppar_lock(rktp);
@@ -1097,22 +1112,24 @@ rd_ts_t rd_kafka_toppar_fetch_decide(rd_kafka_toppar_t *rktp,
rktp->rktp_next_fetch_start.offset)) {
should_fetch = 0;
reason = "no concrete offset";

} else if (rktp->rktp_ts_fetch_backoff > rd_clock()) {
reason = "fetch backed off";
ts_backoff = rktp->rktp_ts_fetch_backoff;
should_fetch = 0;
} else if (rd_kafka_q_len(rktp->rktp_fetchq) >=
rkb->rkb_rk->rk_conf.queued_min_msgs) {
/* Skip toppars whose local message queue is already above
* the lower threshold. */
reason = "queued.min.messages exceeded";
reason = "queued.min.messages exceeded";
ts_backoff = rd_kafka_toppar_fetch_backoff(
rkb, rktp, RD_KAFKA_RESP_ERR__QUEUE_FULL);
should_fetch = 0;

} else if ((int64_t)rd_kafka_q_size(rktp->rktp_fetchq) >=
rkb->rkb_rk->rk_conf.queued_max_msg_bytes) {
reason = "queued.max.messages.kbytes exceeded";
should_fetch = 0;

} else if (rktp->rktp_ts_fetch_backoff > rd_clock()) {
reason = "fetch backed off";
ts_backoff = rktp->rktp_ts_fetch_backoff;
reason = "queued.max.messages.kbytes exceeded";
ts_backoff = rd_kafka_toppar_fetch_backoff(
rkb, rktp, RD_KAFKA_RESP_ERR__QUEUE_FULL);
should_fetch = 0;
}

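In effect, the two queue-threshold branches above now route through `rd_kafka_toppar_fetch_backoff()` with `RD_KAFKA_RESP_ERR__QUEUE_FULL`, so the configurable queue backoff, rather than the fixed error backoff, determines the next wake-up. A self-contained toy model of that decision follows; the type and field names are invented for illustration and are not librdkafka internals:

```c
#include <stdint.h>
#include <stdio.h>

typedef int64_t ts_us_t; /* microsecond timestamp, like rd_ts_t */

struct toppar_model {
        int fetchq_len;           /* queued message count */
        int64_t fetchq_size;      /* queued bytes */
        ts_us_t ts_fetch_backoff; /* absolute wake-up time, 0 = none */
};

struct conf_model {
        int queued_min_msgs;          /* queued.min.messages */
        int64_t queued_max_msg_bytes; /* queued.max.messages.kbytes * 1024 */
        int fetch_queue_backoff_ms;   /* fetch.queue.backoff.ms */
};

/* Decide whether to fetch now; when a queue threshold is exceeded, arm
 * the configurable queue backoff. Returns the next wake-up time. */
static ts_us_t fetch_decide(struct toppar_model *tp,
                            const struct conf_model *c, ts_us_t now,
                            int *should_fetch) {
        *should_fetch = 0;
        if (tp->ts_fetch_backoff > now)
                return tp->ts_fetch_backoff; /* still backing off */
        if (tp->fetchq_len >= c->queued_min_msgs ||
            tp->fetchq_size >= c->queued_max_msg_bytes) {
                tp->ts_fetch_backoff =
                    now + (ts_us_t)c->fetch_queue_backoff_ms * 1000;
                return tp->ts_fetch_backoff; /* queue full: back off */
        }
        *should_fetch = 1;
        return now; /* fetch immediately */
}

int main(void) {
        struct conf_model c = {100000, 65536LL * 1024, 100};
        struct toppar_model tp = {200000, 0, 0}; /* over the msg threshold */
        int should_fetch;
        ts_us_t next = fetch_decide(&tp, &c, 0, &should_fetch);
        printf("should_fetch=%d next_wakeup_us=%lld\n", should_fetch,
               (long long)next);
        return 0;
}
```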
165 changes: 165 additions & 0 deletions tests/0127-fetch_queue_backoff.cpp
@@ -0,0 +1,165 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2020, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include <iostream>
#include <map>
#include <cstring>
#include <cstdlib>
#include "testcpp.h"
extern "C" {
#include "test.h"
}

/**
* Test consumer fetch.queue.backoff.ms behaviour.
*
* @param backoff_ms Backoff ms to configure, -1 to rely on default one.
*
* 1. Produce N messages, 1 message per batch.
* 2. Configure consumer with queued.min.messages=1 and
* fetch.queue.backoff.ms=<backoff_ms>
* 3. Verify that the consume() latency is <= fetch.queue.backoff.ms.
*/


static void do_test_queue_backoff(const std::string &topic, int backoff_ms) {
SUB_TEST("backoff_ms = %d", backoff_ms);

/* Create consumer */
RdKafka::Conf *conf;
Test::conf_init(&conf, NULL, 60);
Test::conf_set(conf, "group.id", topic);
Test::conf_set(conf, "enable.auto.commit", "false");
Test::conf_set(conf, "auto.offset.reset", "beginning");
Test::conf_set(conf, "queued.min.messages", "1");
if (backoff_ms >= 0) {
Test::conf_set(conf, "fetch.queue.backoff.ms", tostr() << backoff_ms);
}
/* Make sure to include only one message in each fetch.
* Message size is 10000. */
Test::conf_set(conf, "fetch.message.max.bytes", "12000");

if (backoff_ms < 0)
/* default */
backoff_ms = 1000;

std::string errstr;

RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
if (!c)
Test::Fail("Failed to create KafkaConsumer: " + errstr);
delete conf;

RdKafka::TopicPartition *rktpar = RdKafka::TopicPartition::create(topic, 0);
std::vector<RdKafka::TopicPartition *> parts;
parts.push_back(rktpar);

RdKafka::ErrorCode err;
if ((err = c->assign(parts)))
Test::Fail("assigned failed: " + RdKafka::err2str(err));
RdKafka::TopicPartition::destroy(parts);

int received = 0;
int in_profile_cnt = 0;
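/* Accept latencies up to 20% above the configured backoff (50% when the
 * test timeout multiplier indicates a slow environment), with a 15 ms
 * floor for very small backoffs. */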
int dmax =
(int)((double)backoff_ms * (test_timeout_multiplier > 1 ? 1.5 : 1.2));
if (backoff_ms < 15)
dmax = 15;

int64_t ts_consume = test_clock();

while (received < 5) {
/* Wait more than dmax to count out-of-profile messages.
 * The first message is treated differently and skipped. */
int consume_timeout =
received == 0 ? 500 * test_timeout_multiplier : dmax * 2;
RdKafka::Message *msg = c->consume(consume_timeout);

rd_ts_t now = test_clock();
int latency = (now - ts_consume) / 1000;
ts_consume = now;
bool in_profile = latency <= dmax;

if (!msg)
Test::Fail(tostr() << "No message for " << consume_timeout << "ms");
if (msg->err())
Test::Fail("Unexpected consumer error: " + msg->errstr());

Test::Say(tostr() << "Message #" << received << " consumed in " << latency
<< "ms (expecting <= " << dmax << "ms)"
<< (received == 0 ? ": skipping first" : "")
<< (in_profile ? ": in profile" : ": OUT OF PROFILE")
<< "\n");

if (received++ > 0 && in_profile)
in_profile_cnt++;

delete msg;
}

Test::Say(tostr() << in_profile_cnt << "/" << received << " messages were "
<< "in profile (<= " << dmax
<< ") for backoff_ms=" << backoff_ms << "\n");

/* First message isn't counted. */
const int expected_in_profile = received - 1;
TEST_ASSERT(expected_in_profile - in_profile_cnt == 0,
"Only %d/%d messages were in profile", in_profile_cnt,
expected_in_profile);

delete c;

SUB_TEST_PASS();
}


extern "C" {
int main_0127_fetch_queue_backoff(int argc, char **argv) {
std::string topic = Test::mk_topic_name("0127_fetch_queue_backoff", 1);

/* Prime the topic with messages. */
RdKafka::Conf *conf;
Test::conf_init(&conf, NULL, 10);
Test::conf_set(conf, "batch.num.messages", "1");
std::string errstr;
RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
if (!p)
Test::Fail(tostr() << __FUNCTION__
<< ": Failed to create producer: " << errstr);
delete conf;

Test::produce_msgs(p, topic, 0, 100, 10000, true /*flush*/);
delete p;

do_test_queue_backoff(topic, -1);
do_test_queue_backoff(topic, 500);
do_test_queue_backoff(topic, 10);
do_test_queue_backoff(topic, 0);
return 0;
}
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
@@ -118,6 +118,7 @@ set(
0124-openssl_invalid_engine.c
0125-immediate_flush.c
0126-oauthbearer_oidc.c
0127-fetch_queue_backoff.cpp
0128-sasl_callback_queue.cpp
0129-fetch_aborted_msgs.c
0130-store_offsets.c
2 changes: 2 additions & 0 deletions tests/test.c
@@ -236,6 +236,7 @@ _TEST_DECL(0123_connections_max_idle);
_TEST_DECL(0124_openssl_invalid_engine);
_TEST_DECL(0125_immediate_flush);
_TEST_DECL(0126_oauthbearer_oidc);
_TEST_DECL(0127_fetch_queue_backoff);
_TEST_DECL(0128_sasl_callback_queue);
_TEST_DECL(0129_fetch_aborted_msgs);
_TEST_DECL(0130_store_offsets);
@@ -485,6 +486,7 @@ struct test tests[] = {
_TEST(0124_openssl_invalid_engine, TEST_F_LOCAL),
_TEST(0125_immediate_flush, 0),
_TEST(0126_oauthbearer_oidc, 0, TEST_BRKVER(3, 1, 0, 0)),
_TEST(0127_fetch_queue_backoff, 0),
_TEST(0128_sasl_callback_queue, TEST_F_LOCAL, TEST_BRKVER(2, 0, 0, 0)),
_TEST(0129_fetch_aborted_msgs, 0, TEST_BRKVER(0, 11, 0, 0)),
_TEST(0130_store_offsets, 0),
1 change: 1 addition & 0 deletions win32/tests/tests.vcxproj
@@ -208,6 +208,7 @@
<ClCompile Include="..\..\tests\0124-openssl_invalid_engine.c" />
<ClCompile Include="..\..\tests\0125-immediate_flush.c" />
<ClCompile Include="..\..\tests\0126-oauthbearer_oidc.c" />
<ClCompile Include="..\..\tests\0127-fetch_queue_backoff.cpp" />
<ClCompile Include="..\..\tests\0128-sasl_callback_queue.cpp" />
<ClCompile Include="..\..\tests\0129-fetch_aborted_msgs.c" />
<ClCompile Include="..\..\tests\0130-store_offsets.c" />
