Skip to content

Commit

Permalink
feat(metrics): track consumer records lag
Browse files Browse the repository at this point in the history
  • Loading branch information
Gantigmaa Selenge committed Aug 18, 2022
1 parent 3083a9b commit 5e9e0c7
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
12 changes: 12 additions & 0 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ type FetchResponseBlock struct {
func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
metricRegistry := pd.metricRegistry()
var sizeMetric metrics.Histogram
var lagMetric metrics.Histogram
if metricRegistry != nil {
sizeMetric = getOrRegisterHistogram("consumer-fetch-response-size", metricRegistry)
lagMetric = getOrRegisterHistogram("consumer-records-lag", metricRegistry)
}

tmp, err := pd.getInt16()
Expand Down Expand Up @@ -180,6 +182,16 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
}
}

//Follows the Java implementation for calculating the records lag:
//https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
if len(b.RecordsSet) > 0 {
logEndOffset := *b.LastRecordsBatchOffset - 1
partitionLag := logEndOffset - b.RecordsSet[len(b.RecordsSet)-1].RecordBatch.LastOffset()
if lagMetric != nil {
lagMetric.Update(partitionLag)
}
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Consumer related metrics:
| consumer-fetch-rate-for-broker-<broker> | meter | Fetch requests/second sent to a given broker |
| consumer-fetch-rate-for-topic-<topic> | meter | Fetch requests/second sent for a given topic |
| consumer-fetch-response-size | histogram | Distribution of the fetch response size in bytes |
| consumer-records-lag | histogram | The latest lag of the partition |
| consumer-group-join-total-<GroupID> | counter | Total count of consumer group join attempts |
| consumer-group-join-failed-<GroupID> | counter | Total count of consumer group join failures |
| consumer-group-sync-total-<GroupID> | counter | Total count of consumer group sync attempts |
Expand Down

0 comments on commit 5e9e0c7

Please sign in to comment.