Skip to content

Commit

Permalink
Fix to metadata cache expiration
Browse files Browse the repository at this point in the history
on full metadata refresh

Metadata cache was cleared on full metadata
refresh, leading to unnecessary refreshes and
occasional `UNKNOWN_TOPIC_OR_PART` errors.
Solved by updating cache for existing or
hinted entries instead of clearing them.
Happening since 2.1.0
  • Loading branch information
emasab committed Apr 11, 2024
1 parent 0e3f48d commit b27ccc6
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 58 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ librdkafka v2.3.1 is a maintenance release:
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
* Fix to metadata cache expiration on full metadata refresh (#4677).


## Fixes
Expand All @@ -31,6 +32,10 @@ librdkafka v2.3.1 is a maintenance release:
before the expiration of a timeout, it was serving with a zero timeout,
leading to increased CPU usage until the timeout was reached.
Happening since 1.x (#4671).
* Metadata cache was cleared on full metadata refresh, leading to unnecessary
refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
cache for existing or hinted entries instead of clearing them.
Happening since 2.1.0 (#4677).

### Consumer fixes

Expand Down
53 changes: 28 additions & 25 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
int broker_changes = 0;
int cache_changes = 0;
rd_ts_t ts_start = rd_clock();

/* If client rack is present, the metadata cache (topic or full) needs
* to contain the partition to rack map. */
rd_bool_t has_client_rack = rk->rk_conf.client_rack &&
Expand Down Expand Up @@ -850,23 +850,24 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_list_remove_cmp(missing_topic_ids,
&mdi->topics[i].topic_id,
(void *)rd_kafka_Uuid_ptr_cmp));
if (!all_topics) {
/* Only update cache when not asking
* for all topics. */

rd_kafka_wrlock(rk);
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers, md->broker_cnt);
cache_changes++;
rd_kafka_wrunlock(rk);
}
/* Only update cache when not asking
* for all topics or cache entry
* already exists. */
rd_kafka_wrlock(rk);
cache_changes +=
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers,
md->broker_cnt,
all_topics /*cache entry needs to exist
*if all_topics*/);
rd_kafka_wrunlock(rk);
}

/* Requested topics not seen in metadata? Propogate to topic code. */
Expand Down Expand Up @@ -979,9 +980,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}

if (all_topics) {
/* Expire all cache entries that were not updated. */
rd_kafka_metadata_cache_evict_by_age(rkb->rkb_rk, ts_start);

/* All hints have been replaced by the corresponding entry.
* Rest of hints can be removed as topics aren't present
* in full metadata. */
rd_kafka_metadata_cache_purge_all_hints(rkb->rkb_rk);
if (rkb->rkb_rk->rk_full_metadata)
rd_kafka_metadata_destroy(
&rkb->rkb_rk->rk_full_metadata->metadata);
Expand All @@ -1001,10 +1003,6 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
"Caching full metadata with "
"%d broker(s) and %d topic(s): %s",
md->broker_cnt, md->topic_cnt, reason);
} else {
if (cache_changes)
rd_kafka_metadata_cache_propagate_changes(rk);
rd_kafka_metadata_cache_expiry_start(rk);
}
/* Remove cache hints for the originally requested topics. */
if (requested_topics)
Expand All @@ -1013,6 +1011,11 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_metadata_cache_purge_hints_by_id(rk,
requested_topic_ids);

if (cache_changes) {
rd_kafka_metadata_cache_propagate_changes(rk);
rd_kafka_metadata_cache_expiry_start(rk);
}

rd_kafka_wrunlock(rkb->rkb_rk);

