feat: Add metrics for Ingester RF-1 (#13510)
grobinson-grafana committed Jul 16, 2024
1 parent c4405fe commit d4179aa
Showing 7 changed files with 176 additions and 85 deletions.
28 changes: 17 additions & 11 deletions pkg/ingester-rf1/flush.go
@@ -41,7 +41,7 @@ const (
func (i *Ingester) InitFlushQueues() {
i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength)
i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueues)
go i.flushLoop(j)
}
}
@@ -107,14 +107,15 @@ func (i *Ingester) flushLoop(j int) {
start := time.Now()

// We'll use this to log the size of the segment that was flushed.
n := humanize.Bytes(uint64(op.it.Writer.InputSize()))
n := op.it.Writer.InputSize()
humanized := humanize.Bytes(uint64(n))

err := i.flushOp(l, op)
d := time.Since(start)
if err != nil {
level.Error(l).Log("msg", "failed to flush", "size", n, "duration", d, "err", err)
level.Error(l).Log("msg", "failed to flush", "size", humanized, "duration", d, "err", err)
} else {
level.Debug(l).Log("msg", "flushed", "size", n, "duration", d)
level.Debug(l).Log("msg", "flushed", "size", humanized, "duration", d)
}

op.it.Result.SetDone(err)
@@ -144,16 +145,21 @@ func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed
// segments to have another opportunity to be flushed.
func (i *Ingester) flushSegment(ctx context.Context, ch *wal.SegmentWriter) error {
reader := ch.Reader()
defer runutil.CloseWithLogOnErr(util_log.Logger, reader, "flushSegment")
id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
r := ch.Reader()

newUlid := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
start := time.Now()
defer func() {
runutil.CloseWithLogOnErr(util_log.Logger, r, "flushSegment")
i.metrics.flushDuration.Observe(time.Since(start).Seconds())
ch.Observe()
}()

if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+newUlid.String()), reader); err != nil {
i.metrics.chunksFlushFailures.Inc()
i.metrics.flushesTotal.Add(1)
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id.String()), r); err != nil {
i.metrics.flushFailuresTotal.Inc()
return fmt.Errorf("store put chunk: %w", err)
}
i.metrics.flushedChunksStats.Inc(1)
// TODO: report some flush metrics

return nil
}
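
A condensed, self-contained sketch of the instrumentation pattern the new flushSegment uses (not part of the commit): count every attempt up front, observe the duration in a deferred function, and count failures separately. The example_ metric names and the put callback are illustrative stand-ins for the ingester's own metrics and object-store upload.

    package main

    import (
        "errors"
        "time"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
    )

    var (
        flushesTotal       = promauto.NewCounter(prometheus.CounterOpts{Name: "example_flushes_total"})
        flushFailuresTotal = promauto.NewCounter(prometheus.CounterOpts{Name: "example_flush_failures_total"})
        flushDuration      = promauto.NewHistogram(prometheus.HistogramOpts{Name: "example_flush_duration_seconds"})
    )

    // flush mirrors the shape of the new flushSegment: count every attempt,
    // time it with a deferred observation, and count failures separately.
    func flush(put func() error) error {
        start := time.Now()
        defer func() {
            flushDuration.Observe(time.Since(start).Seconds())
        }()

        flushesTotal.Inc()
        if err := put(); err != nil {
            flushFailuresTotal.Inc()
            return err
        }
        return nil
    }

    func main() {
        _ = flush(func() error { return nil })
        _ = flush(func() error { return errors.New("object store unavailable") })
    }
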
2 changes: 1 addition & 1 deletion pkg/ingester-rf1/ingester.go
@@ -258,7 +258,7 @@ func New(cfg Config, clientConfig client.Config,
}
compressionStats.Set(cfg.ChunkEncoding)
targetSizeStats.Set(int64(cfg.TargetChunkSize))
metrics := newIngesterMetrics(registerer, metricsNamespace)
metrics := newIngesterMetrics(registerer)

walManager, err := wal.NewManager(wal.Config{
MaxAge: cfg.MaxSegmentAge,
86 changes: 49 additions & 37 deletions pkg/ingester-rf1/metrics.go
@@ -3,55 +3,67 @@ package ingesterrf1
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/util/constants"
)

type flushMetrics struct {
flushesTotal prometheus.Counter
flushFailuresTotal prometheus.Counter
flushQueues prometheus.Gauge
flushDuration prometheus.Histogram
flushSizeBytes prometheus.Histogram
}

func newFlushMetrics(r prometheus.Registerer) *flushMetrics {
return &flushMetrics{
flushesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_flushes_total",
Help: "The total number of flushes.",
}),
flushFailuresTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_flush_failures_total",
Help: "The total number of failed flushes.",
}),
flushQueues: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_flush_queues",
Help: "The total number of flush queues.",
}),
flushDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_rf1_flush_duration_seconds",
Help: "The flush duration (in seconds).",
Buckets: prometheus.ExponentialBuckets(0.001, 4, 8),
NativeHistogramBucketFactor: 1.1,
}),
flushSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_rf1_flush_size_bytes",
Help: "The flush size (as written to object storage).",
Buckets: prometheus.ExponentialBuckets(100, 10, 8),
NativeHistogramBucketFactor: 1.1,
}),
}
}

type ingesterMetrics struct {
limiterEnabled prometheus.Gauge
autoForgetUnhealthyIngestersTotal prometheus.Counter
chunksFlushFailures prometheus.Counter
chunksFlushedPerReason *prometheus.CounterVec
flushedChunksStats *analytics.Counter
flushQueueLength prometheus.Gauge

// Shutdown marker for ingester scale down
limiterEnabled prometheus.Gauge
// Shutdown marker for ingester scale down.
shutdownMarker prometheus.Gauge
*flushMetrics
}

func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics {
func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
return &ingesterMetrics{
limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_limiter_enabled",
Help: "Whether the ingester's limiter is enabled",
}),
autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
Help: "Total number of ingesters automatically forgotten",
}),
chunksFlushFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "ingester_rf1_chunks_flush_failures_total",
Help: "Total number of flush failures.",
}),
chunksFlushedPerReason: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "ingester_rf1_chunks_flushed_total",
Help: "Total flushed chunks per reason.",
}, []string{"reason"}),
flushedChunksStats: analytics.NewCounter("ingester_rf1_flushed_chunks"),
flushQueueLength: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "ingester_rf1",
Name: "flush_queue_length",
Help: "The total number of series pending in the flush queue.",
Help: "Total number of ingesters automatically forgotten.",
}),
limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_limiter_enabled",
Help: "1 if the limiter is enabled, otherwise 0.",
}),
shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: constants.Loki,
Subsystem: "ingester_rf1",
Name: "shutdown_marker",
Help: "1 if prepare shutdown has been called, 0 otherwise",
Name: "loki_ingester_rf1_shutdown_marker",
Help: "1 if prepare shutdown has been called, 0 otherwise.",
}),
flushMetrics: newFlushMetrics(r),
}
}
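
For reference, the classic bucket layouts configured above can be printed directly (this snippet is not part of the commit): ExponentialBuckets(0.001, 4, 8) spans roughly 1 ms to 16 s for the flush-duration histogram, and ExponentialBuckets(100, 10, 8) spans 100 B to 1 GB for the flush-size histogram. NativeHistogramBucketFactor: 1.1 additionally enables native (sparse) histograms for scrapers that support them.

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        // Buckets used by loki_ingester_rf1_flush_duration_seconds.
        fmt.Println(prometheus.ExponentialBuckets(0.001, 4, 8))
        // [0.001 0.004 0.016 0.064 0.256 1.024 4.096 16.384]

        // Buckets used by loki_ingester_rf1_flush_size_bytes (and the
        // segment size histograms in pkg/storage/wal).
        fmt.Println(prometheus.ExponentialBuckets(100, 10, 8))
        // [100 1000 10000 100000 1e+06 1e+07 1e+08 1e+09]
    }
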
31 changes: 3 additions & 28 deletions pkg/storage/wal/manager.go
@@ -6,8 +6,6 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/logproto"
@@ -83,29 +81,6 @@ type Config struct {
MaxSegmentSize int64
}

type Metrics struct {
NumAvailable prometheus.Gauge
NumPending prometheus.Gauge
NumFlushing prometheus.Gauge
}

func NewMetrics(r prometheus.Registerer) *Metrics {
return &Metrics{
NumAvailable: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "wal_segments_available",
Help: "The number of WAL segments accepting writes.",
}),
NumPending: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "wal_segments_pending",
Help: "The number of WAL segments waiting to be flushed.",
}),
NumFlushing: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "wal_segments_flushing",
Help: "The number of WAL segments being flushed.",
}),
}
}

// Manager buffers segments in memory, and keeps track of which segments are
// available and which are waiting to be flushed. The maximum number of
// segments that can be buffered in memory, and their maximum age and maximum
@@ -123,7 +98,7 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
// and avoid congestion collapse due to excessive timeouts and retries.
type Manager struct {
cfg Config
metrics *Metrics
metrics *ManagerMetrics

// available is a list of segments that are available and accepting data.
// All segments other than the segment at the front of the list are empty,
@@ -163,15 +138,15 @@ type PendingItem struct {
func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
m := Manager{
cfg: cfg,
metrics: metrics,
metrics: metrics.ManagerMetrics,
available: list.New(),
pending: list.New(),
shutdown: make(chan struct{}),
}
m.metrics.NumPending.Set(0)
m.metrics.NumFlushing.Set(0)
for i := int64(0); i < cfg.MaxSegments; i++ {
w, err := NewWalSegmentWriter()
w, err := NewWalSegmentWriter(metrics.SegmentMetrics)
if err != nil {
return nil, err
}
77 changes: 77 additions & 0 deletions pkg/storage/wal/metrics.go
@@ -0,0 +1,77 @@
package wal

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type ManagerMetrics struct {
NumAvailable prometheus.Gauge
NumPending prometheus.Gauge
NumFlushing prometheus.Gauge
}

func NewManagerMetrics(r prometheus.Registerer) *ManagerMetrics {
return &ManagerMetrics{
NumAvailable: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "wal_segments_available",
Help: "The number of WAL segments accepting writes.",
}),
NumPending: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "wal_segments_pending",
Help: "The number of WAL segments waiting to be flushed.",
}),
NumFlushing: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "wal_segments_flushing",
Help: "The number of WAL segments being flushed.",
}),
}
}

type SegmentMetrics struct {
outputSizeBytes prometheus.Histogram
inputSizeBytes prometheus.Histogram
streams prometheus.Histogram
tenants prometheus.Histogram
}

func NewSegmentMetrics(r prometheus.Registerer) *SegmentMetrics {
return &SegmentMetrics{
outputSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_rf1_segment_output_size_bytes",
Help: "The segment size as written to disk (compressed).",
Buckets: prometheus.ExponentialBuckets(100, 10, 8),
NativeHistogramBucketFactor: 1.1,
}),
inputSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_rf1_segment_input_size_bytes",
Help: "The segment size (uncompressed).",
Buckets: prometheus.ExponentialBuckets(100, 10, 8),
NativeHistogramBucketFactor: 1.1,
}),
streams: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_rf1_per_segment_streams",
Help: "The number of streams per segment.",
Buckets: prometheus.ExponentialBuckets(1, 2, 10),
NativeHistogramBucketFactor: 1.1,
}),
tenants: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_rf1_per_segment_tenants",
Help: "The number of tenants per segment.",
Buckets: prometheus.ExponentialBuckets(1, 2, 10),
NativeHistogramBucketFactor: 1.1,
}),
}
}

type Metrics struct {
SegmentMetrics *SegmentMetrics
ManagerMetrics *ManagerMetrics
}

func NewMetrics(r prometheus.Registerer) *Metrics {
return &Metrics{
ManagerMetrics: NewManagerMetrics(r),
SegmentMetrics: NewSegmentMetrics(r),
}
}
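
A hedged wiring sketch (not from the commit) of how the split metric types fit together: NewMetrics builds both halves, and NewManager keeps the ManagerMetrics for itself while handing the SegmentMetrics to each NewWalSegmentWriter it creates. The config values below are illustrative.

    package main

    import (
        "github.com/prometheus/client_golang/prometheus"

        "github.com/grafana/loki/v3/pkg/storage/wal"
    )

    func main() {
        reg := prometheus.NewRegistry()
        metrics := wal.NewMetrics(reg) // ManagerMetrics + SegmentMetrics

        m, err := wal.NewManager(wal.Config{
            MaxSegments:    4,       // illustrative
            MaxSegmentSize: 8 << 20, // illustrative: 8 MiB
        }, metrics)
        if err != nil {
            panic(err)
        }
        _ = m
    }
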
23 changes: 22 additions & 1 deletion pkg/storage/wal/segment.go
@@ -46,8 +46,10 @@ type streamID struct {
}

type SegmentWriter struct {
metrics *SegmentMetrics
streams map[streamID]*streamSegment
buf1 encoding.Encbuf
outputSize atomic.Int64
inputSize atomic.Int64
idxWriter *index.Writer
consistencyMtx *sync.RWMutex
@@ -76,12 +78,13 @@ func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) {
}

// NewWalSegmentWriter creates a new WalSegmentWriter.
func NewWalSegmentWriter() (*SegmentWriter, error) {
func NewWalSegmentWriter(m *SegmentMetrics) (*SegmentWriter, error) {
idxWriter, err := index.NewWriter()
if err != nil {
return nil, err
}
return &SegmentWriter{
metrics: m,
streams: make(map[streamID]*streamSegment, 64),
buf1: encoding.EncWith(make([]byte, 0, 4)),
idxWriter: idxWriter,
@@ -144,6 +147,22 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels
}
}

// Observe updates metrics for the writer. If called before WriteTo then the
// output size histogram will observe 0.
func (b *SegmentWriter) Observe() {
b.consistencyMtx.Lock()
defer b.consistencyMtx.Unlock()

b.metrics.streams.Observe(float64(len(b.streams)))
tenants := make(map[string]struct{}, len(b.streams))
for _, s := range b.streams {
tenants[s.tenantID] = struct{}{}
}
b.metrics.tenants.Observe(float64(len(tenants)))
b.metrics.inputSizeBytes.Observe(float64(b.inputSize.Load()))
b.metrics.outputSizeBytes.Observe(float64(b.outputSize.Load()))
}

func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
var (
total int64
@@ -262,6 +281,8 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
}
total += int64(n)

b.outputSize.Store(total)

return total, nil
}
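
As the comment on Observe notes, the output-size histogram only sees a meaningful value once WriteTo has stored the total, so callers write the segment out first and observe afterwards. A minimal ordering sketch follows; it is not from the commit, and whether WriteTo succeeds on a writer with no appended entries is not verified here.

    package main

    import (
        "bytes"

        "github.com/prometheus/client_golang/prometheus"

        "github.com/grafana/loki/v3/pkg/storage/wal"
    )

    func main() {
        w, err := wal.NewWalSegmentWriter(wal.NewSegmentMetrics(prometheus.NewRegistry()))
        if err != nil {
            panic(err)
        }

        // ... Append entries to w here ...

        var buf bytes.Buffer
        if _, err := w.WriteTo(&buf); err != nil { // stores the output size
            panic(err)
        }
        w.Observe() // records streams, tenants, input size and output size
    }
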
