From d6374bc2ce3041005842edd353a3bb010f467abe Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 28 May 2024 20:09:08 +0200 Subject: [PATCH] feat(blooms): Add counter metric for blocks that are not available at query time (#12968) When filtering chunks on the bloom gateway, bloom blocks may not be available, and they will be downloaded asynchronously in the background. This new metric `loki_bloom_gateway_blocks_not_available_total` counts the blocks that are not available at query time. Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway.go | 19 ++++++++++--------- pkg/bloomgateway/metrics.go | 19 +++++++++++++------ pkg/bloomgateway/processor.go | 2 +- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index ee0e6f9940fd..165e2d652473 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -290,6 +290,13 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk g.activeUsers.UpdateUserTimestamp(tenantID, time.Now()) + var preFilterSeries, preFilterChunks int + + preFilterSeries = len(req.Refs) + for _, series := range req.Refs { + preFilterChunks += len(series.Refs) + } + // Ideally we could use an unbuffered channel here, but since we return the // request on the first error, there can be cases where the request context // is not done yet and the consumeTask() function wants to send to the @@ -316,13 +323,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk remaining := len(tasks) - preFilterSeries := len(req.Refs) - var preFilterChunks, postFilterChunks int - - for _, series := range req.Refs { - preFilterChunks += len(series.Refs) - } - combinedRecorder := v1.NewBloomRecorder(ctx, "combined") for remaining > 0 { select { @@ -353,11 +353,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk responsesPool.Put(resp) } - postFilterSeries := 
len(filtered) - + var postFilterSeries, postFilterChunks int + postFilterSeries = len(filtered) for _, group := range filtered { postFilterChunks += len(group.Refs) } + g.metrics.requestedSeries.Observe(float64(preFilterSeries)) g.metrics.filteredSeries.Observe(float64(preFilterSeries - postFilterSeries)) g.metrics.requestedChunks.Observe(float64(preFilterChunks)) diff --git a/pkg/bloomgateway/metrics.go b/pkg/bloomgateway/metrics.go index 0885bc2ae7cb..5c046d3147c3 100644 --- a/pkg/bloomgateway/metrics.go +++ b/pkg/bloomgateway/metrics.go @@ -116,12 +116,13 @@ func newServerMetrics(registerer prometheus.Registerer, namespace, subsystem str } type workerMetrics struct { - dequeueDuration *prometheus.HistogramVec - queueDuration *prometheus.HistogramVec - processDuration *prometheus.HistogramVec - tasksDequeued *prometheus.CounterVec - tasksProcessed *prometheus.CounterVec - blockQueryLatency *prometheus.HistogramVec + dequeueDuration *prometheus.HistogramVec + queueDuration *prometheus.HistogramVec + processDuration *prometheus.HistogramVec + tasksDequeued *prometheus.CounterVec + tasksProcessed *prometheus.CounterVec + blocksNotAvailable *prometheus.CounterVec + blockQueryLatency *prometheus.HistogramVec } func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics { @@ -158,6 +159,12 @@ func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem str Name: "tasks_processed_total", Help: "Total amount of tasks that the worker processed", }, append(labels, "status")), + blocksNotAvailable: r.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "blocks_not_available_total", + Help: "Total amount of blocks that have been skipped because they were not found or not downloaded yet", + }, labels), blockQueryLatency: r.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go 
index 1e8452ded5d6..6973ad1f565b 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -126,7 +126,7 @@ func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.Close return concurrency.ForEachJob(ctx, len(bqs), p.concurrency, func(ctx context.Context, i int) error { bq := bqs[i] if bq == nil { - // TODO(chaudum): Add metric for skipped blocks + p.metrics.blocksNotAvailable.WithLabelValues(p.id).Inc() return nil }