feat(blooms): limit bloom size during creation #12796

Merged · 5 commits · Apr 26, 2024
Changes from 2 commits
6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3333,6 +3333,12 @@ shard_streams:
# CLI flag: -bloom-compactor.max-block-size
[bloom_compactor_max_block_size: <int> | default = 200MB]

# Experimental. The maximum bloom size per log stream. A log stream whose
# generated bloom filter exceeds this size will be discarded. A value of 0 sets
# an unlimited size. Default is 100MB.
# CLI flag: -bloom-compactor.max-bloom-size
[bloom_compactor_max_bloom_size: <int> | default = 100MB]
Contributor: Should we set the default to the same value as MaxQueryPageSize, which is 64MiB?

owen-d (Member, Author) replied on Apr 25, 2024:

Yeah, I can do that 👍

edit: It's kind of a pain to align them to a single source of truth (they also use two different bytesize impls... yuck), so I'm just going to leave it as is. I also like the idea of having it above the default filter level a bit so we can change the read path slightly without reindexing.


# Experimental. Length of the n-grams created when computing blooms from log
# lines.
# CLI flag: -bloom-compactor.ngram-length
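To make the documented semantics of `-bloom-compactor.max-bloom-size` concrete (the cap only applies when it is non-zero), here is a minimal Go sketch of the check the flag implies. The helper name and the example values are illustrative, not taken from this PR.

```go
package main

import "fmt"

// exceedsBloomLimit is a hypothetical helper mirroring the documented behaviour
// of -bloom-compactor.max-bloom-size: a value of 0 (or less) disables the cap.
func exceedsBloomLimit(bloomBytes, maxBloomSize int) bool {
	if maxBloomSize <= 0 {
		return false // unlimited
	}
	return bloomBytes > maxBloomSize
}

func main() {
	fmt.Println(exceedsBloomLimit(150<<20, 100<<20)) // true: a 150MB bloom vs. a 100MB cap
	fmt.Println(exceedsBloomLimit(150<<20, 0))       // false: cap disabled
}
```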
4 changes: 4 additions & 0 deletions pkg/bloomcompactor/bloomcompactor_test.go
@@ -197,6 +197,10 @@ func (m mockLimits) BloomCompactorMaxBlockSize(_ string) int {
panic("implement me")
}

func (m mockLimits) BloomCompactorMaxBloomSize(_ string) int {
panic("implement me")
}

func TestTokenRangesForInstance(t *testing.T) {
desc := func(id int, tokens ...uint32) ring.InstanceDesc {
return ring.InstanceDesc{Id: fmt.Sprintf("%d", id), Tokens: tokens}
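If a test needs a concrete value for the new limit rather than the panic above, one option (a sketch, not part of this PR) is to embed mockLimits and override only the new method:

```go
// sizeLimits is a hypothetical test double: it embeds mockLimits (defined above)
// and overrides only the new max-bloom-size limit, so tests that do not exercise
// the other limits still panic if they accidentally touch them.
type sizeLimits struct {
	mockLimits
	maxBloomSize int
}

func (l sizeLimits) BloomCompactorMaxBloomSize(_ string) int { return l.maxBloomSize }
```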
1 change: 1 addition & 0 deletions pkg/bloomcompactor/config.go
@@ -93,5 +93,6 @@ type Limits interface {
BloomNGramSkip(tenantID string) int
BloomFalsePositiveRate(tenantID string) float64
BloomCompactorMaxBlockSize(tenantID string) int
BloomCompactorMaxBloomSize(tenantID string) int
BloomBlockEncoding(tenantID string) string
}
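A concrete Limits implementation then resolves the value per tenant. The sketch below shows the general shape under assumed, illustrative names (the struct, field, and lookup are not Loki's actual overrides wiring):

```go
package limitssketch

// tenantLimits and overridesSketch are illustrative stand-ins for a per-tenant
// limits store; the real implementation resolves runtime overrides differently.
type tenantLimits struct {
	MaxBloomSizeBytes int
}

type overridesSketch struct {
	defaults  tenantLimits
	perTenant map[string]tenantLimits
}

// BloomCompactorMaxBloomSize returns the tenant's override if present,
// otherwise the configured default.
func (o *overridesSketch) BloomCompactorMaxBloomSize(tenantID string) int {
	if l, ok := o.perTenant[tenantID]; ok {
		return l.MaxBloomSizeBytes
	}
	return o.defaults.MaxBloomSizeBytes
}
```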
3 changes: 2 additions & 1 deletion pkg/bloomcompactor/controller.go
@@ -352,7 +352,8 @@ func (s *SimpleBloomController) buildGaps(
nGramSize = uint64(s.limits.BloomNGramLength(tenant))
nGramSkip = uint64(s.limits.BloomNGramSkip(tenant))
maxBlockSize = uint64(s.limits.BloomCompactorMaxBlockSize(tenant))
blockOpts = v1.NewBlockOptions(blockEnc, nGramSize, nGramSkip, maxBlockSize)
maxBloomSize = uint64(s.limits.BloomCompactorMaxBloomSize(tenant))
blockOpts = v1.NewBlockOptions(blockEnc, nGramSize, nGramSkip, maxBlockSize, maxBloomSize)
created []bloomshipper.Meta
totalSeries int
bytesAdded int
21 changes: 13 additions & 8 deletions pkg/bloomcompactor/spec.go
@@ -89,12 +89,17 @@ func NewSimpleBloomGenerator(
metrics: metrics,
reporter: reporter,

tokenizer: v1.NewBloomTokenizer(opts.Schema.NGramLen(), opts.Schema.NGramSkip(), metrics.bloomMetrics),
tokenizer: v1.NewBloomTokenizer(
opts.Schema.NGramLen(),
opts.Schema.NGramSkip(),
int(opts.UnencodedBlockOptions.MaxBloomSizeBytes),
metrics.bloomMetrics,
),
}
}

func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Series, bloom *v1.Bloom) (int, error) {
return func(series *v1.Series, bloom *v1.Bloom) (int, error) {
func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Series, bloom *v1.Bloom) (int, bool, error) {
return func(series *v1.Series, bloom *v1.Bloom) (int, bool, error) {
start := time.Now()
level.Debug(s.logger).Log(
"msg", "populating bloom filter",
@@ -104,10 +109,10 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se
)
chunkItersWithFP, err := s.chunkLoader.Load(ctx, s.userID, series)
if err != nil {
return 0, errors.Wrapf(err, "failed to load chunks for series: %+v", series)
return 0, false, errors.Wrapf(err, "failed to load chunks for series: %+v", series)
}

bytesAdded, err := s.tokenizer.Populate(
bytesAdded, skip, err := s.tokenizer.Populate(
&v1.SeriesWithBloom{
Series: series,
Bloom: bloom,
@@ -128,7 +133,7 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se
if s.reporter != nil {
s.reporter(series.Fingerprint)
}
return bytesAdded, err
return bytesAdded, skip, err
}

}
@@ -174,7 +179,7 @@ type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
metrics *Metrics
populate func(*v1.Series, *v1.Bloom) (int, error)
populate func(*v1.Series, *v1.Bloom) (int, bool, error)
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series v1.PeekingIterator[*v1.Series]
blocks v1.ResettableIterator[*v1.SeriesWithBloom]
@@ -188,7 +193,7 @@ func NewLazyBlockBuilderIterator(
ctx context.Context,
opts v1.BlockOptions,
metrics *Metrics,
populate func(*v1.Series, *v1.Bloom) (int, error),
populate func(*v1.Series, *v1.Bloom) (int, bool, error),
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
series v1.PeekingIterator[*v1.Series],
blocks v1.ResettableIterator[*v1.SeriesWithBloom],
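The populator's return type grows from (int, error) to (int, bool, error), where the bool reports that the series' bloom outgrew the cap. Below is a self-contained sketch of how a build loop might consume that flag; the types and the loop are illustrative, not the PR's actual block builder.

```go
package main

import "fmt"

type series struct{ fingerprint uint64 }
type bloom struct{}

// populateFn mirrors the new populator signature: bytes added, skip flag, error.
type populateFn func(*series, *bloom) (int, bool, error)

func buildAll(items []*series, populate populateFn) (totalBytes, skipped int, err error) {
	for _, s := range items {
		b := &bloom{}
		n, skip, err := populate(s, b)
		if err != nil {
			return totalBytes, skipped, err
		}
		if skip {
			// The bloom outgrew the configured cap: count the series as skipped
			// rather than writing an oversized filter.
			skipped++
			continue
		}
		totalBytes += n
		// a real builder would append (s, b) to the block here
	}
	return totalBytes, skipped, nil
}

func main() {
	items := []*series{{fingerprint: 1}, {fingerprint: 2}}
	total, skipped, _ := buildAll(items, func(*series, *bloom) (int, bool, error) {
		return 64, false, nil
	})
	fmt.Println(total, skipped) // 128 0
}
```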
8 changes: 4 additions & 4 deletions pkg/bloomcompactor/spec_test.go
@@ -121,13 +121,13 @@ func TestSimpleBloomGenerator(t *testing.T) {
}{
{
desc: "SkipsIncompatibleSchemas",
fromSchema: v1.NewBlockOptions(enc, 3, 0, maxBlockSize),
toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize),
fromSchema: v1.NewBlockOptions(enc, 3, 0, maxBlockSize, 0),
toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0),
},
{
desc: "CombinesBlocks",
fromSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize),
toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize),
fromSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0),
toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0),
},
} {
t.Run(fmt.Sprintf("%s/%s", tc.desc, enc), func(t *testing.T) {
115 changes: 66 additions & 49 deletions pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -5,6 +5,7 @@ import (
"math"
"time"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

@@ -25,6 +26,7 @@ Bloom filters are utilized for faster lookups of log lines.
type BloomTokenizer struct {
metrics *Metrics

maxBloomSize int
lineTokenizer *NGramTokenizer
cache map[string]interface{}
}
@@ -38,13 +40,14 @@ const eightBits = 8
// 1) The token slices generated must not be mutated externally
// 2) The token slice must not be used after the next call to `Tokens()` as it will repopulate the slice.
// 2) This is not thread safe.
func NewBloomTokenizer(nGramLen, nGramSkip int, metrics *Metrics) *BloomTokenizer {
func NewBloomTokenizer(nGramLen, nGramSkip int, maxBloomSize int, metrics *Metrics) *BloomTokenizer {
// TODO(chaudum): Replace logger
level.Info(util_log.Logger).Log("msg", "create new bloom tokenizer", "ngram length", nGramLen, "ngram skip", nGramSkip)
return &BloomTokenizer{
metrics: metrics,
cache: make(map[string]interface{}, cacheSize),
lineTokenizer: NewNGramTokenizer(nGramLen, nGramSkip),
maxBloomSize: maxBloomSize,
}
}

@@ -89,7 +92,9 @@ type ChunkRefWithIter struct {
}

// Populate adds the tokens from the given chunks to the given seriesWithBloom.
func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefWithIter]) (int, error) {
// The `skip` return value indicates whether this series should be discarded and is used to short-circuit
// bloom generation for series that are too large. We will undoubtedly improve this in the future.
func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefWithIter]) (bytesAdded int, skip bool, err error) {
startTime := time.Now().UnixMilli()

clearCache(bt.cache)
@@ -119,61 +124,53 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW
tokenBuf, prefixLn = prefixedToken(bt.lineTokenizer.N(), chk.Ref, tokenBuf)

// Iterate over lines in the chunk
entries:
for itr.Next() && itr.Error() == nil {
// TODO(owen-d): rather than iterate over the line twice, once for prefixed tokenizer & once for
// raw tokenizer, we could iterate once and just return (prefix, token) pairs from the tokenizer.
// Double points for them being different-ln references to the same data.
line := itr.Entry().Line
chunkBytes += len(line)
chunkTokenizer := NewPrefixedTokenIter(tokenBuf, prefixLn, bt.lineTokenizer.Tokens(line))
for chunkTokenizer.Next() {
tok := chunkTokenizer.At()
tokens++
// TODO(owen-d): [n]byte this
str := string(tok)
_, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
if found {
cachedInserts++
continue
}

bt.cache[str] = nil
collision := swb.Bloom.ScalableBloomFilter.TestAndAdd(tok)
if collision {
collisionInserts++
} else {
successfulInserts++
}

if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
}
tokenItrs := []Iterator[[]byte]{
// two iterators, one for the raw tokens and one for the chunk prefixed tokens.
// Warning: the underlying line tokenizer (used in both iterators) uses the same buffer for tokens.
// They are NOT SAFE for concurrent use.
NewPrefixedTokenIter(tokenBuf, prefixLn, bt.lineTokenizer.Tokens(line)),
bt.lineTokenizer.Tokens(line),
}

lineTokenizer := bt.lineTokenizer.Tokens(line)
for lineTokenizer.Next() {
tok := lineTokenizer.At()
tokens++
str := string(tok)
_, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
if found {
chunkCachedInserts++
continue
for _, itr := range tokenItrs {
for itr.Next() {
tok := itr.At()
tokens++
// TODO(owen-d): [n]byte this
str := string(tok)
_, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
if found {
cachedInserts++
continue
}

bt.cache[str] = nil
collision, sz := swb.Bloom.ScalableBloomFilter.HeavyAdd(tok)
if collision {
collisionInserts++
} else {
successfulInserts++
}

if bt.maxBloomSize > 0 && sz > bt.maxBloomSize {
skip = true
break entries
}

if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
}
}
bt.cache[str] = nil

collision := swb.Bloom.ScalableBloomFilter.TestAndAdd(tok)
if collision {
chunkCollisionInserts++
} else {
chunkSuccessfulInserts++
}

if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
}
}

}

// add the recorded chunkbytes to the sourcebytes counter in case we return early via error
@@ -187,7 +184,7 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW
es.Add(errors.Wrapf(err, "error iterating chunk: %#v", chk.Ref))
}
if combined := es.Err(); combined != nil {
return sourceBytes, combined
return sourceBytes, skip, combined
}
swb.Series.Chunks = append(swb.Series.Chunks, chk.Ref)

@@ -200,13 +197,27 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW
bt.metrics.insertsTotal.WithLabelValues(tokenTypeChunkPrefixed, collisionTypeCache).Add(float64(chunkCachedInserts))
bt.metrics.insertsTotal.WithLabelValues(tokenTypeChunkPrefixed, collisionTypeTrue).Add(float64(chunkCollisionInserts))
bt.metrics.sourceBytesAdded.Add(float64(chunkBytes))

// Exit early if the series is too large
if skip {
break
}
}

if err := chks.Err(); err != nil {
level.Error(util_log.Logger).Log("msg", "error downloading chunks batch", "err", err)
return sourceBytes, fmt.Errorf("error downloading chunks batch: %w", err)
return sourceBytes, skip, fmt.Errorf("error downloading chunks batch: %w", err)
}

level.Debug(util_log.Logger).Log(
"msg", "bloom filter populated",
"chunks", len(swb.Series.Chunks),
"fp", swb.Series.Fingerprint,
"sourceBytes", datasize.ByteSize(sourceBytes).HumanReadable(),
"bloomSize", datasize.ByteSize(swb.Bloom.Capacity()/8).HumanReadable(),
"skipped", skip,
)

endTime := time.Now().UnixMilli()

fillRatio := swb.Bloom.ScalableBloomFilter.FillRatio()
@@ -215,8 +226,14 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW
float64(estimatedCount(swb.Bloom.ScalableBloomFilter.Capacity(), fillRatio)),
)
bt.metrics.bloomSize.Observe(float64(swb.Bloom.ScalableBloomFilter.Capacity() / eightBits))
bt.metrics.sbfCreationTime.Add(float64(endTime - startTime))
return sourceBytes, nil

ty := bloomCreationTypeIndexed
if skip {
ty = bloomCreationTypeSkipped
}
bt.metrics.sbfCreationTime.WithLabelValues(ty).Add(float64(endTime - startTime))

return sourceBytes, skip, nil
}

// n ≈ −m ln(1 − p).
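The heart of the change is that each insert now reports the filter's size (via what the PR calls HeavyAdd) so the token loop can bail out once the cap is crossed. A stripped-down sketch of that pattern, using a stand-in interface and a toy filter rather than the actual ScalableBloomFilter:

```go
package main

import "fmt"

// cappedFilter is a stand-in for the scalable bloom filter: it reports whether
// an add collided and how large the filter currently is.
type cappedFilter interface {
	TestAndAdd(token []byte) bool // true on collision
	Capacity() uint               // current size in bits
}

// addWithCap inserts tokens until either they run out or the filter's byte size
// exceeds maxBytes (0 disables the cap), mirroring Populate's skip semantics.
func addWithCap(f cappedFilter, tokens [][]byte, maxBytes int) (added int, skipped bool) {
	for _, tok := range tokens {
		if !f.TestAndAdd(tok) {
			added++
		}
		if maxBytes > 0 && int(f.Capacity()/8) > maxBytes {
			// The bloom outgrew the per-stream cap: stop early and signal skip.
			return added, true
		}
	}
	return added, false
}

// toyFilter is a trivial in-memory stand-in used only to exercise the sketch.
type toyFilter struct{ seen map[string]struct{} }

func (t *toyFilter) TestAndAdd(tok []byte) bool {
	_, ok := t.seen[string(tok)]
	t.seen[string(tok)] = struct{}{}
	return ok
}

func (t *toyFilter) Capacity() uint { return uint(len(t.seen)) * 64 } // pretend 64 bits per entry

func main() {
	f := &toyFilter{seen: map[string]struct{}{}}
	added, skipped := addWithCap(f, [][]byte{[]byte("abc"), []byte("bcd"), []byte("cde")}, 16)
	fmt.Println(added, skipped) // stops once the pretend size passes the 16-byte cap
}
```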
14 changes: 7 additions & 7 deletions pkg/storage/bloom/v1/bloom_tokenizer_test.go
@@ -79,7 +79,7 @@ func TestPrefixedKeyCreation(t *testing.T) {

func TestSetLineTokenizer(t *testing.T) {
t.Parallel()
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)

// Validate defaults
require.Equal(t, bt.lineTokenizer.N(), DefaultNGramLength)
@@ -94,7 +94,7 @@ func TestTokenizerPopulate(t *testing.T) {
func TestTokenizerPopulate(t *testing.T) {
t.Parallel()
var testLine = "this is a log line"
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)

sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
var lbsList []labels.Labels
@@ -125,7 +125,7 @@ func TestTokenizerPopulate(t *testing.T) {
Series: &series,
}

_, err = bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}}))
_, _, err = bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}}))
require.NoError(t, err)
tokenizer := NewNGramTokenizer(DefaultNGramLength, DefaultNGramSkip)
toks := tokenizer.Tokens(testLine)
@@ -138,7 +138,7 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
for i := 0; i < b.N; i++ {
var testLine = lorem + lorem + lorem
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)

sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
var lbsList []labels.Labels
@@ -169,13 +169,13 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
Series: &series,
}

_, err = bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}}))
_, _, err = bt.Populate(&swb, NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}}))
require.NoError(b, err)
}
}

func BenchmarkMapClear(b *testing.B) {
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
for i := 0; i < b.N; i++ {
for k := 0; k < cacheSize; k++ {
bt.cache[fmt.Sprint(k)] = k
@@ -186,7 +186,7 @@ func BenchmarkMapClear(b *testing.B) {
}

func BenchmarkNewMap(b *testing.B) {
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
for i := 0; i < b.N; i++ {
for k := 0; k < cacheSize; k++ {
bt.cache[fmt.Sprint(k)] = k
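A test for the new behaviour could force the skip path by handing the tokenizer a tiny cap. The sketch below follows the shape of TestTokenizerPopulate above; buildTestSeriesWithChunks is a hypothetical helper standing in for the memchunk and series setup, not an existing function in this file.

```go
// Illustrative only: assumes a hypothetical buildTestSeriesWithChunks helper
// that reuses the chunk/series boilerplate from TestTokenizerPopulate above.
func TestTokenizerPopulateSkipsOversizedBloom(t *testing.T) {
	t.Parallel()
	const tinyCap = 1 // 1 byte: any real bloom will exceed this
	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, tinyCap, metrics)

	swb, chks := buildTestSeriesWithChunks(t)
	_, skip, err := bt.Populate(&swb, chks)
	require.NoError(t, err)
	require.True(t, skip, "a bloom larger than the cap should be flagged for skipping")
}
```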