Skip to content

Commit

Permalink
Parquet page caching (#3196)
Browse files Browse the repository at this point in the history
* page caching first pass

Signed-off-by: Joe Elliott <number101010@gmail.com>

* set default vals

Signed-off-by: Joe Elliott <number101010@gmail.com>

* actually return cache

Signed-off-by: Joe Elliott <number101010@gmail.com>

* disallow none cache

Signed-off-by: Joe Elliott <number101010@gmail.com>

* restore buffered reader

Signed-off-by: Joe Elliott <number101010@gmail.com>

* fix read buffer size

Signed-off-by: Joe Elliott <number101010@gmail.com>

* remove load

Signed-off-by: Joe Elliott <number101010@gmail.com>

* changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* fix stopping behavior

Signed-off-by: Joe Elliott <number101010@gmail.com>

* copy before storing parquet pages

Signed-off-by: Joe Elliott <number101010@gmail.com>

* combined cached reader and optimized reader

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint/asserts

Signed-off-by: Joe Elliott <number101010@gmail.com>

* simplify needsCopy

Signed-off-by: Joe Elliott <number101010@gmail.com>

* add parquet-pages to docs

Signed-off-by: Joe Elliott <number101010@gmail.com>

* add stoewer file?

Signed-off-by: Joe Elliott <number101010@gmail.com>

* perhaps this incantation will work, mused the lonely wizard

Signed-off-by: Joe Elliott <number101010@gmail.com>

---------

Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott committed Dec 15, 2023
1 parent 8d0ddfd commit efb11ea
Show file tree
Hide file tree
Showing 14 changed files with 250 additions and 127 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ jobs:
- name: Lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.53.3
version: v1.55.2
only-new-issues: true

unit-tests-pkg:
name: Test packages - pkg
Expand Down
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
* [ENHANCEMENT] Improve TraceQL regex performance in certain queries. [#3139](https://github.com/grafana/tempo/pull/3139) (@joe-elliott)
* [ENHANCEMENT] Improve TraceQL performance in complex queries. [#3113](https://github.com/grafana/tempo/pull/3113) (@joe-elliott)
* [ENHANCEMENT] Added a `frontend-search` cache role for job search caching. [#3225](https://github.com/grafana/tempo/pull/3225) (@joe-elliott)
* [ENHANCEMENT] Added a `parquet-page` cache role for page level caching. [#3196](https://github.com/grafana/tempo/pull/3196) (@joe-elliott)
* [ENHANCEMENT] Update opentelemetry-collector-contrib dependency to the latest version, v0.89.0 [#3148](https://github.com/grafana/tempo/pull/3148) (@gebn)
* [BUGFIX] Prevent building parquet iterators that would loop forever. [#3159](https://github.com/grafana/tempo/pull/3159) (@mapno)
* [BUGFIX] Sanitize name in mapped dimensions in span-metrics processor [#3171](https://github.com/grafana/tempo/pull/3171) (@mapno)
* [ENHANCEMENT] Update opentelemetry-collector-contrib dependency to the latest version, v0.89.0 [#3148](https://github.com/grafana/tempo/pull/3148) (@gebn)
* [BUGFIX] Fixed an issue where cached footers were requested then ignored. [#3196](https://github.com/grafana/tempo/pull/3196) (@joe-elliott)

## v2.3.1 / 2023-11-28

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ lint:

.PHONY: docker-component # Not intended to be used directly
docker-component: check-component exe
docker build -t grafana/$(COMPONENT) --build-arg=TARGETARCH=$(GOARCH) -f ./cmd/$(COMPONENT)/Dockerfile .
docker build -t grafana/$(COMPONENT) --load --build-arg=TARGETARCH=$(GOARCH) -f ./cmd/$(COMPONENT)/Dockerfile .
docker tag grafana/$(COMPONENT) $(COMPONENT)

.PHONY: docker-component-debug
Expand Down
1 change: 1 addition & 0 deletions cmd/tempo/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
c.Compactor.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "compactor"), f)
c.StorageConfig.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "storage"), f)
c.UsageReport.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "reporting"), f)
c.CacheProvider.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "cache"), f)
}

// MultitenancyIsEnabled checks if multitenancy is enabled
Expand Down
7 changes: 4 additions & 3 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1499,9 +1499,10 @@ cache:
# every cache must have at least one role.
# Allowed values:
# bloom - Bloom filters for trace id lookup.
# parquet-footer - Parquet footer values. Useful for search and trace by id lookup.
# parquet-column-idx - Parquet column index values. Useful for search and trace by id lookup.
# parquet-offset-idx - Parquet offset index values. Useful for search and trace by id lookup.
# parquet-footer - Parquet footer values. Useful for search and trace by id lookup.
# parquet-column-idx - Parquet column index values. Useful for search and trace by id lookup.
# parquet-offset-idx - Parquet offset index values. Useful for search and trace by id lookup.
# parquet-page - Parquet "pages". WARNING: This will attempt to cache most reads from parquet and, as a result, is very high volume.
# frontend-search - Frontend search job results.
- roles:
Expand Down
9 changes: 9 additions & 0 deletions modules/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,16 @@ func (p *provider) starting(_ context.Context) error {
}

func (p *provider) stopping(_ error) error {
// we can only stop a cache once (or they panic). use this map
// to track which caches we've stopped.
stopped := map[cache.Cache]struct{}{}

for _, c := range p.caches {
if _, ok := stopped[c]; ok {
continue
}

stopped[c] = struct{}{}
c.Stop()
}

Expand Down
12 changes: 12 additions & 0 deletions modules/cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"errors"
"flag"
"fmt"
"strings"

Expand Down Expand Up @@ -41,6 +42,10 @@ func (cfg *Config) Validate() error {

// check that all roles are unique
for _, role := range cacheCfg.Role {
if role == cache.RoleNone {
return fmt.Errorf("role none is not a valid role")
}

if _, ok := allRoles[role]; !ok {
return fmt.Errorf("role %s is not a valid role", role)
}
Expand All @@ -56,6 +61,12 @@ func (cfg *Config) Validate() error {
return nil
}

// RegisterFlagsAndApplyDefaults applies default values to the cache provider
// config. The prefix and flag set are intentionally unused: no flags are
// registered, defaults are assigned directly.
func (cfg *Config) RegisterFlagsAndApplyDefaults(_ string, _ *flag.FlagSet) {
	cfg.Background = &cache.BackgroundConfig{
		WriteBackBuffer:     10000,
		WriteBackGoroutines: 10,
	}
}

// Name returns a string representation of the roles claimed by this cache.
func (cfg *CacheConfig) Name() string {
stringRoles := make([]string, len(cfg.Role))
Expand All @@ -73,6 +84,7 @@ func allRoles() map[cache.Role]struct{} {
cache.RoleParquetOffsetIdx,
cache.RoleTraceIDIdx,
cache.RoleFrontendSearch,
cache.RoleParquetPage,
}

roles := map[cache.Role]struct{}{}
Expand Down
12 changes: 12 additions & 0 deletions modules/cache/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ func TestConfigValidation(t *testing.T) {
},
expected: errors.New("configured caches require a valid role"),
},
{
name: "invalid - none",
cfg: &Config{
Caches: []CacheConfig{
{
Role: []cache.Role{cache.RoleNone},
MemcachedConfig: &memcached.Config{},
},
},
},
expected: errors.New("role none is not a valid role"),
},
{
name: "invalid - both caches configged",
cfg: &Config{
Expand Down
2 changes: 2 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ type Role string

const (
// individual roles
RoleNone Role = "none"
RoleBloom Role = "bloom"
RoleTraceIDIdx Role = "trace-id-index"
RoleParquetFooter Role = "parquet-footer"
RoleParquetColumnIdx Role = "parquet-column-idx"
RoleParquetOffsetIdx Role = "parquet-offset-idx"
RoleFrontendSearch Role = "frontend-search"
RoleParquetPage Role = "parquet-page"
)

// Provider is an object that can return a cache for a requested role
Expand Down
38 changes: 24 additions & 14 deletions tempodb/backend/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type readerWriter struct {
columnIdxCache cache.Cache
offsetIdxCache cache.Cache
traceIDIdxCache cache.Cache
pageCache cache.Cache
}

func NewCache(cfgBloom *BloomConfig, nextReader backend.RawReader, nextWriter backend.RawWriter, cacheProvider cache.Provider, logger log.Logger) (backend.RawReader, backend.RawWriter, error) {
Expand All @@ -44,6 +45,7 @@ func NewCache(cfgBloom *BloomConfig, nextReader backend.RawReader, nextWriter ba
offsetIdxCache: cacheProvider.CacheFor(cache.RoleParquetOffsetIdx),
columnIdxCache: cacheProvider.CacheFor(cache.RoleParquetColumnIdx),
traceIDIdxCache: cacheProvider.CacheFor(cache.RoleTraceIDIdx),
pageCache: cacheProvider.CacheFor(cache.RoleParquetPage),

nextReader: nextReader,
nextWriter: nextWriter,
Expand All @@ -55,6 +57,7 @@ func NewCache(cfgBloom *BloomConfig, nextReader backend.RawReader, nextWriter ba
"offset_idx", rw.offsetIdxCache != nil,
"column_idx", rw.columnIdxCache != nil,
"trace_id_idx", rw.traceIDIdxCache != nil,
"page", rw.pageCache != nil,
)

return rw, rw, nil
Expand Down Expand Up @@ -91,7 +94,7 @@ func (r *readerWriter) Read(ctx context.Context, name string, keypath backend.Ke

b, err := tempo_io.ReadAllWithEstimate(object, size)
if err == nil && cache != nil {
cache.Store(ctx, []string{k}, [][]byte{b})
store(ctx, cache, cacheInfo.Role, k, b)
}

return io.NopCloser(bytes.NewReader(b)), size, err
Expand All @@ -109,14 +112,15 @@ func (r *readerWriter) ReadRange(ctx context.Context, name string, keypath backe
found, vals, _ := cache.Fetch(ctx, []string{k})
if len(found) > 0 {
copy(buffer, vals[0])
return nil
}
}

// previous implementation always passed false forward for "shouldCache" so we are matching that behavior by passing nil for cacheInfo
// todo: reevaluate. should we pass the cacheInfo forward?
err := r.nextReader.ReadRange(ctx, name, keypath, offset, buffer, nil)
if err == nil && cache != nil {
cache.Store(ctx, []string{k}, [][]byte{buffer})
store(ctx, cache, cacheInfo.Role, k, buffer)
}

return err
Expand All @@ -125,18 +129,6 @@ func (r *readerWriter) ReadRange(ctx context.Context, name string, keypath backe
// Shutdown implements backend.RawReader
func (r *readerWriter) Shutdown() {
r.nextReader.Shutdown()

stopCache := func(c cache.Cache) {
if c != nil {
c.Stop()
}
}

stopCache(r.footerCache)
stopCache(r.bloomCache)
stopCache(r.offsetIdxCache)
stopCache(r.columnIdxCache)
stopCache(r.traceIDIdxCache)
}

// Write implements backend.Writer
Expand Down Expand Up @@ -189,6 +181,8 @@ func (r *readerWriter) cacheFor(cacheInfo *backend.CacheInfo) cache.Cache {
return r.columnIdxCache
case cache.RoleParquetOffsetIdx:
return r.offsetIdxCache
case cache.RoleParquetPage:
return r.pageCache
case cache.RoleTraceIDIdx:
return r.traceIDIdxCache
case cache.RoleBloom:
Expand Down Expand Up @@ -217,3 +211,19 @@ func (r *readerWriter) cacheFor(cacheInfo *backend.CacheInfo) cache.Cache {

return nil
}

// store writes a single key/value pair to the given cache. For roles whose
// buffers may be reused by the caller (see needsCopy) the value is first
// duplicated into a fresh slice so the cache owns its own copy.
func store(ctx context.Context, cache cache.Cache, role cache.Role, key string, val []byte) {
	toWrite := val
	if needsCopy(role) {
		// detach from the caller's buffer before handing it to the cache
		toWrite = append([]byte(nil), val...)
	}

	cache.Store(ctx, []string{key}, [][]byte{toWrite})
}

// needsCopy reports whether values written under the given role must be
// copied into a new buffer before being stored in the cache.
// todo: should this be signalled through cacheinfo instead?
func needsCopy(role cache.Role) bool {
	switch role {
	case cache.RoleParquetPage:
		// parquet pages are reused by the library. if we don't copy them the
		// buffer may be reused before it is written to the cache.
		return true
	default:
		return false
	}
}
26 changes: 7 additions & 19 deletions tempodb/encoding/vparquet3/block_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"math"
"os"
"strconv"
Expand All @@ -13,7 +12,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/parquet-go/parquet-go"

tempo_io "github.com/grafana/tempo/pkg/io"
pq "github.com/grafana/tempo/pkg/parquetquery"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
Expand Down Expand Up @@ -65,37 +63,27 @@ func (b *backendBlock) openForSearch(ctx context.Context, opts common.SearchOpti

// TODO: ctx is also cached when we cache backendReaderAt, not ideal but leaving it as is for now
backendReaderAt := NewBackendReaderAt(ctx, b.r, DataFileName, b.meta)

// no searches currently require bloom filters or the page index. so just add them statically
o := []parquet.FileOption{
parquet.SkipBloomFilters(true),
parquet.SkipPageIndex(true),
parquet.FileReadMode(parquet.ReadModeAsync),
}

// backend reader
readerAt := io.ReaderAt(backendReaderAt)

// buffering
if opts.ReadBufferSize > 0 {
// only use buffered reader at if the block is small, otherwise it's far more effective to use larger
// buffers in the parquet sdk
if opts.ReadBufferCount*opts.ReadBufferSize > int(b.meta.Size) {
readerAt = tempo_io.NewBufferedReaderAt(readerAt, int64(b.meta.Size), opts.ReadBufferSize, opts.ReadBufferCount)
} else {
o = append(o, parquet.ReadBufferSize(opts.ReadBufferSize))
}
// if the read buffer size provided is <= 0 then we'll use the parquet default
readBufferSize := opts.ReadBufferSize
if readBufferSize <= 0 {
readBufferSize = parquet.DefaultFileConfig().ReadBufferSize
}

// optimized reader
readerAt = newParquetOptimizedReaderAt(readerAt, int64(b.meta.Size), b.meta.FooterSize)
o = append(o, parquet.ReadBufferSize(readBufferSize))

// cached reader
readerAt = newCachedReaderAt(readerAt, backendReaderAt)
cachedReaderAt := newCachedReaderAt(backendReaderAt, readBufferSize, int64(b.meta.Size), b.meta.FooterSize) // most reads to the backend are going to be readbuffersize so use it as our "page cache" size

span, _ := opentracing.StartSpanFromContext(ctx, "parquet.OpenFile")
defer span.Finish()
pf, err := parquet.OpenFile(readerAt, int64(b.meta.Size), o...)
pf, err := parquet.OpenFile(cachedReaderAt, int64(b.meta.Size), o...)

return pf, backendReaderAt, err
}
Expand Down
Loading

0 comments on commit efb11ea

Please sign in to comment.