Skip to content

Commit

Permalink
kafka: Account for internal topic traffic in fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
graphcareful committed Mar 16, 2023
1 parent 491f363 commit 3339ab0
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/v/kafka/protocol/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ struct fetch_response final {

fetch_response_data data;

// Used for usage/metering to relay this value back to the connection layer
size_t internal_topic_bytes{0};

void encode(response_writer& writer, api_version version) {
data.encode(writer, version);
}
Expand Down
17 changes: 17 additions & 0 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,23 @@ ss::future<response_ptr> op_context::send_response() && {
final_response.data.error_code = response.data.error_code;
final_response.data.session_id = response.data.session_id;

/// Account for special internal topic bytes for usage
for (const auto& topic : response.data.topics) {
const bool bytes_to_exclude = std::find(
usage_excluded_topics.cbegin(),
usage_excluded_topics.cend(),
topic.name)
!= usage_excluded_topics.cend();
if (bytes_to_exclude) {
for (const auto& part : topic.partitions) {
if (part.records) {
final_response.internal_topic_bytes
+= part.records->size_bytes();
}
}
}
}

for (auto it = response.begin(true); it != response.end(); ++it) {
if (it->is_new_topic) {
final_response.data.topics.emplace_back(
Expand Down

0 comments on commit 3339ab0

Please sign in to comment.