Skip to content

Commit

Permalink
Fix for a wrong error returned
Browse files Browse the repository at this point in the history
on full metadata refresh before
joining a consumer group

A metadata call before member joins
consumer group, could lead to
an `UNKNOWN_TOPIC_OR_PART` error.
Solved by updating the consumer group
following a metadata refresh only in safe states.
Happening since 2.1.0
  • Loading branch information
emasab committed Apr 8, 2024
1 parent ef16a8a commit 3e05712
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 9 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ librdkafka v2.3.1 is a maintenance release:
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
* Fix to metadata cache expiration on full metadata refresh (#4677).
* Fix for a wrong error returned on full metadata refresh before joining
a consumer group (#4678).


## Fixes
Expand All @@ -34,6 +36,10 @@ librdkafka v2.3.1 is a maintenance release:
refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
cache for existing or hinted entries instead of clearing them.
Happening since 2.1.0 (#4677).
* A metadata call before member joins consumer group,
could lead to an `UNKNOWN_TOPIC_OR_PART` error. Solved by updating
the consumer group following a metadata refresh only in safe states.
Happening since 2.1.0 (#4678).



Expand Down
26 changes: 17 additions & 9 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2333,10 +2333,6 @@ static int rd_kafka_cgrp_metadata_refresh(rd_kafka_cgrp_t *rkcg,

rd_list_init(&topics, 8, rd_free);

/* Insert all non-wildcard topics in cache. */
rd_kafka_metadata_cache_hint_rktparlist(
rkcg->rkcg_rk, rkcg->rkcg_subscription, NULL, 0 /*dont replace*/);

if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) {
/* For wildcard subscriptions make sure the
* cached full metadata isn't too old. */
Expand Down Expand Up @@ -5005,6 +5001,19 @@ rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
return revoking;
}

static void
rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t *rktparlist) {
rkcg->rkcg_subscription = rktparlist;
if (rkcg->rkcg_subscription) {
/* Insert all non-wildcard topics in cache immediately.
* to avoid problems with subsequent metadata
* requests. */
rd_kafka_metadata_cache_hint_rktparlist(
rkcg->rkcg_rk, rkcg->rkcg_subscription, NULL,
0 /*dont replace*/);
}
}

/**
* @brief Handle a new subscription that is modifying an existing subscription
Expand Down Expand Up @@ -5037,7 +5046,7 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
rkcg, unsubscribing_topics);

rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
rkcg->rkcg_subscription = rktparlist;
rd_kafka_cgrp_subscription_set(rkcg, rktparlist);

if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
"modify subscription") == 1) {
Expand Down Expand Up @@ -5146,7 +5155,7 @@ static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg,

if (rkcg->rkcg_subscription) {
rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
rkcg->rkcg_subscription = NULL;
rd_kafka_cgrp_subscription_set(rkcg, NULL);
}

if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_GENERIC)
Expand Down Expand Up @@ -5244,7 +5253,7 @@ rd_kafka_cgrp_subscribe(rd_kafka_cgrp_t *rkcg,
if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;

rkcg->rkcg_subscription = rktparlist;
rd_kafka_cgrp_subscription_set(rkcg, rktparlist);

rd_kafka_cgrp_join(rkcg);

Expand Down Expand Up @@ -5909,8 +5918,7 @@ rd_kafka_cgrp_consumer_subscribe(rd_kafka_cgrp_t *rkcg,
RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE |
RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION;

rkcg->rkcg_subscription = rktparlist;

rd_kafka_cgrp_subscription_set(rkcg, rktparlist);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
} else {
rd_kafka_cgrp_unsubscribe(rkcg, rd_true /*leave group*/);
Expand Down

0 comments on commit 3e05712

Please sign in to comment.