From 46e98dbfede6195b3a2c1689a3f1493e00612a48 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 25 Apr 2024 12:01:08 -0700 Subject: [PATCH 1/4] feat(blooms): limit bloom size during creation Signed-off-by: Owen Diehl --- docs/sources/shared/configuration.md | 6 + pkg/bloomcompactor/bloomcompactor_test.go | 4 + pkg/bloomcompactor/config.go | 1 + pkg/bloomcompactor/controller.go | 3 +- pkg/bloomcompactor/spec.go | 21 ++-- pkg/bloomcompactor/spec_test.go | 8 +- pkg/storage/bloom/v1/bloom_tokenizer.go | 114 +++++++++++-------- pkg/storage/bloom/v1/bloom_tokenizer_test.go | 14 +-- pkg/storage/bloom/v1/builder.go | 43 ++++--- pkg/storage/bloom/v1/builder_test.go | 8 +- pkg/storage/bloom/v1/filter/scalable.go | 7 ++ pkg/storage/bloom/v1/metrics.go | 22 ++-- pkg/storage/bloom/v1/tokenizer.go | 2 +- pkg/validation/limits.go | 14 +++ 14 files changed, 171 insertions(+), 96 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 9a04d6fac785..cd46448dd171 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3333,6 +3333,12 @@ shard_streams: # CLI flag: -bloom-compactor.max-block-size [bloom_compactor_max_block_size: | default = 200MB] +# Experimental. The maximum bloom size per log stream. A log stream whose +# generated bloom filter exceeds this size will be discarded. A value of 0 sets +# an unlimited size. Default is 100MB. +# CLI flag: -bloom-compactor.max-bloom-size +[bloom_compactor_max_bloom_size: | default = 100MB] + # Experimental. Length of the n-grams created when computing blooms from log # lines. # CLI flag: -bloom-compactor.ngram-length diff --git a/pkg/bloomcompactor/bloomcompactor_test.go b/pkg/bloomcompactor/bloomcompactor_test.go index 1734ecfa710f..2a82782b722c 100644 --- a/pkg/bloomcompactor/bloomcompactor_test.go +++ b/pkg/bloomcompactor/bloomcompactor_test.go @@ -197,6 +197,10 @@ func (m mockLimits) BloomCompactorMaxBlockSize(_ string) int { panic("implement me") } +func (m mockLimits) BloomCompactorMaxBloomSize(_ string) int { + panic("implement me") +} + func TestTokenRangesForInstance(t *testing.T) { desc := func(id int, tokens ...uint32) ring.InstanceDesc { return ring.InstanceDesc{Id: fmt.Sprintf("%d", id), Tokens: tokens} diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go index ca6672d3406b..82daac0eac39 100644 --- a/pkg/bloomcompactor/config.go +++ b/pkg/bloomcompactor/config.go @@ -93,5 +93,6 @@ type Limits interface { BloomNGramSkip(tenantID string) int BloomFalsePositiveRate(tenantID string) float64 BloomCompactorMaxBlockSize(tenantID string) int + BloomCompactorMaxBloomSize(tenantID string) int BloomBlockEncoding(tenantID string) string } diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index e5416c7866cd..c706f9ae72b5 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -352,7 +352,8 @@ func (s *SimpleBloomController) buildGaps( nGramSize = uint64(s.limits.BloomNGramLength(tenant)) nGramSkip = uint64(s.limits.BloomNGramSkip(tenant)) maxBlockSize = uint64(s.limits.BloomCompactorMaxBlockSize(tenant)) - blockOpts = v1.NewBlockOptions(blockEnc, nGramSize, nGramSkip, maxBlockSize) + maxBloomSize = uint64(s.limits.BloomCompactorMaxBloomSize(tenant)) + blockOpts = v1.NewBlockOptions(blockEnc, nGramSize, nGramSkip, maxBlockSize, maxBloomSize) created []bloomshipper.Meta totalSeries int bytesAdded int diff --git a/pkg/bloomcompactor/spec.go b/pkg/bloomcompactor/spec.go index 
74b056e07b0a..229efe9c1693 100644 --- a/pkg/bloomcompactor/spec.go +++ b/pkg/bloomcompactor/spec.go @@ -89,12 +89,17 @@ func NewSimpleBloomGenerator( metrics: metrics, reporter: reporter, - tokenizer: v1.NewBloomTokenizer(opts.Schema.NGramLen(), opts.Schema.NGramSkip(), metrics.bloomMetrics), + tokenizer: v1.NewBloomTokenizer( + opts.Schema.NGramLen(), + opts.Schema.NGramSkip(), + int(opts.UnencodedBlockOptions.MaxBloomSizeBytes), + metrics.bloomMetrics, + ), } } -func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Series, bloom *v1.Bloom) (int, error) { - return func(series *v1.Series, bloom *v1.Bloom) (int, error) { +func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Series, bloom *v1.Bloom) (int, bool, error) { + return func(series *v1.Series, bloom *v1.Bloom) (int, bool, error) { start := time.Now() level.Debug(s.logger).Log( "msg", "populating bloom filter", @@ -104,10 +109,10 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se ) chunkItersWithFP, err := s.chunkLoader.Load(ctx, s.userID, series) if err != nil { - return 0, errors.Wrapf(err, "failed to load chunks for series: %+v", series) + return 0, false, errors.Wrapf(err, "failed to load chunks for series: %+v", series) } - bytesAdded, err := s.tokenizer.Populate( + bytesAdded, skip, err := s.tokenizer.Populate( &v1.SeriesWithBloom{ Series: series, Bloom: bloom, @@ -128,7 +133,7 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se if s.reporter != nil { s.reporter(series.Fingerprint) } - return bytesAdded, err + return bytesAdded, skip, err } } @@ -174,7 +179,7 @@ type LazyBlockBuilderIterator struct { ctx context.Context opts v1.BlockOptions metrics *Metrics - populate func(*v1.Series, *v1.Bloom) (int, error) + populate func(*v1.Series, *v1.Bloom) (int, bool, error) readWriterFn func() (v1.BlockWriter, v1.BlockReader) series v1.PeekingIterator[*v1.Series] blocks v1.ResettableIterator[*v1.SeriesWithBloom] @@ -188,7 +193,7 @@ func NewLazyBlockBuilderIterator( ctx context.Context, opts v1.BlockOptions, metrics *Metrics, - populate func(*v1.Series, *v1.Bloom) (int, error), + populate func(*v1.Series, *v1.Bloom) (int, bool, error), readWriterFn func() (v1.BlockWriter, v1.BlockReader), series v1.PeekingIterator[*v1.Series], blocks v1.ResettableIterator[*v1.SeriesWithBloom], diff --git a/pkg/bloomcompactor/spec_test.go b/pkg/bloomcompactor/spec_test.go index b35d82b9d3f4..7e39b8dec57f 100644 --- a/pkg/bloomcompactor/spec_test.go +++ b/pkg/bloomcompactor/spec_test.go @@ -121,13 +121,13 @@ func TestSimpleBloomGenerator(t *testing.T) { }{ { desc: "SkipsIncompatibleSchemas", - fromSchema: v1.NewBlockOptions(enc, 3, 0, maxBlockSize), - toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize), + fromSchema: v1.NewBlockOptions(enc, 3, 0, maxBlockSize, 0), + toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0), }, { desc: "CombinesBlocks", - fromSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize), - toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize), + fromSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0), + toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0), }, } { t.Run(fmt.Sprintf("%s/%s", tc.desc, enc), func(t *testing.T) { diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index 97dd6b3d0baa..f5b160d7616d 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -5,6 +5,7 @@ import ( "math" "time" + 
"github.com/c2h5oh/datasize" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -25,6 +26,7 @@ Bloom filters are utilized for faster lookups of log lines. type BloomTokenizer struct { metrics *Metrics + maxBloomSize int lineTokenizer *NGramTokenizer cache map[string]interface{} } @@ -38,7 +40,7 @@ const eightBits = 8 // 1) The token slices generated must not be mutated externally // 2) The token slice must not be used after the next call to `Tokens()` as it will repopulate the slice. // 2) This is not thread safe. -func NewBloomTokenizer(nGramLen, nGramSkip int, metrics *Metrics) *BloomTokenizer { +func NewBloomTokenizer(nGramLen, nGramSkip int, maxBloomSize int, metrics *Metrics) *BloomTokenizer { // TODO(chaudum): Replace logger level.Info(util_log.Logger).Log("msg", "create new bloom tokenizer", "ngram length", nGramLen, "ngram skip", nGramSkip) return &BloomTokenizer{ @@ -89,7 +91,9 @@ type ChunkRefWithIter struct { } // Populate adds the tokens from the given chunks to the given seriesWithBloom. -func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefWithIter]) (int, error) { +// The `skip` return value indicates whether this series should be discarded and is used to short-circuit +// bloom generation for series that are too large. We will undoubtedly improve this in the future. +func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefWithIter]) (bytesAdded int, skip bool, err error) { startTime := time.Now().UnixMilli() clearCache(bt.cache) @@ -119,61 +123,53 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW tokenBuf, prefixLn = prefixedToken(bt.lineTokenizer.N(), chk.Ref, tokenBuf) // Iterate over lines in the chunk + entries: for itr.Next() && itr.Error() == nil { // TODO(owen-d): rather than iterate over the line twice, once for prefixed tokenizer & once for // raw tokenizer, we could iterate once and just return (prefix, token) pairs from the tokenizer. // Double points for them being different-ln references to the same data. line := itr.Entry().Line chunkBytes += len(line) - chunkTokenizer := NewPrefixedTokenIter(tokenBuf, prefixLn, bt.lineTokenizer.Tokens(line)) - for chunkTokenizer.Next() { - tok := chunkTokenizer.At() - tokens++ - // TODO(owen-d): [n]byte this - str := string(tok) - _, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters - if found { - cachedInserts++ - continue - } - - bt.cache[str] = nil - collision := swb.Bloom.ScalableBloomFilter.TestAndAdd(tok) - if collision { - collisionInserts++ - } else { - successfulInserts++ - } - if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other - clearCache(bt.cache) - } + tokenItrs := []Iterator[[]byte]{ + // two iterators, one for the raw tokens and one for the chunk prefixed tokens. + // Warning: the underlying line tokenizer (used in both iterators) uses the same buffer for tokens. + // They are NOT SAFE for concurrent use. 
+ NewPrefixedTokenIter(tokenBuf, prefixLn, bt.lineTokenizer.Tokens(line)), + bt.lineTokenizer.Tokens(line), } - lineTokenizer := bt.lineTokenizer.Tokens(line) - for lineTokenizer.Next() { - tok := lineTokenizer.At() - tokens++ - str := string(tok) - _, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters - if found { - chunkCachedInserts++ - continue + for _, itr := range tokenItrs { + for itr.Next() { + tok := itr.At() + tokens++ + // TODO(owen-d): [n]byte this + str := string(tok) + _, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters + if found { + cachedInserts++ + continue + } + + bt.cache[str] = nil + collision, sz := swb.Bloom.ScalableBloomFilter.HeavyAdd(tok) + if collision { + collisionInserts++ + } else { + successfulInserts++ + } + + if bt.maxBloomSize > 0 && sz > bt.maxBloomSize { + skip = true + break entries + } + + if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other + clearCache(bt.cache) + } } - bt.cache[str] = nil - collision := swb.Bloom.ScalableBloomFilter.TestAndAdd(tok) - if collision { - chunkCollisionInserts++ - } else { - chunkSuccessfulInserts++ - } - - if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other - clearCache(bt.cache) - } } - } // add the recorded chunkbytes to the sourcebytes counter in case we return early via error @@ -187,7 +183,7 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW es.Add(errors.Wrapf(err, "error iterating chunk: %#v", chk.Ref)) } if combined := es.Err(); combined != nil { - return sourceBytes, combined + return sourceBytes, skip, combined } swb.Series.Chunks = append(swb.Series.Chunks, chk.Ref) @@ -200,13 +196,27 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW bt.metrics.insertsTotal.WithLabelValues(tokenTypeChunkPrefixed, collisionTypeCache).Add(float64(chunkCachedInserts)) bt.metrics.insertsTotal.WithLabelValues(tokenTypeChunkPrefixed, collisionTypeTrue).Add(float64(chunkCollisionInserts)) bt.metrics.sourceBytesAdded.Add(float64(chunkBytes)) + + // Exit early if the series is too large + if skip { + break + } } if err := chks.Err(); err != nil { level.Error(util_log.Logger).Log("msg", "error downloading chunks batch", "err", err) - return sourceBytes, fmt.Errorf("error downloading chunks batch: %w", err) + return sourceBytes, skip, fmt.Errorf("error downloading chunks batch: %w", err) } + level.Debug(util_log.Logger).Log( + "msg", "bloom filter populated", + "chunks", len(swb.Series.Chunks), + "fp", swb.Series.Fingerprint, + "sourceBytes", datasize.ByteSize(sourceBytes).HumanReadable(), + "bloomSize", datasize.ByteSize(swb.Bloom.Capacity()/8).HumanReadable(), + "skipped", skip, + ) + endTime := time.Now().UnixMilli() fillRatio := swb.Bloom.ScalableBloomFilter.FillRatio() @@ -215,8 +225,14 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW float64(estimatedCount(swb.Bloom.ScalableBloomFilter.Capacity(), fillRatio)), ) bt.metrics.bloomSize.Observe(float64(swb.Bloom.ScalableBloomFilter.Capacity() / eightBits)) - bt.metrics.sbfCreationTime.Add(float64(endTime - startTime)) - return sourceBytes, nil + + ty := bloomCreationTypeIndexed + if skip { + ty = bloomCreationTypeSkipped + } + 
bt.metrics.sbfCreationTime.WithLabelValues(ty).Add(float64(endTime - startTime)) + + return sourceBytes, skip, nil } // n ≈ −m ln(1 − p). diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go index 7d429ea8acb1..dec23f91e80b 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -79,7 +79,7 @@ func TestPrefixedKeyCreation(t *testing.T) { func TestSetLineTokenizer(t *testing.T) { t.Parallel() - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics) + bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics) // Validate defaults require.Equal(t, bt.lineTokenizer.N(), DefaultNGramLength) @@ -94,7 +94,7 @@ func TestSetLineTokenizer(t *testing.T) { func TestTokenizerPopulate(t *testing.T) { t.Parallel() var testLine = "this is a log line" - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics) + bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics) sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8) var lbsList []labels.Labels @@ -125,7 +125,7 @@ func TestTokenizerPopulate(t *testing.T) { Series: &series, } - _, err = bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}})) + _, _, err = bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}})) require.NoError(t, err) tokenizer := NewNGramTokenizer(DefaultNGramLength, DefaultNGramSkip) toks := tokenizer.Tokens(testLine) @@ -138,7 +138,7 @@ func TestTokenizerPopulate(t *testing.T) { func BenchmarkPopulateSeriesWithBloom(b *testing.B) { for i := 0; i < b.N; i++ { var testLine = lorem + lorem + lorem - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics) + bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics) sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8) var lbsList []labels.Labels @@ -169,13 +169,13 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) { Series: &series, } - _, err = bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}})) + _, _, err = bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}})) require.NoError(b, err) } } func BenchmarkMapClear(b *testing.B) { - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics) + bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics) for i := 0; i < b.N; i++ { for k := 0; k < cacheSize; k++ { bt.cache[fmt.Sprint(k)] = k @@ -186,7 +186,7 @@ func BenchmarkMapClear(b *testing.B) { } func BenchmarkNewMap(b *testing.B) { - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics) + bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics) for i := 0; i < b.N; i++ { for k := 0; k < cacheSize; k++ { bt.cache[fmt.Sprint(k)] = k diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index f09a7bc31744..323da86b67c3 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -14,9 +14,10 @@ import ( "github.com/grafana/loki/v3/pkg/util/encoding" ) -var ( - DefaultBlockOptions = NewBlockOptions(0, 4, 1, 50<<20) // EncNone, 50MB -) +// Options for the block which are not encoded into it iself. 
+type UnencodedBlockOptions struct { + MaxBloomSizeBytes uint64 +} type BlockOptions struct { // Schema determines the Schema of the block and cannot be changed @@ -31,6 +32,11 @@ type BlockOptions struct { // target size in bytes (decompressed) // of each page type SeriesPageSize, BloomPageSize, BlockSize uint64 + + // UnencodedBlockOptions are not encoded into the block's binary format, + // but are a helpful way to pass additional options to the block builder. + // Thus, they're used during construction but not on reads. + UnencodedBlockOptions UnencodedBlockOptions } func (b BlockOptions) Len() int { @@ -70,14 +76,15 @@ type BlockBuilder struct { blooms *BloomBlockBuilder } -func NewBlockOptions(enc chunkenc.Encoding, NGramLength, NGramSkip, MaxBlockSizeBytes uint64) BlockOptions { +func NewBlockOptions(enc chunkenc.Encoding, nGramLength, nGramSkip, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions { opts := NewBlockOptionsFromSchema(Schema{ version: byte(1), encoding: enc, - nGramLength: NGramLength, - nGramSkip: NGramSkip, + nGramLength: nGramLength, + nGramSkip: nGramSkip, }) - opts.BlockSize = MaxBlockSizeBytes + opts.BlockSize = maxBlockSizeBytes + opts.UnencodedBlockOptions.MaxBloomSizeBytes = maxBloomSizeBytes return opts } @@ -526,7 +533,7 @@ type MergeBuilder struct { // store store Iterator[*Series] // Add chunks to a bloom - populate func(*Series, *Bloom) (int, error) + populate func(*Series, *Bloom) (sourceBytesAdded int, skipSeries bool, err error) metrics *Metrics } @@ -537,7 +544,7 @@ type MergeBuilder struct { func NewMergeBuilder( blocks Iterator[*SeriesWithBloom], store Iterator[*Series], - populate func(*Series, *Bloom) (int, error), + populate func(*Series, *Bloom) (int, bool, error), metrics *Metrics, ) *MergeBuilder { return &MergeBuilder{ @@ -613,8 +620,15 @@ func (mb *MergeBuilder) processNextSeries( chunksIndexed += len(chunksToAdd) + var ( + err error + skip bool + done bool + sourceBytes int + ) + if len(chunksToAdd) > 0 { - sourceBytes, err := mb.populate( + sourceBytes, skip, err = mb.populate( &Series{ Fingerprint: nextInStore.Fingerprint, Chunks: chunksToAdd, @@ -628,10 +642,13 @@ func (mb *MergeBuilder) processNextSeries( } } - done, err := builder.AddSeries(*cur) - if err != nil { - return nil, bytesAdded, false, false, errors.Wrap(err, "adding series to block") + if !skip { + done, err = builder.AddSeries(*cur) + if err != nil { + return nil, bytesAdded, false, false, errors.Wrap(err, "adding series to block") + } } + return nextInBlocks, bytesAdded, blocksFinished, done, nil } diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 204614403249..56d03cbd7c93 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -222,8 +222,8 @@ func TestMergeBuilder(t *testing.T) { } // We're not testing the ability to extend a bloom in this test - pop := func(_ *Series, _ *Bloom) (int, error) { - return 0, errors.New("not implemented") + pop := func(_ *Series, _ *Bloom) (int, bool, error) { + return 0, false, errors.New("not implemented") } // storage should contain references to all the series we ingested, @@ -408,9 +408,9 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { mb := NewMergeBuilder( dedupedBlocks(blocks), dedupedStore, - func(s *Series, b *Bloom) (int, error) { + func(s *Series, b *Bloom) (int, bool, error) { // We're not actually indexing new data in this test - return 0, nil + return 0, false, nil }, NewMetrics(nil), ) diff --git 
a/pkg/storage/bloom/v1/filter/scalable.go b/pkg/storage/bloom/v1/filter/scalable.go index 74db6748c7bc..a7848a2dd62c 100644 --- a/pkg/storage/bloom/v1/filter/scalable.go +++ b/pkg/storage/bloom/v1/filter/scalable.go @@ -180,6 +180,13 @@ func (s *ScalableBloomFilter) TestAndAdd(data []byte) bool { return member } +// HeavyAdd adds a new element to the filter and returns a few metrics (the "heavy" part) +func (s *ScalableBloomFilter) HeavyAdd(data []byte) (noop bool, bloomSize int) { + noop = s.TestAndAdd(data) + sz := s.Capacity() / 8 // convert bits to bytes + return noop, int(sz) +} + // Reset restores the Bloom filter to its original state. It returns the filter // to allow for chaining. func (s *ScalableBloomFilter) Reset() *ScalableBloomFilter { diff --git a/pkg/storage/bloom/v1/metrics.go b/pkg/storage/bloom/v1/metrics.go index 22e315e00175..f39eb7dd036f 100644 --- a/pkg/storage/bloom/v1/metrics.go +++ b/pkg/storage/bloom/v1/metrics.go @@ -8,20 +8,21 @@ import ( ) type Metrics struct { - sbfCreationTime prometheus.Counter // time spent creating sbfs - bloomSize prometheus.Histogram // size of the bloom filter in bytes - hammingWeightRatio prometheus.Histogram // ratio of the hamming weight of the bloom filter to the number of bits in the bloom filter - estimatedCount prometheus.Histogram // estimated number of elements in the bloom filter + // writes + sbfCreationTime *prometheus.CounterVec // time spent creating sbfs + bloomSize prometheus.Histogram // size of the bloom filter in bytes + hammingWeightRatio prometheus.Histogram // ratio of the hamming weight of the bloom filter to the number of bits in the bloom filter + estimatedCount prometheus.Histogram // estimated number of elements in the bloom filter chunksIndexed *prometheus.CounterVec chunksPerSeries prometheus.Histogram blockSeriesIterated prometheus.Counter tokensTotal prometheus.Counter insertsTotal *prometheus.CounterVec sourceBytesAdded prometheus.Counter + blockSize prometheus.Histogram + blockFlushReason *prometheus.CounterVec - blockSize prometheus.Histogram - blockFlushReason *prometheus.CounterVec - + // reads pagesRead *prometheus.CounterVec pagesSkipped *prometheus.CounterVec bytesRead *prometheus.CounterVec @@ -47,15 +48,18 @@ const ( skipReasonTooLarge = "too_large" skipReasonErr = "err" skipReasonOOB = "out_of_bounds" + + bloomCreationTypeIndexed = "indexed" + bloomCreationTypeSkipped = "skipped" ) func NewMetrics(r prometheus.Registerer) *Metrics { return &Metrics{ - sbfCreationTime: promauto.With(r).NewCounter(prometheus.CounterOpts{ + sbfCreationTime: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "bloom_creation_time_total", Help: "Time spent creating scalable bloom filters", - }), + }, []string{"type"}), bloomSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ Namespace: constants.Loki, Name: "bloom_size", diff --git a/pkg/storage/bloom/v1/tokenizer.go b/pkg/storage/bloom/v1/tokenizer.go index 4bad69771265..131a1d057edf 100644 --- a/pkg/storage/bloom/v1/tokenizer.go +++ b/pkg/storage/bloom/v1/tokenizer.go @@ -48,7 +48,7 @@ func NewNGramTokenizer(n, skip int) *NGramTokenizer { return t } -// Token implementsthe NGramBuilder interface +// Token implements the NGramBuilder interface // The Token iterator uses shared buffers for performance. 
The []byte returned by At() // is not safe for use after subsequent calls to Next() func (t *NGramTokenizer) Tokens(line string) Iterator[[]byte] { diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 1b82716ce732..77ae55178ea1 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -60,6 +60,7 @@ const ( defaultMaxStructuredMetadataSize = "64kb" defaultMaxStructuredMetadataCount = 128 defaultBloomCompactorMaxBlockSize = "200MB" + defaultBloomCompactorMaxBloomSize = "100MB" ) // Limits describe all the limits for users; can be used to describe global default @@ -201,6 +202,7 @@ type Limits struct { BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size" category:"experimental"` BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction" category:"experimental"` BloomCompactorMaxBlockSize flagext.ByteSize `yaml:"bloom_compactor_max_block_size" json:"bloom_compactor_max_block_size" category:"experimental"` + BloomCompactorMaxBloomSize flagext.ByteSize `yaml:"bloom_compactor_max_bloom_size" json:"bloom_compactor_max_bloom_size" category:"experimental"` BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length" category:"experimental"` BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip" category:"experimental"` @@ -376,6 +378,14 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { ), ) + _ = l.BloomCompactorMaxBloomSize.Set(defaultBloomCompactorMaxBloomSize) + f.Var(&l.BloomCompactorMaxBloomSize, "bloom-compactor.max-bloom-size", + fmt.Sprintf( + "Experimental. The maximum bloom size per log stream. A log stream whose generated bloom filter exceeds this size will be discarded. A value of 0 sets an unlimited size. 
Default is %s.", + defaultBloomCompactorMaxBloomSize, + ), + ) + l.ShardStreams = &shardstreams.Config{} l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) @@ -966,6 +976,10 @@ func (o *Overrides) BloomCompactorMaxBlockSize(userID string) int { return o.getOverridesForUser(userID).BloomCompactorMaxBlockSize.Val() } +func (o *Overrides) BloomCompactorMaxBloomSize(userID string) int { + return o.getOverridesForUser(userID).BloomCompactorMaxBloomSize.Val() +} + func (o *Overrides) BloomFalsePositiveRate(userID string) float64 { return o.getOverridesForUser(userID).BloomFalsePositiveRate } From 798a3ce3136c121755faae4e5df7f0d237b939bd Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 25 Apr 2024 12:19:34 -0700 Subject: [PATCH 2/4] use maxbloomsize Signed-off-by: Owen Diehl --- pkg/storage/bloom/v1/bloom_tokenizer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index f5b160d7616d..374f3c857f78 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -47,6 +47,7 @@ func NewBloomTokenizer(nGramLen, nGramSkip int, maxBloomSize int, metrics *Metri metrics: metrics, cache: make(map[string]interface{}, cacheSize), lineTokenizer: NewNGramTokenizer(nGramLen, nGramSkip), + maxBloomSize: maxBloomSize, } } From e9efe6ee38760776cb92f46d5640272698bebf55 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 25 Apr 2024 13:36:57 -0700 Subject: [PATCH 3/4] blooms_created_total metric Signed-off-by: Owen Diehl --- pkg/storage/bloom/v1/bloom_tokenizer.go | 1 + pkg/storage/bloom/v1/metrics.go | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index 374f3c857f78..606f4a84dc3c 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -232,6 +232,7 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW ty = bloomCreationTypeSkipped } bt.metrics.sbfCreationTime.WithLabelValues(ty).Add(float64(endTime - startTime)) + bt.metrics.bloomsTotal.WithLabelValues(ty).Inc() return sourceBytes, skip, nil } diff --git a/pkg/storage/bloom/v1/metrics.go b/pkg/storage/bloom/v1/metrics.go index f39eb7dd036f..700acfc05c67 100644 --- a/pkg/storage/bloom/v1/metrics.go +++ b/pkg/storage/bloom/v1/metrics.go @@ -9,6 +9,7 @@ import ( type Metrics struct { // writes + bloomsTotal *prometheus.CounterVec // number of blooms created sbfCreationTime *prometheus.CounterVec // time spent creating sbfs bloomSize prometheus.Histogram // size of the bloom filter in bytes hammingWeightRatio prometheus.Histogram // ratio of the hamming weight of the bloom filter to the number of bits in the bloom filter @@ -55,6 +56,11 @@ const ( func NewMetrics(r prometheus.Registerer) *Metrics { return &Metrics{ + bloomsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "blooms_created_total", + Help: "Number of blooms created", + }, []string{"type"}), sbfCreationTime: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "bloom_creation_time_total", From 18c668e63562f2e18bc266d52c5b489c1fdec3c4 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 25 Apr 2024 13:48:36 -0700 Subject: [PATCH 4/4] bump max bloom size to 128MB, above a common 102.2mb threshold Signed-off-by: Owen Diehl --- docs/sources/shared/configuration.md | 4 ++-- pkg/validation/limits.go | 2 +- 2 files changed, 3 
insertions(+), 3 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index cd46448dd171..ab440d5d4f2f 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3335,9 +3335,9 @@ shard_streams: # Experimental. The maximum bloom size per log stream. A log stream whose # generated bloom filter exceeds this size will be discarded. A value of 0 sets -# an unlimited size. Default is 100MB. +# an unlimited size. Default is 128MB. # CLI flag: -bloom-compactor.max-bloom-size -[bloom_compactor_max_bloom_size: | default = 100MB] +[bloom_compactor_max_bloom_size: | default = 128MB] # Experimental. Length of the n-grams created when computing blooms from log # lines. diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 77ae55178ea1..9a6f6a1a8841 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -60,7 +60,7 @@ const ( defaultMaxStructuredMetadataSize = "64kb" defaultMaxStructuredMetadataCount = 128 defaultBloomCompactorMaxBlockSize = "200MB" - defaultBloomCompactorMaxBloomSize = "100MB" + defaultBloomCompactorMaxBloomSize = "128MB" ) // Limits describe all the limits for users; can be used to describe global default
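
As a closing illustration (not part of the patches above), here is a minimal Go sketch of the control flow these commits add to `BloomTokenizer.Populate`: tokens are inserted through a size-reporting add (the patch's `HeavyAdd`), and once the bloom's byte size crosses the per-stream cap set by `-bloom-compactor.max-bloom-size`, population stops early and the series is flagged to be skipped rather than written to the block. The `toyBloom` type below is a hypothetical stand-in for Loki's `filter.ScalableBloomFilter` (a fixed-size bitset rather than a real scalable filter), so treat this as a sketch of the pattern under that assumption, not as the actual implementation.

package main

import (
	"fmt"
	"hash/fnv"
)

// toyBloom is a hypothetical stand-in for a scalable bloom filter: a fixed-size
// bitset whose heavyAdd mirrors the patch's HeavyAdd(data) (collision, bloomSize) shape.
type toyBloom struct {
	bits []byte
}

func newToyBloom(nBytes int) *toyBloom {
	return &toyBloom{bits: make([]byte, nBytes)}
}

// heavyAdd sets one bit derived from data and reports whether that bit was
// already set, plus the current filter size in bytes (the "heavy" part).
func (b *toyBloom) heavyAdd(data []byte) (collision bool, sizeBytes int) {
	h := fnv.New64a()
	_, _ = h.Write(data)
	idx := h.Sum64() % uint64(len(b.bits)*8)
	byteIdx, mask := idx/8, byte(1)<<(idx%8)
	collision = b.bits[byteIdx]&mask != 0
	b.bits[byteIdx] |= mask
	return collision, len(b.bits)
}

// populate mirrors the patched flow: add tokens chunk by chunk, and once the
// bloom exceeds maxBloomSize bytes (0 means unlimited), break out with skip=true
// so the caller can discard the series instead of indexing an oversized bloom.
func populate(bloom *toyBloom, chunks [][][]byte, maxBloomSize int) (bytesAdded int, skip bool) {
chunkLoop:
	for _, tokens := range chunks {
		for _, tok := range tokens {
			bytesAdded += len(tok)
			_, sz := bloom.heavyAdd(tok)
			if maxBloomSize > 0 && sz > maxBloomSize {
				skip = true
				break chunkLoop
			}
		}
	}
	return bytesAdded, skip
}

func main() {
	// The toy filter is 64 bytes from the start (a real scalable filter grows),
	// so a 32-byte cap trips on the first insert and the series is skipped.
	bloom := newToyBloom(64)
	chunks := [][][]byte{{[]byte("foo"), []byte("bar")}, {[]byte("baz")}}
	added, skip := populate(bloom, chunks, 32)
	fmt.Println(added, skip) // 3 true
}

In the patches themselves, the cap comes from the per-tenant `bloom_compactor_max_bloom_size` limit (defaulting to 100MB in patch 1 and bumped to 128MB in patch 4), a skipped series is counted by the new `loki_blooms_created_total{type="skipped"}` metric, and `MergeBuilder.processNextSeries` leaves skipped series out of the block entirely.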