
Parquet page caching #3196

Merged 16 commits on Dec 15, 2023
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
@@ -46,7 +46,8 @@ jobs:
- name: Lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.53.3
version: v1.55.2
only-new-issues: true

unit-tests-pkg:
name: Test packages - pkg
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -26,9 +26,11 @@
* [ENHANCEMENT] Improve TraceQL regex performance in certain queries. [#3139](https://github.com/grafana/tempo/pull/3139) (@joe-elliott)
* [ENHANCEMENT] Improve TraceQL performance in complex queries. [#3113](https://github.com/grafana/tempo/pull/3113) (@joe-elliott)
* [ENHANCEMENT] Added a `frontend-search` cache role for job search caching. [#3225](https://github.com/grafana/tempo/pull/3225) (@joe-elliott)
* [ENHANCEMENT] Added a `parquet-page` cache role for page level caching. [#3196](https://github.com/grafana/tempo/pull/3196) (@joe-elliott)
* [ENHANCEMENT] Update opentelemetry-collector-contrib dependency to the latest version, v0.89.0 [#3148](https://github.com/grafana/tempo/pull/3148) (@gebn)
* [BUGFIX] Prevent building parquet iterators that would loop forever. [#3159](https://github.com/grafana/tempo/pull/3159) (@mapno)
* [BUGFIX] Sanitize name in mapped dimensions in span-metrics processor [#3171](https://github.com/grafana/tempo/pull/3171) (@mapno)
* [ENHANCEMENT] Update opentelemetry-collector-contrib dependency to the latest version, v0.89.0 [#3148](https://github.com/grafana/tempo/pull/3148) (@gebn)
* [BUGFIX] Fixed an issue where cached footers were requested then ignored. [#3196](https://github.com/grafana/tempo/pull/3196) (@joe-elliott)

## v2.3.1 / 2023-11-28

2 changes: 1 addition & 1 deletion Makefile
@@ -164,7 +164,7 @@ lint:

.PHONY: docker-component # Not intended to be used directly
docker-component: check-component exe
docker build -t grafana/$(COMPONENT) --build-arg=TARGETARCH=$(GOARCH) -f ./cmd/$(COMPONENT)/Dockerfile .
docker build -t grafana/$(COMPONENT) --load --build-arg=TARGETARCH=$(GOARCH) -f ./cmd/$(COMPONENT)/Dockerfile .
docker tag grafana/$(COMPONENT) $(COMPONENT)

.PHONY: docker-component-debug
1 change: 1 addition & 0 deletions cmd/tempo/app/config.go
@@ -130,6 +130,7 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
c.Compactor.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "compactor"), f)
c.StorageConfig.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "storage"), f)
c.UsageReport.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "reporting"), f)
c.CacheProvider.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "cache"), f)
}

// MultitenancyIsEnabled checks if multitenancy is enabled
7 changes: 4 additions & 3 deletions docs/sources/tempo/configuration/_index.md
@@ -1499,9 +1499,10 @@ cache:
# every cache must have at least one role.
# Allowed values:
# bloom - Bloom filters for trace id lookup.
# parquet-footer - Parquet footer values. Useful for search and trace by id lookup.
# parquet-column-idx - Parquet column index values. Useful for search and trace by id lookup.
# parquet-offset-idx - Parquet offset index values. Useful for search and trace by id lookup.
# parquet-footer - Parquet footer values. Useful for search and trace by id lookup.
# parquet-column-idx - Parquet column index values. Useful for search and trace by id lookup.
# parquet-offset-idx - Parquet offset index values. Useful for search and trace by id lookup.
# parquet-page - Parquet "pages". WARNING: This will attempt to cache most reads from parquet and, as a result, is very high volume.
# frontend-search - Frontend search job results.

- roles:
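For reference, a sketch of how the new role might be wired into the cache block documented above. The hostnames are placeholders, and splitting pages onto their own cache instance is only a suggestion given the warning about parquet-page volume:

```yaml
cache:
  caches:
    # a dedicated memcached instance for the high-volume parquet-page role,
    # kept separate so page churn does not evict footers and indexes
    - roles:
        - parquet-page
      memcached:
        host: memcached-pages.example.svc   # placeholder
    # a second cache for the lower-volume metadata roles
    - roles:
        - parquet-footer
        - parquet-column-idx
        - parquet-offset-idx
      memcached:
        host: memcached-meta.example.svc    # placeholder
```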
9 changes: 9 additions & 0 deletions modules/cache/cache.go
@@ -89,7 +89,16 @@ func (p *provider) starting(_ context.Context) error {
}

func (p *provider) stopping(_ error) error {
// we can only stop a cache once (or they panic). use this map
// to track which caches we've stopped.
stopped := map[cache.Cache]struct{}{}
--- review thread ---

@stoewer (Collaborator) commented on Dec 15, 2023:

    I'm not sure I understand this correctly: is this necessary because p.caches contains the same cache multiple times?

@joe-elliott (Member, Author) replied:

    Yup. If a cache has multiple roles we add it to the map multiple times here:

        for _, role := range cacheCfg.Role {
            p.caches[role] = c
        }

    So this is just a bit of bookkeeping to prevent stopping one twice.

--- end review thread ---


for _, c := range p.caches {
if _, ok := stopped[c]; ok {
continue
}

stopped[c] = struct{}{}
c.Stop()
}

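To illustrate the review discussion above, here is a minimal, self-contained sketch of the double-stop hazard and the bookkeeping that prevents it. stubCache is a hypothetical stand-in for cache.Cache, not a type from this PR:

```go
package main

import "fmt"

// stubCache stands in for cache.Cache; Stop panics if called twice,
// mirroring the behavior the bookkeeping above guards against.
type stubCache struct{ stopped bool }

func (c *stubCache) Stop() {
	if c.stopped {
		panic("cache stopped twice")
	}
	c.stopped = true
	fmt.Println("stopped")
}

func main() {
	shared := &stubCache{}

	// one cache registered under two roles, as in the provider's caches map
	caches := map[string]*stubCache{
		"parquet-footer": shared,
		"parquet-page":   shared,
	}

	// dedupe by identity before stopping, matching the provider's stopping()
	stopped := map[*stubCache]struct{}{}
	for _, c := range caches {
		if _, ok := stopped[c]; ok {
			continue
		}
		stopped[c] = struct{}{}
		c.Stop() // prints "stopped" exactly once
	}
}
```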
12 changes: 12 additions & 0 deletions modules/cache/config.go
@@ -2,6 +2,7 @@ package cache

import (
"errors"
"flag"
"fmt"
"strings"

@@ -41,6 +42,10 @@ func (cfg *Config) Validate() error {

// check that all roles are unique
for _, role := range cacheCfg.Role {
if role == cache.RoleNone {
return fmt.Errorf("role none is not a valid role")
}

if _, ok := allRoles[role]; !ok {
return fmt.Errorf("role %s is not a valid role", role)
}
@@ -56,6 +61,12 @@ func (cfg *Config) Validate() error {
return nil
}

func (cfg *Config) RegisterFlagsAndApplyDefaults(_ string, _ *flag.FlagSet) {
cfg.Background = &cache.BackgroundConfig{}
cfg.Background.WriteBackBuffer = 10000
cfg.Background.WriteBackGoroutines = 10
}

// Name returns a string representation of the roles claimed by this cache.
func (cfg *CacheConfig) Name() string {
stringRoles := make([]string, len(cfg.Role))
@@ -73,6 +84,7 @@ func allRoles() map[cache.Role]struct{} {
cache.RoleParquetOffsetIdx,
cache.RoleTraceIDIdx,
cache.RoleFrontendSearch,
cache.RoleParquetPage,
}

roles := map[cache.Role]struct{}{}
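The defaults applied in RegisterFlagsAndApplyDefaults above (a 10,000-entry write-back buffer drained by 10 goroutines) configure background cache writes. As a rough sketch of how such a write-back buffer typically behaves — illustrative types only, not Tempo's actual background cache:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type kv struct {
	key string
	val []byte
}

// backgroundCache queues writes on a bounded channel and drains them with a
// pool of goroutines, so Store never blocks the read path.
type backgroundCache struct {
	writes chan kv
	wg     sync.WaitGroup
}

func newBackgroundCache(buffer, goroutines int, store func(kv)) *backgroundCache {
	c := &backgroundCache{writes: make(chan kv, buffer)}
	for i := 0; i < goroutines; i++ {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			for w := range c.writes {
				store(w)
			}
		}()
	}
	return c
}

func (c *backgroundCache) Store(_ context.Context, key string, val []byte) {
	select {
	case c.writes <- kv{key, val}:
	default:
		// buffer full: drop the write rather than block the caller
	}
}

func main() {
	c := newBackgroundCache(10000, 10, func(w kv) { fmt.Println("stored", w.key) })
	c.Store(context.Background(), "parquet-page:block-1:0", []byte("page bytes"))
	close(c.writes)
	c.wg.Wait()
}
```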
12 changes: 12 additions & 0 deletions modules/cache/config_test.go
@@ -62,6 +62,18 @@ func TestConfigValidation(t *testing.T) {
},
expected: errors.New("configured caches require a valid role"),
},
{
name: "invalid - none",
cfg: &Config{
Caches: []CacheConfig{
{
Role: []cache.Role{cache.RoleNone},
MemcachedConfig: &memcached.Config{},
},
},
},
expected: errors.New("role none is not a valid role"),
},
{
name: "invalid - both caches configged",
cfg: &Config{
2 changes: 2 additions & 0 deletions pkg/cache/cache.go
@@ -10,12 +10,14 @@ type Role string

const (
// individual roles
RoleNone Role = "none"
RoleBloom Role = "bloom"
RoleTraceIDIdx Role = "trace-id-index"
RoleParquetFooter Role = "parquet-footer"
RoleParquetColumnIdx Role = "parquet-column-idx"
RoleParquetOffsetIdx Role = "parquet-offset-idx"
RoleFrontendSearch Role = "frontend-search"
RoleParquetPage Role = "parquet-page"
)

// Provider is an object that can return a cache for a requested role
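A hedged sketch of the role-to-cache indirection these constants feed: a provider hands out at most one cache per role and returns nil for unconfigured roles, which callers must tolerate. Aside from CacheFor and the role constant, the names here are illustrative, not Tempo's:

```go
package main

import "fmt"

type Role string

const RoleParquetPage Role = "parquet-page"

// Cache and Provider mirror the shapes used in this PR: a provider hands out
// at most one cache per role, returning nil when the role is unconfigured.
type Cache interface {
	Store(key string, val []byte)
}

type Provider interface {
	CacheFor(role Role) Cache
}

type noopProvider struct{}

func (noopProvider) CacheFor(Role) Cache { return nil }

func main() {
	var p Provider = noopProvider{}

	// callers nil-check the role's cache, as readerWriter does for pageCache
	if c := p.CacheFor(RoleParquetPage); c != nil {
		c.Store("block-1:offset-0", []byte("page bytes"))
	} else {
		fmt.Println("parquet-page caching disabled; reads go straight to the backend")
	}
}
```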
38 changes: 24 additions & 14 deletions tempodb/backend/cache/cache.go
@@ -33,6 +33,7 @@ type readerWriter struct {
columnIdxCache cache.Cache
offsetIdxCache cache.Cache
traceIDIdxCache cache.Cache
pageCache cache.Cache
}

func NewCache(cfgBloom *BloomConfig, nextReader backend.RawReader, nextWriter backend.RawWriter, cacheProvider cache.Provider, logger log.Logger) (backend.RawReader, backend.RawWriter, error) {
@@ -44,6 +45,7 @@ func NewCache(cfgBloom *BloomConfig, nextReader backend.RawReader, nextWriter ba
offsetIdxCache: cacheProvider.CacheFor(cache.RoleParquetOffsetIdx),
columnIdxCache: cacheProvider.CacheFor(cache.RoleParquetColumnIdx),
traceIDIdxCache: cacheProvider.CacheFor(cache.RoleTraceIDIdx),
pageCache: cacheProvider.CacheFor(cache.RoleParquetPage),

nextReader: nextReader,
nextWriter: nextWriter,
@@ -55,6 +57,7 @@ func NewCache(cfgBloom *BloomConfig, nextReader backend.RawReader, nextWriter ba
"offset_idx", rw.offsetIdxCache != nil,
"column_idx", rw.columnIdxCache != nil,
"trace_id_idx", rw.traceIDIdxCache != nil,
"page", rw.pageCache != nil,
)

return rw, rw, nil
@@ -91,7 +94,7 @@ func (r *readerWriter) Read(ctx context.Context, name string, keypath backend.Ke

b, err := tempo_io.ReadAllWithEstimate(object, size)
if err == nil && cache != nil {
cache.Store(ctx, []string{k}, [][]byte{b})
store(ctx, cache, cacheInfo.Role, k, b)
}

return io.NopCloser(bytes.NewReader(b)), size, err
@@ -109,14 +112,15 @@ func (r *readerWriter) ReadRange(ctx context.Context, name string, keypath backe
found, vals, _ := cache.Fetch(ctx, []string{k})
if len(found) > 0 {
copy(buffer, vals[0])
return nil
}
}

// previous implementation always passed false forward for "shouldCache" so we are matching that behavior by passing nil for cacheInfo
// todo: reevaluate. should we pass the cacheInfo forward?
err := r.nextReader.ReadRange(ctx, name, keypath, offset, buffer, nil)
if err == nil && cache != nil {
cache.Store(ctx, []string{k}, [][]byte{buffer})
store(ctx, cache, cacheInfo.Role, k, buffer)
}

return err
@@ -125,18 +129,6 @@ func (r *readerWriter) ReadRange(ctx context.Context, name string, keypath backe
// Shutdown implements backend.RawReader
func (r *readerWriter) Shutdown() {
r.nextReader.Shutdown()

stopCache := func(c cache.Cache) {
if c != nil {
c.Stop()
}
}

stopCache(r.footerCache)
stopCache(r.bloomCache)
stopCache(r.offsetIdxCache)
stopCache(r.columnIdxCache)
stopCache(r.traceIDIdxCache)
}

// Write implements backend.Writer
@@ -189,6 +181,8 @@ func (r *readerWriter) cacheFor(cacheInfo *backend.CacheInfo) cache.Cache {
return r.columnIdxCache
case cache.RoleParquetOffsetIdx:
return r.offsetIdxCache
case cache.RoleParquetPage:
return r.pageCache
case cache.RoleTraceIDIdx:
return r.traceIDIdxCache
case cache.RoleBloom:
@@ -217,3 +211,19 @@

return nil
}

func store(ctx context.Context, cache cache.Cache, role cache.Role, key string, val []byte) {
write := val
if needsCopy(role) {
write = make([]byte, len(val))
copy(write, val)
}

cache.Store(ctx, []string{key}, [][]byte{write})
}

// needsCopy returns true if the role should be copied into a new buffer before being written to the cache
// todo: should this be signalled through cacheinfo instead?
func needsCopy(role cache.Role) bool {
return role == cache.RoleParquetPage // parquet pages are reused by the library. if we don't copy them then the buffer may be reused before written to cache
}
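A small self-contained demonstration of the aliasing hazard needsCopy guards against: if the caller's buffer is stored without copying and then reused for the next page, the cached entry silently changes. This is illustrative code, not the parquet-go internals:

```go
package main

import "fmt"

func main() {
	stored := map[string][]byte{}

	// storeNoCopy aliases the caller's buffer, like passing val straight through
	storeNoCopy := func(key string, val []byte) { stored[key] = val }

	// storeCopy snapshots the bytes first, as store() does for parquet-page
	storeCopy := func(key string, val []byte) {
		cp := make([]byte, len(val))
		copy(cp, val)
		stored[key] = cp
	}

	buf := []byte("page 1")
	storeNoCopy("a", buf)
	storeCopy("b", buf)

	// the library reuses the buffer for the next page before the cache write lands
	copy(buf, []byte("page 2"))

	fmt.Println(string(stored["a"])) // "page 2" — corrupted entry
	fmt.Println(string(stored["b"])) // "page 1" — correct
}
```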
26 changes: 7 additions & 19 deletions tempodb/encoding/vparquet3/block_search.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"math"
"os"
"strconv"
@@ -13,7 +12,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/parquet-go/parquet-go"

tempo_io "github.com/grafana/tempo/pkg/io"
pq "github.com/grafana/tempo/pkg/parquetquery"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
@@ -65,37 +63,27 @@ func (b *backendBlock) openForSearch(ctx context.Context, opts common.SearchOpti

// TODO: ctx is also cached when we cache backendReaderAt, not ideal but leaving it as is for now
backendReaderAt := NewBackendReaderAt(ctx, b.r, DataFileName, b.meta)

// no searches currently require bloom filters or the page index. so just add them statically
o := []parquet.FileOption{
parquet.SkipBloomFilters(true),
parquet.SkipPageIndex(true),
parquet.FileReadMode(parquet.ReadModeAsync),
}

// backend reader
readerAt := io.ReaderAt(backendReaderAt)

// buffering
if opts.ReadBufferSize > 0 {
// only use buffered reader at if the block is small, otherwise it's far more effective to use larger
// buffers in the parquet sdk
if opts.ReadBufferCount*opts.ReadBufferSize > int(b.meta.Size) {
readerAt = tempo_io.NewBufferedReaderAt(readerAt, int64(b.meta.Size), opts.ReadBufferSize, opts.ReadBufferCount)
} else {
o = append(o, parquet.ReadBufferSize(opts.ReadBufferSize))
}
// if the read buffer size provided is <= 0 then we'll use the parquet default
readBufferSize := opts.ReadBufferSize
if readBufferSize <= 0 {
readBufferSize = parquet.DefaultFileConfig().ReadBufferSize
}

// optimized reader
readerAt = newParquetOptimizedReaderAt(readerAt, int64(b.meta.Size), b.meta.FooterSize)
o = append(o, parquet.ReadBufferSize(readBufferSize))

// cached reader
readerAt = newCachedReaderAt(readerAt, backendReaderAt)
cachedReaderAt := newCachedReaderAt(backendReaderAt, readBufferSize, int64(b.meta.Size), b.meta.FooterSize) // most reads to the backend are going to be readbuffersize so use it as our "page cache" size

span, _ := opentracing.StartSpanFromContext(ctx, "parquet.OpenFile")
defer span.Finish()
pf, err := parquet.OpenFile(readerAt, int64(b.meta.Size), o...)
pf, err := parquet.OpenFile(cachedReaderAt, int64(b.meta.Size), o...)

return pf, backendReaderAt, err
}
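Roughly, the cachedReaderAt introduced here sits between parquet-go and the backend, serving ReadAt calls from the parquet-page cache and falling through on a miss. A hedged sketch of that shape — the real constructor is also given the file and footer sizes, presumably to special-case footer and index reads, and its cache keys surely carry more than offset/length:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// cachingReaderAt is an illustrative io.ReaderAt that consults a byte cache
// keyed by offset/length before touching the slower backend reader.
type cachingReaderAt struct {
	backend io.ReaderAt
	cache   map[string][]byte
}

func (r *cachingReaderAt) ReadAt(p []byte, off int64) (int, error) {
	key := fmt.Sprintf("%d:%d", off, len(p))
	if b, ok := r.cache[key]; ok {
		return copy(p, b), nil // cache hit: no backend round trip
	}

	n, err := r.backend.ReadAt(p, off)
	if err == nil {
		cp := make([]byte, n) // copy before caching; the caller owns p
		copy(cp, p[:n])
		r.cache[key] = cp
	}
	return n, err
}

func main() {
	r := &cachingReaderAt{
		backend: strings.NewReader("parquet file bytes"),
		cache:   map[string][]byte{},
	}

	buf := make([]byte, 7)
	r.ReadAt(buf, 0) // miss: reads from the backend and caches
	r.ReadAt(buf, 0) // hit: served from the cache
	fmt.Println(string(buf))
}
```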