Merge 'master' of librdkafka into dev_kip430_cp
jainruchir committed Jun 14, 2023
2 parents 0e16f98 + 966b63d commit 060d694
Showing 62 changed files with 6,797 additions and 1,015 deletions.
2 changes: 1 addition & 1 deletion .semaphore/semaphore.yml
@@ -126,7 +126,7 @@ blocks:
- make -j -C tests build
- make -C tests run_local_quick
- DESTDIR="$PWD/dest" make install
- (cd tests && python3 -m trivup.clusters.KafkaCluster --version 3.1.0 --cmd 'make quick')
- (cd tests && python3 -m trivup.clusters.KafkaCluster --version 3.4.0 --cmd 'make quick')


- name: 'Linux x64: release artifact docker builds'
90 changes: 88 additions & 2 deletions CHANGELOG.md
@@ -1,3 +1,82 @@
# librdkafka v2.2.0

librdkafka v2.2.0 is a feature release:

* Fix a segmentation fault when subscribing to non-existent topics and
using the consume batch functions (#4273).
* Store offset commit metadata in `rd_kafka_offsets_store` (@mathispesch, #4084).
* Fix a bug that happens when skipping tags, causing buffer underflow in
MetadataResponse (#4278).
* [KIP-881](https://cwiki.apache.org/confluence/display/KAFKA/KIP-881%3A+Rack-aware+Partition+Assignment+for+Kafka+Consumers):
Add support for rack-aware partition assignment for consumers
(#4184, #4291, #4252).
* Fix several bugs with sticky assignor in case of partition ownership
changing between members of the consumer group (#4252).


## Fixes

### General fixes

* Fix a bug that happens when skipping tags, causing a buffer underflow in
  MetadataResponse. It is triggered since RPC version 9 (v2.1.0)
  when using Confluent Platform, and only when racks are set,
  observers are activated and there is more than one partition.
  Fixed by skipping the correct number of bytes when tags are received.


### Consumer fixes

* In case of multiple owners of a partition with different generations, the
sticky assignor would pick the earliest (lowest generation) member as the
current owner, which would lead to stickiness violations. Fixed by
choosing the latest (highest generation) member.
* When the same partition is owned by two members with the same
  generation, it indicates an issue. The sticky assignor had some code to
  handle this, but it was non-functional and did not have parity with the
  Java assignor. Fixed by invalidating any such partition from the current
  assignment completely.
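The two sticky-assignor fixes above amount to one selection rule: among members claiming the same partition, the highest generation wins, and a tie between generations invalidates the claim. A minimal illustrative sketch (hypothetical types, not the assignor's real data structures):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Illustrative only: pick the current owner of a partition among the
// members that claim it. Highest generation wins; two claims with the
// same generation indicate an inconsistent group state, so the partition
// is dropped from the current assignment (as the CHANGELOG describes).
struct Claim {
  std::string member;
  int generation;
};

// Returns the owning member, or "" if the claim set is invalid.
std::string current_owner(const std::vector<Claim> &claims) {
  std::string owner;
  int best_gen = -1;
  bool tie = false;
  for (const Claim &c : claims) {
    if (c.generation > best_gen) {
      best_gen = c.generation;
      owner = c.member;
      tie = false;
    } else if (c.generation == best_gen) {
      tie = true;  // same partition, same generation: invalid claim
    }
  }
  return tie ? "" : owner;
}
```

Before the fix, the earliest (lowest-generation) claim was kept instead, which violated stickiness for the member that actually owned the partition last.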


# librdkafka v2.1.1

librdkafka v2.1.1 is a maintenance release:

* Avoid duplicate messages when a fetch response is received
in the middle of an offset validation request (#4261).
* Fix segmentation fault when subscribing to a non-existent topic and
calling `rd_kafka_message_leader_epoch()` on the polled `rkmessage` (#4245).
* Fix a segmentation fault when fetching from follower and the partition lease
expires while waiting for the result of a list offsets operation (#4254).
* Fix documentation for the admin request timeout, incorrectly stating -1 for infinite
timeout. That timeout can't be infinite.
* Fix CMake pkg-config cURL require and use
pkg-config `Requires.private` field (@FantasqueX, @stertingen, #4180).
* Fix certain cases where polling would not keep the consumer
  in the group or make it rejoin it (#4256).
* Fix to the C++ set_leader_epoch method of TopicPartitionImpl,
that wasn't storing the passed value (@pavel-pimenov, #4267).

## Fixes

### Consumer fixes

* Duplicate messages can be emitted when a fetch response is received
in the middle of an offset validation request. Solved by avoiding
a restart from last application offset when offset validation succeeds.
* When fetching from a follower, if the partition lease expired after 5 minutes
  while a list offsets operation was in flight to retrieve the earliest
  or latest offset, a segmentation fault resulted. This was fixed by
  allowing threads other than the main one to call
  the `rd_kafka_toppar_set_fetch_state` function, provided they hold
  the lock on the `rktp`.
* In v2.1.0, a bug was fixed which caused polling any queue to reset
  `max.poll.interval.ms`. Only certain functions were made to reset the timer,
  but the user can obtain the queue holding messages from the broker while
  bypassing those functions. This was fixed by encoding in the queue itself
  whether polling it should reset the timer.
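The fix just described can be sketched abstractly (hypothetical types; in librdkafka the real logic lives in `rd_kafka_q_can_contain_fetched_msgs()` and `rd_kafka_app_polled()`, visible in the `src/rdkafka.c` hunks below): the queue itself records whether it can contain fetched messages, and only polls of such queues count as consumer polls.

```cpp
#include <cassert>

// Illustrative sketch, not librdkafka's real types: the queue carries a
// flag saying whether it can hold fetched messages; polling resets the
// app-poll timestamp only when that flag is set, regardless of which
// poll function the application used to reach the queue.
struct Queue {
  bool can_contain_fetched_msgs;
};

struct Consumer {
  long long last_app_poll_ts = 0;  // backs the max.poll.interval.ms check
};

void poll_queue(Consumer &c, Queue &q, long long now) {
  // ... serve ops/messages from q here ...
  if (q.can_contain_fetched_msgs)
    c.last_app_poll_ts = now;  // counts as a consumer poll
}
```

Polling a log or event queue (flag unset) leaves the timer alone; polling the consumer queue, or any queue it was forwarded to, resets it.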


# librdkafka v2.1.0

librdkafka v2.1.0 is a feature release:
@@ -64,19 +143,26 @@ librdkafka v2.1.0 is a feature release:
any of the **seek**, **pause**, **resume** or **rebalancing** operations, `on_consume`
interceptors might be called incorrectly (possibly multiple times) for messages that were not consumed.

### Consume API

* Duplicate messages can be emitted when a fetch response is received
in the middle of an offset validation request.
* Segmentation fault when subscribing to a non-existent topic and
calling `rd_kafka_message_leader_epoch()` on the polled `rkmessage`.



# librdkafka v2.0.2

librdkafka v2.0.2 is a bugfix release:
librdkafka v2.0.2 is a maintenance release:

* Fix OpenSSL version in Win32 nuget package (#4152).



# librdkafka v2.0.1

librdkafka v2.0.1 is a bugfix release:
librdkafka v2.0.1 is a maintenance release:

* Fixed nuget package for Linux ARM64 release (#4150).

1 change: 1 addition & 0 deletions INTRODUCTION.md
@@ -1957,6 +1957,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-654 - Aborted txns with non-flushed msgs should not be fatal | 2.7.0 | Supported |
| KIP-735 - Increase default consumer session timeout | 3.0.0 | Supported |
| KIP-768 - SASL/OAUTHBEARER OIDC support | 3.0 | Supported |
| KIP-881 - Rack-aware Partition Assignment for Kafka Consumers | 3.5.0 (WIP) | Supported |



2 changes: 1 addition & 1 deletion packaging/cmake/rdkafka.pc.in
@@ -6,7 +6,7 @@ libdir=${prefix}/lib
Name: @PKG_CONFIG_NAME@
Description: @PKG_CONFIG_DESCRIPTION@
Version: @PKG_CONFIG_VERSION@
Requires: @PKG_CONFIG_REQUIRES@
Requires.private: @PKG_CONFIG_REQUIRES_PRIVATE@
Cflags: @PKG_CONFIG_CFLAGS@
Libs: @PKG_CONFIG_LIBS@
Libs.private: @PKG_CONFIG_LIBS_PRIVATE@
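For context, the practical effect of switching `Requires` to `Requires.private`: a dynamic link against librdkafka no longer pulls the dependency cflags/libs into every consumer, while `pkg-config --static` still resolves them. A hypothetical rendered `rdkafka.pc` (version and dependency list are illustrative, matching the CMake hunks below) might look like:

```
Name: librdkafka
Description: The Apache Kafka C/C++ library
Version: 2.2.0
Requires.private: libcurl zlib libcrypto libssl libsasl2 libzstd liblz4
Cflags: -I${includedir}
Libs: -L${libdir} -lrdkafka
Libs.private: -lm -ldl -lpthread
```

With this file, `pkg-config --libs rdkafka` emits only `-lrdkafka`, whereas `pkg-config --libs --static rdkafka` additionally expands the private requires and libs.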
4 changes: 2 additions & 2 deletions src-cpp/CMakeLists.txt
@@ -41,7 +41,7 @@ set(PKG_CONFIG_VERSION "${PROJECT_VERSION}")
if(NOT RDKAFKA_BUILD_STATIC)
set(PKG_CONFIG_NAME "librdkafka++")
set(PKG_CONFIG_DESCRIPTION "The Apache Kafka C/C++ library")
set(PKG_CONFIG_REQUIRES "rdkafka")
set(PKG_CONFIG_REQUIRES_PRIVATE "rdkafka")
set(PKG_CONFIG_CFLAGS "-I\${includedir}")
set(PKG_CONFIG_LIBS "-L\${libdir} -lrdkafka++")
set(PKG_CONFIG_LIBS_PRIVATE "-lrdkafka")
@@ -57,7 +57,7 @@ if(NOT RDKAFKA_BUILD_STATIC)
else()
set(PKG_CONFIG_NAME "librdkafka++-static")
set(PKG_CONFIG_DESCRIPTION "The Apache Kafka C/C++ library (static)")
set(PKG_CONFIG_REQUIRES "")
set(PKG_CONFIG_REQUIRES_PRIVATE "")
set(PKG_CONFIG_CFLAGS "-I\${includedir} -DLIBRDKAFKA_STATICLIB")
set(PKG_CONFIG_LIBS "-L\${libdir} \${libdir}/librdkafka++.a")
if(WIN32)
10 changes: 10 additions & 0 deletions src-cpp/HandleImpl.cpp
@@ -391,6 +391,12 @@ rd_kafka_topic_partition_list_t *partitions_to_c_parts(
rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_list_add(
c_parts, tpi->topic_.c_str(), tpi->partition_);
rktpar->offset = tpi->offset_;
if (tpi->metadata_.size()) {
void *metadata_p = mem_malloc(tpi->metadata_.size());
memcpy(metadata_p, tpi->metadata_.data(), tpi->metadata_.size());
rktpar->metadata = metadata_p;
rktpar->metadata_size = tpi->metadata_.size();
}
if (tpi->leader_epoch_ != -1)
rd_kafka_topic_partition_set_leader_epoch(rktpar, tpi->leader_epoch_);
}
@@ -417,6 +423,10 @@ void update_partitions_from_c_parts(
pp->offset_ = p->offset;
pp->err_ = static_cast<RdKafka::ErrorCode>(p->err);
pp->leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(p);
if (p->metadata_size) {
unsigned char *metadata = (unsigned char *)p->metadata;
pp->metadata_.assign(metadata, metadata + p->metadata_size);
}
}
}
}
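The two hunks above copy partition metadata across the C/C++ boundary in both directions. A self-contained sketch of the same round trip, using plain `malloc`/`memcpy` in place of librdkafka's `mem_malloc` and a stand-in for `rd_kafka_topic_partition_t`:

```cpp
#include <cstdlib>
#include <cstring>
#include <vector>

// Stand-in for the C struct's metadata fields: a raw buffer plus size.
struct CPart {
  void *metadata = nullptr;
  size_t metadata_size = 0;
};

// C++ -> C (partitions_to_c_parts direction): heap-copy the vector's bytes.
void to_c(const std::vector<unsigned char> &meta, CPart &p) {
  if (!meta.empty()) {
    p.metadata = std::malloc(meta.size());
    std::memcpy(p.metadata, meta.data(), meta.size());
    p.metadata_size = meta.size();
  }
}

// C -> C++ (update_partitions_from_c_parts direction): reassign into a vector.
std::vector<unsigned char> from_c(const CPart &p) {
  std::vector<unsigned char> meta;
  if (p.metadata_size) {
    const unsigned char *m = static_cast<const unsigned char *>(p.metadata);
    meta.assign(m, m + p.metadata_size);
  }
  return meta;
}
```

The copy (rather than aliasing the vector's storage) matters because the C partition list owns and later frees its metadata buffer independently of the C++ object's lifetime.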
8 changes: 7 additions & 1 deletion src-cpp/rdkafkacpp.h
@@ -111,7 +111,7 @@ namespace RdKafka {
* @remark This value should only be used during compile time,
* for runtime checks of version use RdKafka::version()
*/
#define RD_KAFKA_VERSION 0x020100ff
#define RD_KAFKA_VERSION 0x020101ff

/**
* @brief Returns the librdkafka version as integer.
@@ -1986,6 +1986,12 @@ class RD_EXPORT TopicPartition {

/** @brief Set partition leader epoch. */
virtual void set_leader_epoch(int32_t leader_epoch) = 0;

/** @brief Get partition metadata. */
virtual std::vector<unsigned char> get_metadata() = 0;

/** @brief Set partition metadata. */
virtual void set_metadata(std::vector<unsigned char> &metadata) = 0;
};


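The `RD_KAFKA_VERSION` bump above follows librdkafka's documented `0xMMmmrrxx` hex layout (major, minor, revision, pre-release id, where `0xff` marks a final release). A quick decode confirming the new constant is v2.1.1:

```cpp
#include <cstdint>

// Decode librdkafka's 0xMMmmrrxx version integer. The low byte is the
// pre-release id (0xff for a final release).
struct Version {
  int major, minor, revision, prerelease;
};

Version decode(uint32_t v) {
  return {static_cast<int>((v >> 24) & 0xff),
          static_cast<int>((v >> 16) & 0xff),
          static_cast<int>((v >> 8) & 0xff),
          static_cast<int>(v & 0xff)};
}
```

So `0x020101ff` decodes to 2.1.1 final, matching the v2.1.1 CHANGELOG entry; the previous `0x020100ff` was 2.1.0.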
16 changes: 14 additions & 2 deletions src-cpp/rdkafkacpp_int.h
@@ -1260,7 +1260,10 @@ class TopicPartitionImpl : public TopicPartition {
offset_ = c_part->offset;
err_ = static_cast<ErrorCode>(c_part->err);
leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(c_part);
// FIXME: metadata
if (c_part->metadata_size > 0) {
unsigned char *metadata = (unsigned char *)c_part->metadata;
metadata_.assign(metadata, metadata + c_part->metadata_size);
}
}

static void destroy(std::vector<TopicPartition *> &partitions);
@@ -1289,7 +1292,15 @@
}

void set_leader_epoch(int32_t leader_epoch) {
leader_epoch_ = leader_epoch_;
leader_epoch_ = leader_epoch;
}

std::vector<unsigned char> get_metadata() {
return metadata_;
}

void set_metadata(std::vector<unsigned char> &metadata) {
metadata_ = metadata;
}

std::ostream &operator<<(std::ostream &ostrm) const {
@@ -1301,6 +1312,7 @@ class TopicPartitionImpl : public TopicPartition {
int64_t offset_;
ErrorCode err_;
int32_t leader_epoch_;
std::vector<unsigned char> metadata_;
};
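The `set_leader_epoch` hunk above fixes a genuine bug (tracked as #4267 in the CHANGELOG): the old code read `leader_epoch_ = leader_epoch_;`, a no-op self-assignment, so the passed value was silently dropped. A stripped-down stand-in showing the fixed setter:

```cpp
#include <cstdint>

// Minimal stand-in for TopicPartitionImpl's setter. Pre-fix, the body was
// `leader_epoch_ = leader_epoch_;` (member assigned to itself), so the
// stored epoch never changed; the fix assigns the parameter instead.
class TopicPartitionSketch {
 public:
  void set_leader_epoch(int32_t leader_epoch) {
    leader_epoch_ = leader_epoch;  // fixed: parameter, not member
  }
  int32_t leader_epoch() const { return leader_epoch_; }

 private:
  int32_t leader_epoch_ = -1;  // -1 = unknown, librdkafka's convention
};
```

Bugs of this shape are easy to miss because the code compiles cleanly; `-Wself-assign` (Clang) flags them.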


16 changes: 8 additions & 8 deletions src/CMakeLists.txt
@@ -199,7 +199,7 @@ target_include_directories(rdkafka PUBLIC "$<BUILD_INTERFACE:${dummy}>")

if(WITH_CURL)
find_package(CURL REQUIRED)
target_include_directories(rdkafka PUBLIC ${CURL_INCLUDE_DIRS})
target_include_directories(rdkafka PRIVATE ${CURL_INCLUDE_DIRS})
target_link_libraries(rdkafka PUBLIC ${CURL_LIBRARIES})
endif()

@@ -272,7 +272,7 @@ endif()

# Generate pkg-config file
set(PKG_CONFIG_VERSION "${PROJECT_VERSION}")
set(PKG_CONFIG_REQUIRES "")
set(PKG_CONFIG_REQUIRES_PRIVATE "")
if (WIN32)
set(PKG_CONFIG_LIBS_PRIVATE "-lws2_32 -lsecur32 -lcrypt32")
else()
@@ -296,27 +296,27 @@ if(NOT RDKAFKA_BUILD_STATIC)
set(PKG_CONFIG_DESCRIPTION "The Apache Kafka C/C++ library")

if(WITH_CURL)
string(APPEND PKG_CONFIG_REQUIRES "curl ")
string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "libcurl ")
endif()

if(WITH_ZLIB)
string(APPEND PKG_CONFIG_REQUIRES "zlib ")
string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "zlib ")
endif()

if(WITH_SSL)
string(APPEND PKG_CONFIG_REQUIRES "libssl ")
string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "libcrypto libssl ")
endif()

if(WITH_SASL_CYRUS)
string(APPEND PKG_CONFIG_REQUIRES "libsasl2 ")
string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "libsasl2 ")
endif()

if(WITH_ZSTD)
string(APPEND PKG_CONFIG_REQUIRES "libzstd ")
string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "libzstd ")
endif()

if(WITH_LZ4_EXT)
string(APPEND PKG_CONFIG_REQUIRES "liblz4 ")
string(APPEND PKG_CONFIG_REQUIRES_PRIVATE "liblz4 ")
endif()

set(PKG_CONFIG_CFLAGS "-I\${includedir}")
27 changes: 26 additions & 1 deletion src/rdkafka.c
@@ -990,7 +990,7 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) {
mtx_destroy(&rk->rk_init_lock);

if (rk->rk_full_metadata)
rd_kafka_metadata_destroy(rk->rk_full_metadata);
rd_kafka_metadata_destroy(&rk->rk_full_metadata->metadata);
rd_kafkap_str_destroy(rk->rk_client_id);
rd_kafkap_str_destroy(rk->rk_group_id);
rd_kafkap_str_destroy(rk->rk_eos.transactional_id);
@@ -4003,20 +4003,37 @@

int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {
int r;
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rk->rk_rep, RD_DO_LOCK);

if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rk);

r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK,
rd_kafka_poll_cb, NULL);

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);

return r;
}


rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {
rd_kafka_op_t *rko;
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK);


if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rkqu->rkqu_rk);

if (!rko)
return NULL;

@@ -4025,10 +4042,18 @@ rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {

int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms) {
int r;
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK);

if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rkqu->rkqu_rk);

return r;
}

11 changes: 8 additions & 3 deletions src/rdkafka.h
@@ -166,7 +166,7 @@ typedef SSIZE_T ssize_t;
* @remark This value should only be used during compile time,
* for runtime checks of version use rd_kafka_version()
*/
#define RD_KAFKA_VERSION 0x020100ff
#define RD_KAFKA_VERSION 0x020101ff

/**
* @brief Returns the librdkafka version as integer.
@@ -3431,6 +3431,12 @@ rd_kafka_error_t *rd_kafka_sasl_set_credentials(rd_kafka_t *rk,
*
* @remark rd_kafka_queue_destroy() MUST be called on this queue
* prior to calling rd_kafka_consumer_close().
* @remark Polling the returned queue counts as a consumer poll, and will reset
* the timer for max.poll.interval.ms. If this queue is forwarded to a
* "destq", polling destq also counts as a consumer poll (this works
* for any number of forwards). However, even if this queue is
* unforwarded or forwarded elsewhere, polling destq will continue
* to count as a consumer poll.
*/
RD_EXPORT
rd_kafka_queue_t *rd_kafka_queue_get_consumer(rd_kafka_t *rk);
@@ -6808,8 +6814,7 @@ RD_EXPORT void rd_kafka_AdminOptions_destroy(rd_kafka_AdminOptions_t *options);
* request transmission, operation time on broker, and response.
*
* @param options Admin options.
* @param timeout_ms Timeout in milliseconds, use -1 for indefinite timeout.
* Defaults to `socket.timeout.ms`.
* @param timeout_ms Timeout in milliseconds. Defaults to `socket.timeout.ms`.
* @param errstr A human readable error string (nul-terminated) is written to
* this location that must be of at least \p errstr_size bytes.
* The \p errstr is only written in case of error.