Metadata caching for parquet blocks #1564

Merged: 11 commits, Jul 15, 2022
Changes from 2 commits
8 changes: 4 additions & 4 deletions go.mod
@@ -40,7 +40,7 @@ require (
github.com/jaegertracing/jaeger v1.31.0
github.com/jedib0t/go-pretty/v6 v6.2.4
github.com/jsternberg/zap-logfmt v1.2.0
github.com/klauspost/compress v1.15.6
github.com/klauspost/compress v1.15.7
github.com/minio/minio-go/v7 v7.0.16-0.20211116163909-d00629356463
github.com/mitchellh/mapstructure v1.4.3
github.com/olekukonko/tablewriter v0.0.5
@@ -53,15 +53,15 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.46.0
github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4/v4 v4.1.14
github.com/pierrec/lz4/v4 v4.1.15
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.32.1
github.com/prometheus/prometheus v1.8.2-0.20220228151929-e25a59925555
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/segmentio/parquet-go v0.0.0-20220615011901-16edc2ee0508
github.com/segmentio/parquet-go v0.0.0-20220711093320-3dfa7c5b3344
github.com/sirupsen/logrus v1.8.1
github.com/sony/gobreaker v0.4.1
github.com/spf13/viper v1.10.1
@@ -238,7 +238,7 @@ require (
golang.org/x/mod v0.5.1 // indirect
golang.org/x/net v0.0.0-20220615171555-694bf12d69de // indirect
golang.org/x/oauth2 v0.0.0-20220608161450-d0670ef3b1eb // indirect
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c // indirect
golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.9 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
8 changes: 8 additions & 0 deletions go.sum
@@ -1365,6 +1365,8 @@ github.com/klauspost/compress v1.14.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.6 h1:6D9PcO8QWu0JyaQ2zUMmu16T1T+zjjEpP91guRsvDfY=
github.com/klauspost/compress v1.15.6/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.7 h1:7cgTQxJCU/vy+oP/E3B9RGbQTgbiVzIJWIKOLoAsPok=
github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s=
@@ -1728,6 +1730,8 @@ github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pierrec/lz4/v3 v3.3.4/go.mod h1:280XNCGS8jAcG++AHdd6SeWnzyJ1w9oow2vbORyey8Q=
github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -1917,6 +1921,8 @@ github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfP
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/parquet-go v0.0.0-20220615011901-16edc2ee0508 h1:4IzWHlJZTnWwXqDGZGbc06rzhjgMQV3ySy+ED6y19tk=
github.com/segmentio/parquet-go v0.0.0-20220615011901-16edc2ee0508/go.mod h1:BuMbRhCCg3gFchup9zucJaUjQ4m6RxX+iVci37CoMPQ=
github.com/segmentio/parquet-go v0.0.0-20220711093320-3dfa7c5b3344 h1:oT/mkTqjdhnF7X0oHgfiNyHcTjKZadNIAFwkbizM5gQ=
github.com/segmentio/parquet-go v0.0.0-20220711093320-3dfa7c5b3344/go.mod h1:BuMbRhCCg3gFchup9zucJaUjQ4m6RxX+iVci37CoMPQ=
github.com/sercand/kuberesolver v2.1.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
@@ -2658,6 +2664,8 @@
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c h1:aFV+BgZ4svzjfabn8ERpuB4JI4N6/rdy1iusx77G3oU=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d h1:/m5NbqQelATgoSPVC2Z23sR4kVNokFwDDyWh/3rGY+I=
golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
4 changes: 3 additions & 1 deletion tempodb/encoding/vparquet/block_findtracebyid.go
@@ -178,7 +178,9 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID) (_

br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size), 512*1024, 32)

pf, err := parquet.OpenFile(br, int64(b.meta.Size))
or := &parquetOptimizedReaderAt{br, rr, int64(b.meta.Size), b.meta.FooterSize, map[int64]int64{}}

pf, err := parquet.OpenFile(or, int64(b.meta.Size))
if err != nil {
return nil, errors.Wrap(err, "error opening file in FindTraceByID")
}
57 changes: 4 additions & 53 deletions tempodb/encoding/vparquet/block_search.go
@@ -2,23 +2,20 @@ package vparquet

import (
"context"
"encoding/binary"
"fmt"
"io"
"math"
"strconv"
"time"

"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
"github.com/segmentio/parquet-go"

tempo_io "github.com/grafana/tempo/pkg/io"
pq "github.com/grafana/tempo/pkg/parquetquery"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/opentracing/opentracing-go"
"github.com/segmentio/parquet-go"
)

// These are reserved search parameters
@@ -37,52 +34,6 @@ var StatusCodeMapping = map[string]int{
StatusCodeError: int(v1.Status_STATUS_CODE_ERROR),
}

type BackendReaderAt struct {
ctx context.Context
r backend.Reader
name string
blockID uuid.UUID
tenantID string

TotalBytesRead uint64
}

var _ io.ReaderAt = (*BackendReaderAt)(nil)

func NewBackendReaderAt(ctx context.Context, r backend.Reader, name string, blockID uuid.UUID, tenantID string) *BackendReaderAt {
return &BackendReaderAt{ctx, r, name, blockID, tenantID, 0}
}

func (b *BackendReaderAt) ReadAt(p []byte, off int64) (int, error) {
b.TotalBytesRead += uint64(len(p))
err := b.r.ReadRange(b.ctx, b.name, b.blockID, b.tenantID, uint64(off), p, false)
return len(p), err
}

type parquetOptimizedReaderAt struct {
r io.ReaderAt
readerSize int64
footerSize uint32
}

var _ io.ReaderAt = (*parquetOptimizedReaderAt)(nil)

func (r *parquetOptimizedReaderAt) ReadAt(p []byte, off int64) (int, error) {
if len(p) == 4 && off == 0 {
// Magic header
return copy(p, []byte("PAR1")), nil
}

if len(p) == 8 && off == r.readerSize-8 && r.footerSize > 0 /* not present in previous block metas */ {
// Magic footer
binary.LittleEndian.PutUint32(p, r.footerSize)
copy(p[4:8], []byte("PAR1"))
return 8, nil
}

return r.r.ReadAt(p, off)
}

func (b *backendBlock) Search(ctx context.Context, req *tempopb.SearchRequest, opts common.SearchOptions) (_ *tempopb.SearchResponse, err error) {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "parquet.backendBlock.Search",
opentracing.Tags{
@@ -97,7 +48,7 @@ func (b *backendBlock) Search(ctx context.Context, req *tempopb.SearchRequest, o

br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size), opts.ReadBufferSize, opts.ReadBufferCount)

or := &parquetOptimizedReaderAt{br, int64(b.meta.Size), b.meta.FooterSize}
or := &parquetOptimizedReaderAt{br, rr, int64(b.meta.Size), b.meta.FooterSize, map[int64]int64{}}

span2, _ := opentracing.StartSpanFromContext(derivedCtx, "parquet.OpenFile")
pf, err := parquet.OpenFile(or, int64(b.meta.Size), parquet.SkipPageIndex(true))
86 changes: 86 additions & 0 deletions tempodb/encoding/vparquet/parquet_reader.go
@@ -0,0 +1,86 @@
package vparquet

import (
"context"
"encoding/binary"
"io"

"github.com/google/uuid"

"github.com/grafana/tempo/tempodb/backend"
)

type BackendReaderAt struct {
ctx context.Context
r backend.Reader
name string
blockID uuid.UUID
tenantID string

TotalBytesRead uint64
}

var _ io.ReaderAt = (*BackendReaderAt)(nil)

func NewBackendReaderAt(ctx context.Context, r backend.Reader, name string, blockID uuid.UUID, tenantID string) *BackendReaderAt {
return &BackendReaderAt{ctx, r, name, blockID, tenantID, 0}
}

func (b *BackendReaderAt) ReadAt(p []byte, off int64) (int, error) {
b.TotalBytesRead += uint64(len(p))
err := b.r.ReadRange(b.ctx, b.name, b.blockID, b.tenantID, uint64(off), p, false)
return len(p), err
}

func (b *BackendReaderAt) ReadAtWithCache(p []byte, off int64) (int, error) {
err := b.r.ReadRange(b.ctx, b.name, b.blockID, b.tenantID, uint64(off), p, true)
return len(p), err
}

type parquetOptimizedReaderAt struct {
r io.ReaderAt
br *BackendReaderAt
readerSize int64
footerSize uint32

// offsets and lengths of objects we want to cache
cachedObjects map[int64]int64
}

var _ io.ReaderAt = (*parquetOptimizedReaderAt)(nil)

// this will be called by parquet-go in OpenFile() to set offset and length of footer section
func (r *parquetOptimizedReaderAt) SetFooterSection(offset, length int64) {
r.cachedObjects[offset] = length
}

// this will be called by parquet-go in OpenFile() to set offset and length of column indexes
func (r *parquetOptimizedReaderAt) SetColumnIndexSection(offset, length int64) {
r.cachedObjects[offset] = length
}

// this will be called by parquet-go in OpenFile() to set offset and length of offset index section
func (r *parquetOptimizedReaderAt) SetOffsetIndexSection(offset, length int64) {
r.cachedObjects[offset] = length
}

func (r *parquetOptimizedReaderAt) ReadAt(p []byte, off int64) (int, error) {
if len(p) == 4 && off == 0 {
// Magic header
return copy(p, []byte("PAR1")), nil
}

if len(p) == 8 && off == r.readerSize-8 && r.footerSize > 0 /* not present in previous block metas */ {
// Magic footer
binary.LittleEndian.PutUint32(p, r.footerSize)
copy(p[4:8], []byte("PAR1"))
return 8, nil
}

// check if the offset and length are stored as a special object
if r.cachedObjects[off] == int64(len(p)) {
Member:
we're kind of "forking" the read path here. We're sending special queries off to the backend reader and other queries to the in memory buffered reader at. Is there a cleaner implementation here?

It feels like all calls should go through the same layers:
Parquet File -> In memory cache -> Off process cache (for specific segments of the file) -> Object storage

Is there a clean way to achieve this?

Contributor Author (annanay25):

All calls are going through the same layers, ie. Tempo Querier -> Cached Reader -> Backend reader -> Object storage. The same underlying BackendReaderAt is called each time. If you look at this piece of logic here - https://github.com/grafana/tempo/pull/1564/files#diff-6c42681fadfdec225eb9164d0d49a2fe2033bb8ff0e820ecbd5010bb0bb629d8R37-R51
The difference is that just one of the readers is buffered.

We are just setting shouldCache to true in some special cases and leaving it as false for most other calls.

Not sure if I'm missing something, we can also chat offline to discuss.

joe-elliott (Member), Jul 13, 2022:

Yeah, I can't say I have great ideas here. I think what I expect in some perfect world is

  • only one "reader" being passed to this object which all calls go through.
  • the special ranges (footer, metadata, etc) also passing through the in memory buffer

However, I think any attempt to do this while also caching the special ranges would be difficult and would make other sections of code worse. My largest concern is that by adding this code we will be increasing calls to the backend especially in cases where off process cache is not configured. Here is a read listing on searching the cluster column:

len=77631 off=664715997 hit=false dur=127.416656ms
len=4096 off=4645575 hit=false dur=324.308924ms
len=4096 off=4649671 hit=true dur=324.310427ms
len=4096 off=4653767 hit=true dur=324.312226ms
...
len=4096 off=47668776 hit=false dur=695.911273ms
len=4096 off=47672872 hit=true dur=695.912669ms
len=4096 off=47676968 hit=true dur=695.913873ms
...
len=4096 off=85484375 hit=false dur=1.965658619s
len=4096 off=85488471 hit=true dur=1.96566025s
len=4096 off=85492567 hit=true dur=1.965661536s
...

The first read is the footer. Then it attempts to start reading the column in one of the row groups. You can see there is a buffered cache miss (hit = false) every time it jumps to a new row group. If that first call is grabbing some kind of section metadata, would it increase the number of calls to the backend because it wouldn't be hitting the buffered reader?

Maybe all I'm wanting here is a way to configure this on/off to assess impact.

Contributor Author (annanay25):

Discussed offline, and went ahead with making cache settings configurable. Let me know if this works!

return r.br.ReadAtWithCache(p, off)
}

return r.r.ReadAt(p, off)
}
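The thread above was resolved by making the cache behavior configurable. The exact option names are not shown in this diff, so the sketch below is hypothetical: it only illustrates what per-section cache toggles for the three tracked sections (footer, column index, offset index) could look like, not the PR's actual configuration surface.

```go
package main

import "fmt"

// cacheControl is a hypothetical sketch of per-section cache toggles;
// the field and section names here are assumptions for illustration.
type cacheControl struct {
	Footer      bool
	ColumnIndex bool
	OffsetIndex bool
}

// shouldCache reports whether a read for a tracked section should be sent
// through the backend cache instead of the plain buffered reader.
func (c cacheControl) shouldCache(section string) bool {
	switch section {
	case "footer":
		return c.Footer
	case "column_index":
		return c.ColumnIndex
	case "offset_index":
		return c.OffsetIndex
	}
	// unknown sections always bypass the cache
	return false
}

func main() {
	cc := cacheControl{Footer: true}
	fmt.Println(cc.shouldCache("footer"))       // true
	fmt.Println(cc.shouldCache("column_index")) // false
}
```

With toggles like these, the `shouldCache` decision can be plumbed into the boolean already accepted by `ReadRange`, so disabling all sections degrades cleanly to the pre-PR read path.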
12 changes: 12 additions & 0 deletions vendor/github.com/klauspost/compress/README.md
