diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fcab528a3..c864f9e7c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,23 @@ # librdkafka v2.3.1 -librdkafka v2.3.1 is a feature release: +librdkafka v2.3.1 is a maintenance release: * Upgrade OpenSSL to v3.0.12 (while building from source) with various security fixes, check the [release notes](https://www.openssl.org/news/cl30.txt). + * Fixed a bug causing duplicate message consumption from a stale + fetch start offset in some particular cases (#) + + +## Fixes + +### Consumer fixes + + * In case of subscription change with a consumer using the cooperative assignor + it could resume fetching from a previous position. + That could also happen if resuming a partition that wasn't paused. + Fixed by ensuring that a resume operation is completely a no-op when + the partition isn't paused (#). + # librdkafka v2.3.0 diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 357c137db8..5b3fec0043 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -2299,7 +2299,22 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t *rktp, int flag, rd_kafka_replyq_t replyq) { int32_t version; - rd_kafka_op_t *rko; + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE); + + if (!pause) { + /* If partitions isn't paused, avoid bumping its version, + * as it'll result in resuming fetches from a stale + * next_fetch_start */ + rd_bool_t paused = rd_false; + rd_kafka_toppar_lock(rktp); + paused = RD_KAFKA_TOPPAR_IS_PAUSED(rktp); + rd_kafka_toppar_unlock(rktp); + if (!paused) { + rko->rko_replyq = replyq; + rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + } /* Bump version barrier. */ version = rd_kafka_toppar_version_new_barrier(rktp); @@ -2310,7 +2325,6 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t *rktp, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), rktp->rktp_partition, version); - rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE); rko->rko_version = version; rko->rko_u.pause.pause = pause; rko->rko_u.pause.flag = flag;