if (broker_changes) {
Expand Down
7 changes: 4 additions & 3 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,16 @@ int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic);
int rd_kafka_metadata_cache_delete_by_topic_id(rd_kafka_t *rk,
const rd_kafka_Uuid_t topic_id);
void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk);
int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts);
void rd_kafka_metadata_cache_topic_update(
int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk);
int rd_kafka_metadata_cache_topic_update(
rd_kafka_t *rk,
const rd_kafka_metadata_topic_t *mdt,
const rd_kafka_metadata_topic_internal_t *mdit,
rd_bool_t propagate,
rd_bool_t include_metadata,
rd_kafka_metadata_broker_internal_t *brokers,
size_t broker_cnt);
size_t broker_cnt,
rd_bool_t only_existing);
void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk);
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid);
Expand Down
47 changes: 18 additions & 29 deletions src/rdkafka_metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,45 +182,27 @@ static int rd_kafka_metadata_cache_evict(rd_kafka_t *rk) {


/**
* @brief Evict timed out entries from cache based on their insert/update time
* rather than expiry time. Any entries older than \p ts will be evicted.
* @brief Remove all cache hints,.
* This is done when the Metadata response has been parsed and
* replaced hints with existing topic information, thus this will
* only remove unmatched topics from the cache.
*
* @returns the number of entries evicted.
* @returns the number of purged hints
*
* @locks_required rd_kafka_wrlock()
*/
int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts) {
int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk) {
int cnt = 0;
struct rd_kafka_metadata_cache_entry *rkmce, *tmp;

TAILQ_FOREACH_SAFE(rkmce, &rk->rk_metadata_cache.rkmc_expiry,
rkmce_link, tmp) {
if (rkmce->rkmce_ts_insert <= ts) {
if (!RD_KAFKA_METADATA_CACHE_VALID(rkmce)) {
rd_kafka_metadata_cache_delete(rk, rkmce, 1);
cnt++;
}
}

/* Update expiry timer */
rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry);
if (rkmce)
rd_kafka_timer_start(&rk->rk_timers,
&rk->rk_metadata_cache.rkmc_expiry_tmr,
rkmce->rkmce_ts_expires - rd_clock(),
rd_kafka_metadata_cache_evict_tmr_cb, rk);
else
rd_kafka_timer_stop(&rk->rk_timers,
&rk->rk_metadata_cache.rkmc_expiry_tmr, 1);

rd_kafka_dbg(rk, METADATA, "METADATA",
"Expired %d entries older than %dms from metadata cache "
"(%d entries remain)",
cnt, (int)((rd_clock() - ts) / 1000),
rk->rk_metadata_cache.rkmc_cnt);

if (cnt)
rd_kafka_metadata_cache_propagate_changes(rk);

return cnt;
}

Expand Down Expand Up @@ -481,23 +463,28 @@ void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk) {
*
* @locks rd_kafka_wrlock()
*/
void rd_kafka_metadata_cache_topic_update(
int rd_kafka_metadata_cache_topic_update(
rd_kafka_t *rk,
const rd_kafka_metadata_topic_t *mdt,
const rd_kafka_metadata_topic_internal_t *mdit,
rd_bool_t propagate,
rd_bool_t include_racks,
rd_kafka_metadata_broker_internal_t *brokers,
size_t broker_cnt) {
size_t broker_cnt,
rd_bool_t only_existing) {
struct rd_kafka_metadata_cache_entry *rkmce = NULL;
rd_ts_t now = rd_clock();
rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
int changed = 1;
if (unlikely(!mdt->topic)) {
if (likely(mdt->topic && only_existing)) {
rkmce = rd_kafka_metadata_cache_find(rk, mdt->topic, 0);
if (!rkmce)
return 0;
} else if (unlikely(!mdt->topic)) {
rkmce =
rd_kafka_metadata_cache_find_by_id(rk, mdit->topic_id, 1);
if (!rkmce)
return;
return 0;
}

if (!mdt->topic) {
Expand All @@ -524,6 +511,8 @@ void rd_kafka_metadata_cache_topic_update(

if (changed && propagate)
rd_kafka_metadata_cache_propagate_changes(rk);

return changed;
}


Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -2046,7 +2046,7 @@ void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt,

rd_kafka_wrlock(rkt->rkt_rk);
rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, &mdit, rd_true,
rd_false, NULL, 0);
rd_false, NULL, 0, rd_false);
rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock());
rd_kafka_wrunlock(rkt->rkt_rk);
rd_free(partitions);
Expand Down

0 comments on commit b27ccc6

Please sign in to comment.