From 3ca36fad6c842b3c6a4fe1cf08e72ddb461c0a0a Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Fri, 26 Apr 2024 12:59:07 -0700
Subject: [PATCH] fix(blooms): don't break iterator conventions

refactors skip logic for bloom pages that are too large
s/Seek/LoadOffset/ for LazyBloomIter
removes unused code

Signed-off-by: Owen Diehl
---
 pkg/storage/bloom/v1/block.go         | 79 ++-------------------------
 pkg/storage/bloom/v1/bloom.go         | 14 +++--
 pkg/storage/bloom/v1/bloom_querier.go | 27 +++++----
 pkg/storage/bloom/v1/fuse.go          | 31 ++++++-----
 pkg/storage/bloom/v1/fuse_test.go     | 33 +++++------
 5 files changed, 62 insertions(+), 122 deletions(-)

diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go
index 91ba171b272c..511a6da9c88a 100644
--- a/pkg/storage/bloom/v1/block.go
+++ b/pkg/storage/bloom/v1/block.go
@@ -144,14 +144,12 @@ func (bq *BlockQuerier) Seek(fp model.Fingerprint) error {
 func (bq *BlockQuerier) Next() bool {
 	for bq.series.Next() {
 		series := bq.series.At()
-		bq.blooms.Seek(series.Offset)
+		if ok := bq.blooms.LoadOffset(series.Offset); !ok {
+			// can't seek to the desired bloom, likely because the page was too large to load
+			// so we skip this series and move on to the next
+			continue
+		}
 		if !bq.blooms.Next() {
-			// skip blocks that are too large
-			if errors.Is(bq.blooms.Err(), ErrPageTooLarge) {
-				// fmt.Printf("skipping bloom page: %s (%d)\n", series.Fingerprint, series.Chunks.Len())
-				bq.blooms.err = nil
-				continue
-			}
 			return false
 		}
 		bloom := bq.blooms.At()
@@ -175,70 +173,3 @@ func (bq *BlockQuerier) Err() error {
 	return bq.blooms.Err()
 }
-
-// CheckChunksForSeries checks if the given chunks pass a set of searches in the given bloom block.
-// It returns the list of chunks which will need to be downloaded for a query based on the initial list
-// passed as the `chks` argument. Chunks will be removed from the result set if they are indexed in the bloom
-// and fail to pass all the searches.
-func (bq *BlockQuerier) CheckChunksForSeries(fp model.Fingerprint, chks ChunkRefs, searches [][]byte) (ChunkRefs, error) {
-	schema, err := bq.Schema()
-	if err != nil {
-		return chks, fmt.Errorf("getting schema: %w", err)
-	}
-
-	if err := bq.Seek(fp); err != nil {
-		return chks, errors.Wrapf(err, "seeking to series for fp: %v", fp)
-	}
-
-	if !bq.series.Next() {
-		return chks, nil
-	}
-
-	series := bq.series.At()
-	if series.Fingerprint != fp {
-		return chks, nil
-	}
-
-	bq.blooms.Seek(series.Offset)
-	if !bq.blooms.Next() {
-		return chks, fmt.Errorf("seeking to bloom for fp: %v", fp)
-	}
-
-	bloom := bq.blooms.At()
-
-	// First, see if the search passes the series level bloom before checking for chunks individually
-	for _, search := range searches {
-		if !bloom.Test(search) {
-			// the entire series bloom didn't pass one of the searches,
-			// so we can skip checking chunks individually.
-			// We still return all chunks that are not included in the bloom
-			// as they may still have the data
-			return chks.Unless(series.Chunks), nil
-		}
-	}
-
-	// TODO(salvacorts): pool tokenBuf
-	var tokenBuf []byte
-	var prefixLen int
-
-	// Check chunks individually now
-	mustCheck, inBlooms := chks.Compare(series.Chunks, true)
-
-outer:
-	for _, chk := range inBlooms {
-		// Get buf to concatenate the chunk and search token
-		tokenBuf, prefixLen = prefixedToken(schema.NGramLen(), chk, tokenBuf)
-		for _, search := range searches {
-			tokenBuf = append(tokenBuf[:prefixLen], search...)
-
-			if !bloom.Test(tokenBuf) {
-				// chunk didn't pass the search, continue to the next chunk
-				continue outer
-			}
-		}
-		// chunk passed all searches, add to the list of chunks to download
-		mustCheck = append(mustCheck, chk)
-
-	}
-	return mustCheck, nil
-}
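
Note: the block.go hunk above restores the usual Next/At/Err iterator
contract: a bloom page that is too large is now skipped inside Next rather
than surfaced as ErrPageTooLarge for the caller to reset. A minimal
compilable sketch of the resulting consumption pattern; blockIter and
drainBlooms are illustrative stand-ins, not part of this patch:

    package main

    // blockIter is a hypothetical stand-in for *BlockQuerier, reduced to
    // the two methods the convention relies on.
    type blockIter interface {
        Next() bool
        Err() error
    }

    // drainBlooms loops on Next and checks Err once afterwards; a page
    // skipped for being oversized no longer shows up as an error here.
    func drainBlooms(bq blockIter) error {
        for bq.Next() {
            // process bq.At() in the real code
        }
        return bq.Err()
    }

    func main() {}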
diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go
index 1554b6828f24..89279624a965 100644
--- a/pkg/storage/bloom/v1/bloom.go
+++ b/pkg/storage/bloom/v1/bloom.go
@@ -275,11 +275,13 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
 	return checksum, nil
 }
 
-func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, err error) {
+// BloomPageDecoder returns a decoder for the given page index.
+// It may skip the page if it's too large.
+func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
 	if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
 		metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc()
 		metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen))
-		return nil, fmt.Errorf("invalid page (%d) for bloom page decoding", pageIdx)
+		return nil, false, fmt.Errorf("invalid page (%d) for bloom page decoding", pageIdx)
 	}
 
 	page := b.pageHeaders[pageIdx]
@@ -288,13 +290,13 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize
 	if page.Len > maxPageSize {
 		metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonTooLarge).Inc()
 		metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonTooLarge).Add(float64(page.DecompressedLen))
-		return nil, ErrPageTooLarge
+		return nil, true, ErrPageTooLarge
 	}
 
 	if _, err = r.Seek(int64(page.Offset), io.SeekStart); err != nil {
 		metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Inc()
 		metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Add(float64(page.DecompressedLen))
-		return nil, errors.Wrap(err, "seeking to bloom page")
+		return nil, false, errors.Wrap(err, "seeking to bloom page")
 	}
 
 	if b.schema.encoding == chunkenc.EncNone {
@@ -306,10 +308,10 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize
 	if err != nil {
 		metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Inc()
 		metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Add(float64(page.DecompressedLen))
-		return nil, errors.Wrap(err, "decoding bloom page")
+		return nil, false, errors.Wrap(err, "decoding bloom page")
 	}
 
 	metrics.pagesRead.WithLabelValues(pageTypeBloom).Inc()
 	metrics.bytesRead.WithLabelValues(pageTypeBloom).Add(float64(page.DecompressedLen))
-	return res, nil
+	return res, false, nil
 }
diff --git a/pkg/storage/bloom/v1/bloom_querier.go b/pkg/storage/bloom/v1/bloom_querier.go
index c539f7c27193..8701cfe4286d 100644
--- a/pkg/storage/bloom/v1/bloom_querier.go
+++ b/pkg/storage/bloom/v1/bloom_querier.go
@@ -42,14 +42,9 @@ func (it *LazyBloomIter) ensureInit() {
 	}
 }
 
-func (it *LazyBloomIter) Seek(offset BloomOffset) {
+func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (ok bool) {
 	it.ensureInit()
 
-	// reset error from any previous seek/next that yield pages too large
-	if errors.Is(it.err, ErrPageTooLarge) {
-		it.err = nil
-	}
-
 	// if we need a different page or the current page hasn't been loaded,
 	// load the desired page
 	if it.curPageIndex != offset.Page || it.curPage == nil {
@@ -63,12 +58,15 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
 		r, err := it.b.reader.Blooms()
 		if err != nil {
 			it.err = errors.Wrap(err, "getting blooms reader")
-			return
+			return false
+		}
+		decoder, skip, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
+		if skip {
+			return false
 		}
-		decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
 		if err != nil {
 			it.err = errors.Wrap(err, "loading bloom page")
-			return
+			return false
 		}
 
 		it.curPageIndex = offset.Page
@@ -77,6 +75,7 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
 	}
 
 	it.curPage.Seek(offset.ByteOffset)
+	return true
 }
 
 func (it *LazyBloomIter) Next() bool {
@@ -101,17 +100,23 @@ func (it *LazyBloomIter) next() bool {
 				return false
 			}
 
-			it.curPage, err = it.b.blooms.BloomPageDecoder(
+			var skip bool
+			it.curPage, skip, err = it.b.blooms.BloomPageDecoder(
 				r,
 				it.curPageIndex,
 				it.m,
 				it.b.metrics,
 			)
+			if skip {
+				// this page was skipped; check the next
+				it.curPageIndex++
+				continue
+			}
+			// this page wasn't skipped & produced an error, return
 			if err != nil {
 				it.err = err
 				return false
 			}
-			continue
 		}
 
 		if !it.curPage.Next() {
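
Note: callers of the new three-value BloomPageDecoder check skip before
err, treating an oversized page as "move on" rather than as a failure. A
self-contained sketch of that calling convention; decodePage and
errPageTooLarge are stand-ins for the real method and ErrPageTooLarge:

    package main

    import (
        "errors"
        "fmt"
    )

    var errPageTooLarge = errors.New("bloom page too large")

    // decodePage mirrors the (result, skip, err) shape of the new
    // BloomBlock.BloomPageDecoder: skip=true marks a deliberate skip
    // (page exceeds maxPageSize), not a hard failure.
    func decodePage(pageLen, maxPageSize int) (res string, skip bool, err error) {
        if pageLen > maxPageSize {
            return "", true, errPageTooLarge
        }
        return "decoded page", false, nil
    }

    func main() {
        res, skip, err := decodePage(2<<20, 1<<20)
        if skip {
            fmt.Println("skipped:", err) // LazyBloomIter.LoadOffset returns false here
            return
        }
        if err != nil {
            fmt.Println("failed:", err)
            return
        }
        fmt.Println(res)
    }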
diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go
index 4ac7e83b3a0b..4da85bc4c5e8 100644
--- a/pkg/storage/bloom/v1/fuse.go
+++ b/pkg/storage/bloom/v1/fuse.go
@@ -59,6 +59,15 @@ func NewFusedQuerier(bq *BlockQuerier, inputs []PeekingIterator[Request], logger
 	}
 }
 
+func (fq *FusedQuerier) noRemovals(batch []Request, fp model.Fingerprint) {
+	for _, input := range batch {
+		input.Response <- Output{
+			Fp:       fp,
+			Removals: nil,
+		}
+	}
+}
+
 func (fq *FusedQuerier) Run() error {
 	schema, err := fq.bq.Schema()
 	if err != nil {
@@ -85,26 +94,22 @@ func (fq *FusedQuerier) Run() error {
 		if series.Fingerprint != fp {
 			// fingerprint not found, can't remove chunks
 			level.Debug(fq.logger).Log("msg", "fingerprint not found", "fp", series.Fingerprint, "err", fq.bq.series.Err())
-			for _, input := range nextBatch {
-				input.Response <- Output{
-					Fp:       fp,
-					Removals: nil,
-				}
-			}
+			fq.noRemovals(nextBatch, fp)
 			continue
 		}
 
 		// Now that we've found the series, we need to find the unpack the bloom
-		fq.bq.blooms.Seek(series.Offset)
+		ok := fq.bq.blooms.LoadOffset(series.Offset)
+		if !ok {
+			// could not seek to the desired bloom,
+			// likely because the page was too large to load
+			fq.noRemovals(nextBatch, fp)
+			continue
+		}
+
 		if !fq.bq.blooms.Next() {
 			// fingerprint not found, can't remove chunks
 			level.Debug(fq.logger).Log("msg", "fingerprint not found", "fp", series.Fingerprint, "err", fq.bq.blooms.Err())
-			for _, input := range nextBatch {
-				input.Response <- Output{
-					Fp:       fp,
-					Removals: nil,
-				}
-			}
+			fq.noRemovals(nextBatch, fp)
 			continue
 		}
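
Note: noRemovals exists so that every pending request still receives an
answer when a series' bloom cannot be consulted; otherwise readers on the
Response channels would block forever. A runnable sketch of the pattern;
the request/output types below are simplified stand-ins for the package's
Request and Output:

    package main

    import "fmt"

    type fingerprint uint64

    type output struct {
        fp       fingerprint
        removals []string // the real field is a ChunkRefs
    }

    type request struct {
        fp       fingerprint
        response chan<- output
    }

    // noRemovals answers each request in the batch with an empty removal
    // set: "the bloom couldn't rule out any chunks for this fingerprint".
    func noRemovals(batch []request, fp fingerprint) {
        for _, input := range batch {
            input.response <- output{fp: fp, removals: nil}
        }
    }

    func main() {
        ch := make(chan output, 1)
        noRemovals([]request{{fp: 42, response: ch}}, 42)
        fmt.Printf("%+v\n", <-ch)
    }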
diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go
index 81e646d96968..c516a58ebc93 100644
--- a/pkg/storage/bloom/v1/fuse_test.go
+++ b/pkg/storage/bloom/v1/fuse_test.go
@@ -152,6 +152,10 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
 	writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
 	reader := NewByteReader(indexBuf, bloomsBuf)
 
+	largeSeries := func(i int) bool {
+		return i%2 == 0
+	}
+
 	numSeries := 4
 	data := make([]SeriesWithBloom, 0, numSeries)
 	tokenizer := NewNGramTokenizer(4, 0)
@@ -170,8 +174,10 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
 		bloom.ScalableBloomFilter = *filter.NewScalableBloomFilter(1024, 0.01, 0.8)
 
 		nLines := 10
-		if i == 0 || i == 2 {
-			// Add enough lines to make the bloom page too large for series 1
+		// all even series will have a larger bloom (more than 1 filter)
+		if largeSeries(i) {
+			// Add enough lines to make the bloom page too large and
+			// trigger another filter addition
 			nLines = 10000
 		}
 
@@ -218,14 +224,15 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
 		series := querier.series.At()
 		require.Equal(t, fp, series.Fingerprint)
 
-		querier.blooms.Seek(series.Offset)
-
-		if fp == 0 || fp == 2 {
-			require.False(t, querier.blooms.Next())
-			require.Error(t, querier.blooms.Err())
+		seekable := true
+		if largeSeries(int(fp)) {
+			seekable = false
+		}
+		if !seekable {
+			require.False(t, querier.blooms.LoadOffset(series.Offset))
 			continue
 		}
-
+		require.True(t, querier.blooms.LoadOffset(series.Offset))
 		require.True(t, querier.blooms.Next())
 		require.NoError(t, querier.blooms.Err())
 	}
@@ -293,16 +300,6 @@ func BenchmarkBlockQuerying(b *testing.B) {
 
 	// benchmark
 	b.StartTimer()
-	b.Run("single-pass", func(b *testing.B) {
-		for i := 0; i < b.N; i++ {
-			for _, chain := range requestChains {
-				for _, req := range chain {
-					_, _ = querier.CheckChunksForSeries(req.Fp, req.Chks, nil)
-				}
-			}
-		}
-
-	})
 	b.Run("fused", func(b *testing.B) {
 		// spin up some goroutines to consume the responses so they don't block
 		go func() {
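
Note: taken together, call sites migrate from a void Seek to a boolean
LoadOffset. A before/after sketch of that migration; offset and lazyIter
are illustrative stand-ins for BloomOffset and LazyBloomIter:

    package main

    import "fmt"

    type offset struct{ page int }

    type lazyIter struct{ tooLarge map[int]bool }

    // LoadOffset reports whether the page backing the offset could be
    // loaded; false means "skip this series", where Seek previously
    // stashed ErrPageTooLarge for the caller to inspect and reset.
    func (it *lazyIter) LoadOffset(o offset) bool { return !it.tooLarge[o.page] }

    func main() {
        it := &lazyIter{tooLarge: map[int]bool{1: true}}
        for _, o := range []offset{{page: 0}, {page: 1}, {page: 2}} {
            if !it.LoadOffset(o) {
                fmt.Printf("page %d skipped, moving on\n", o.page)
                continue // skip, don't error: the new convention
            }
            fmt.Printf("page %d loaded\n", o.page)
        }
    }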