combined cached reader and optimized reader
Signed-off-by: Joe Elliott <number101010@gmail.com>
joe-elliott committed Dec 5, 2023
1 parent 7a36e02 commit e128456
Showing 3 changed files with 53 additions and 69 deletions.
10 changes: 2 additions & 8 deletions tempodb/encoding/vparquet3/block_search.go
@@ -63,9 +63,6 @@ func (b *backendBlock) openForSearch(ctx context.Context, opts common.SearchOpti
 
 	// TODO: ctx is also cached when we cache backendReaderAt, not ideal but leaving it as is for now
 	backendReaderAt := NewBackendReaderAt(ctx, b.r, DataFileName, b.meta)
-
-	readerAt := cacheReaderAt(backendReaderAt)
-
 	// no searches currently require bloom filters or the page index. so just add them statically
 	o := []parquet.FileOption{
 		parquet.SkipBloomFilters(true),
@@ -81,15 +78,12 @@ func (b *backendBlock) openForSearch(ctx context.Context, opts common.SearchOpti
 
 	o = append(o, parquet.ReadBufferSize(readBufferSize))
 
-	// optimized reader
-	readerAt = newParquetOptimizedReaderAt(readerAt, int64(b.meta.Size), b.meta.FooterSize)
-
 	// cached reader
-	readerAt = newCachedReaderAt(readerAt, readBufferSize) // most reads to the backend are going to be readbuffersize so use it as our "page cache" size
+	cachedReaderAt := newCachedReaderAt(backendReaderAt, readBufferSize, int64(b.meta.Size), b.meta.FooterSize) // most reads to the backend are going to be readbuffersize so use it as our "page cache" size
 
 	span, _ := opentracing.StartSpanFromContext(ctx, "parquet.OpenFile")
 	defer span.Finish()
-	pf, err := parquet.OpenFile(readerAt.(*cachedReaderAt), int64(b.meta.Size), o...)
+	pf, err := parquet.OpenFile(cachedReaderAt, int64(b.meta.Size), o...)
 
 	return pf, backendReaderAt, err
 }
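
The two extra arguments threaded into newCachedReaderAt here, the object size and the footer size from the block meta, feed the magic/footer short-circuit that moves into the cached reader in the next file. A small worked example of the trailer arithmetic, using illustrative values that happen to match the tests further down (object size 1000, footer size 100, so the 8-byte trailer read lands at offset 992):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// illustrative values only; in Tempo these come from the block meta
	size := int64(1000)
	footerSize := uint32(100)

	// per the comments in this commit, parquet's trailer read is 8 bytes at the
	// end of the object: the little-endian footer length followed by "PAR1"
	trailer := make([]byte, 8)
	binary.LittleEndian.PutUint32(trailer, footerSize)
	copy(trailer[4:8], []byte("PAR1"))

	fmt.Printf("trailer read at offset %d: % x\n", size-8, trailer) // offset 992
	fmt.Printf("decoded footer length: %d\n", binary.LittleEndian.Uint32(trailer[:4]))
}

Answering this read (and the 4-byte magic at offset 0) locally is what saves the two tiny backend requests the removed parquetOptimizedReaderAt used to absorb.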
57 changes: 21 additions & 36 deletions tempodb/encoding/vparquet3/readers.go
@@ -45,44 +45,14 @@ func (b *BackendReaderAt) ReadAtWithCache(p []byte, off int64, role cache.Role)
 	if err != nil {
 		return 0, err
 	}
+	b.bytesRead.Add(uint64(len(p)))
 	return len(p), nil
 }
 
 func (b *BackendReaderAt) BytesRead() uint64 {
 	return b.bytesRead.Load()
 }
 
-// parquetOptimizedReaderAt is used to cheat a few parquet calls. By default when opening a
-// file parquet always requests the magic number and then the footer length. We can save
-// both of these calls from going to the backend.
-type parquetOptimizedReaderAt struct {
-	r          cacheReaderAt
-	readerSize int64
-	footerSize uint32
-}
-
-var _ cacheReaderAt = (*parquetOptimizedReaderAt)(nil)
-
-func newParquetOptimizedReaderAt(r cacheReaderAt, size int64, footerSize uint32) *parquetOptimizedReaderAt {
-	return &parquetOptimizedReaderAt{r, size, footerSize}
-}
-
-func (r *parquetOptimizedReaderAt) ReadAtWithCache(p []byte, off int64, role cache.Role) (int, error) {
-	if len(p) == 4 && off == 0 {
-		// Magic header
-		return copy(p, []byte("PAR1")), nil
-	}
-
-	if len(p) == 8 && off == r.readerSize-8 && r.footerSize > 0 /* not present in previous block metas */ {
-		// Magic footer
-		binary.LittleEndian.PutUint32(p, r.footerSize)
-		copy(p[4:8], []byte("PAR1"))
-		return 8, nil
-	}
-
-	return r.r.ReadAtWithCache(p, off, role)
-}
-
 type cachedObjectRecord struct {
 	length int64
 	role   cache.Role
@@ -94,13 +64,16 @@ type cachedReaderAt struct {
 	r             cacheReaderAt
 	cachedObjects map[int64]cachedObjectRecord // storing offsets and length of objects we want to cache
 
+	readerSize int64
+	footerSize uint32
+
 	maxPageSize int
 }
 
 var _ cacheReaderAt = (*cachedReaderAt)(nil)
 
-func newCachedReaderAt(r cacheReaderAt, maxPageSize int) *cachedReaderAt {
-	return &cachedReaderAt{r, map[int64]cachedObjectRecord{}, maxPageSize}
+func newCachedReaderAt(r cacheReaderAt, maxPageSize int, size int64, footerSize uint32) *cachedReaderAt {
+	return &cachedReaderAt{r, map[int64]cachedObjectRecord{}, size, footerSize, maxPageSize}
 }
 
 // called by parquet-go in OpenFile() to set offset and length of footer section
@@ -119,13 +92,25 @@ func (r *cachedReaderAt) SetOffsetIndexSection(offset, length int64) {
 }
 
 func (r *cachedReaderAt) ReadAt(p []byte, off int64) (int, error) {
+	if len(p) == 4 && off == 0 {
+		// Magic header
+		return copy(p, []byte("PAR1")), nil
+	}
+
+	if len(p) == 8 && off == r.readerSize-8 && r.footerSize > 0 /* not present in previous block metas */ {
+		// Magic footer
+		binary.LittleEndian.PutUint32(p, r.footerSize)
+		copy(p[4:8], []byte("PAR1"))
+		return 8, nil
+	}
+
 	// check if the offset and length is stored as a special object
-	rec := r.cachedObjects[off]
-	if rec.length == int64(len(p)) {
+	rec, ok := r.cachedObjects[off]
+	if ok && rec.length == int64(len(p)) {
 		return r.r.ReadAtWithCache(p, off, rec.role)
 	}
 
-	if rec.length <= int64(r.maxPageSize) {
+	if len(p) <= r.maxPageSize {
 		return r.r.ReadAtWithCache(p, off, cache.RoleParquetPage)
 	}
 
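Taken together, the combined ReadAt handles a request in four steps: answer the 4-byte magic header and the 8-byte footer-length trailer locally, serve sections registered via SetFooterSection/SetColumnIndexSection/SetOffsetIndexSection with their specific cache role, treat anything at or below the page-cache size as a parquet page, and pass larger reads through without caching. Below is a self-contained sketch of that dispatch with simplified stand-in types and an in-memory backend; it mirrors the shape of the code above but is not Tempo's actual implementation.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

type role string

type section struct {
	length int64
	role   role
}

// combinedReaderAt sketches the pattern from this commit: magic/footer
// short-circuits plus section and page routing in a single io.ReaderAt wrapper.
type combinedReaderAt struct {
	backend     io.ReaderAt       // stand-in for the backend reader
	size        int64             // total object size
	footerSize  uint32            // footer length from block meta (0 if unknown)
	maxPageSize int               // reads at or below this size count as pages
	sections    map[int64]section // offset -> registered metadata section
}

func (r *combinedReaderAt) ReadAt(p []byte, off int64) (int, error) {
	if len(p) == 4 && off == 0 {
		// magic header, answered without a backend round trip
		return copy(p, []byte("PAR1")), nil
	}
	if len(p) == 8 && off == r.size-8 && r.footerSize > 0 {
		// footer length plus trailing magic, also answered locally
		binary.LittleEndian.PutUint32(p, r.footerSize)
		copy(p[4:8], []byte("PAR1"))
		return 8, nil
	}
	if sec, ok := r.sections[off]; ok && sec.length == int64(len(p)) {
		// a registered metadata section gets its own cache role
		fmt.Printf("read %d@%d -> cache role %q\n", len(p), off, sec.role)
		return r.backend.ReadAt(p, off)
	}
	if len(p) <= r.maxPageSize {
		// small enough to treat as a parquet page
		fmt.Printf("read %d@%d -> page cache\n", len(p), off)
		return r.backend.ReadAt(p, off)
	}
	// too large to cache, go straight to the backend
	fmt.Printf("read %d@%d -> uncached\n", len(p), off)
	return r.backend.ReadAt(p, off)
}

func main() {
	r := &combinedReaderAt{
		backend:     bytes.NewReader(make([]byte, 1000)),
		size:        1000,
		footerSize:  100,
		maxPageSize: 64,
		sections:    map[int64]section{900: {length: 92, role: "footer"}},
	}

	r.ReadAt(make([]byte, 4), 0)     // magic header, served locally
	r.ReadAt(make([]byte, 8), 992)   // footer length + magic, served locally
	r.ReadAt(make([]byte, 92), 900)  // registered footer section
	r.ReadAt(make([]byte, 13), 25)   // small read routed as a page
	r.ReadAt(make([]byte, 200), 300) // larger than maxPageSize, uncached
}

The tests below exercise exactly these branches against the real cachedReaderAt, asserting which cache role each read is routed with.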
55 changes: 30 additions & 25 deletions tempodb/encoding/vparquet3/readers_test.go
@@ -58,87 +58,92 @@ func TestParquetGoSetsMetadataSections(t *testing.T) {
 	require.True(t, dr.offsetIndex)
 }
 
-func TestParquetReaderAt(t *testing.T) {
+func TestCachingReaderShortcircuitsFooterHeader(t *testing.T) {
 	rr := &recordingReaderAt{}
-	pr := newParquetOptimizedReaderAt(rr, 1000, 100)
+	pr := newCachedReaderAt(rr, 1000, 1000, 100)
 
 	expectedReads := []read{}
 
 	// magic number doesn't pass through
-	_, err := pr.ReadAtWithCache(make([]byte, 4), 0, cache.RoleNone)
+	_, err := pr.ReadAt(make([]byte, 4), 0)
 	require.NoError(t, err)
 
 	// footer size doesn't pass through
-	_, err = pr.ReadAtWithCache(make([]byte, 8), 992, cache.RoleNone)
+	_, err = pr.ReadAt(make([]byte, 8), 992)
 	require.NoError(t, err)
 
 	// other calls pass through
-	_, err = pr.ReadAtWithCache(make([]byte, 13), 25, cache.RoleNone)
+	_, err = pr.ReadAt(make([]byte, 13), 25)
 	require.NoError(t, err)
-	expectedReads = append(expectedReads, read{13, 25})
+	expectedReads = append(expectedReads, read{13, 25, cache.RoleParquetPage})
 
-	_, err = pr.ReadAtWithCache(make([]byte, 97), 118, cache.RoleNone)
+	_, err = pr.ReadAt(make([]byte, 97), 118)
 	require.NoError(t, err)
-	expectedReads = append(expectedReads, read{97, 118})
+	expectedReads = append(expectedReads, read{97, 118, cache.RoleParquetPage})
 
-	_, err = pr.ReadAtWithCache(make([]byte, 59), 421, cache.RoleNone)
+	_, err = pr.ReadAt(make([]byte, 59), 421)
 	require.NoError(t, err)
-	expectedReads = append(expectedReads, read{59, 421})
+	expectedReads = append(expectedReads, read{59, 421, cache.RoleParquetPage})
 
 	require.Equal(t, expectedReads, rr.reads)
 }
 
-func TestCachingReaderAt(t *testing.T) { // jpe test for page
+func TestCachingReaderAt(t *testing.T) {
 	rr := &recordingReaderAt{}
-	cr := newCachedReaderAt(rr, 1000)
+	cr := newCachedReaderAt(rr, 1000, 100000, 10)
 
-	// cached items should not hit rr
+	expectedReads := []read{}
+
+	// specially cached sections
 	cr.SetColumnIndexSection(1, 34)
 	_, err := cr.ReadAt(make([]byte, 34), 1)
+	expectedReads = append(expectedReads, read{34, 1, cache.RoleParquetColumnIdx})
 	require.NoError(t, err)
 
 	cr.SetFooterSection(14, 20)
 	_, err = cr.ReadAt(make([]byte, 20), 14)
+	expectedReads = append(expectedReads, read{20, 14, cache.RoleParquetFooter})
 	require.NoError(t, err)
 
 	cr.SetOffsetIndexSection(13, 12)
 	_, err = cr.ReadAt(make([]byte, 12), 13)
+	expectedReads = append(expectedReads, read{12, 13, cache.RoleParquetOffsetIdx})
 	require.NoError(t, err)
 
-	// other calls hit rr
-	expectedReads := []read{}
-
+	// everything else is a parquet page
 	_, err = cr.ReadAt(make([]byte, 13), 25)
 	require.NoError(t, err)
-	expectedReads = append(expectedReads, read{13, 25})
+	expectedReads = append(expectedReads, read{13, 25, cache.RoleParquetPage})
 
 	_, err = cr.ReadAt(make([]byte, 97), 118)
 	require.NoError(t, err)
-	expectedReads = append(expectedReads, read{97, 118})
+	expectedReads = append(expectedReads, read{97, 118, cache.RoleParquetPage})
 
-	_, err = cr.ReadAt(make([]byte, 59), 421)
+	// unless it's larger than the page size
+	_, err = cr.ReadAt(make([]byte, 1001), 421)
 	require.NoError(t, err)
-	expectedReads = append(expectedReads, read{59, 421})
+	expectedReads = append(expectedReads, read{1001, 421, cache.RoleNone})
 
 	require.Equal(t, expectedReads, rr.reads)
 }
 
 type read struct {
-	len int
-	off int64
+	len  int
+	off  int64
+	role cache.Role
 }
 type recordingReaderAt struct {
 	reads []read
 }
 
 func (r *recordingReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
-	r.reads = append(r.reads, read{len(p), off})
+	r.reads = append(r.reads, read{len(p), off, ""})
 
 	return len(p), nil
 }
 
-func (r *recordingReaderAt) ReadAtWithCache(p []byte, off int64, _ cache.Role) (n int, err error) {
-	r.reads = append(r.reads, read{len(p), off})
+func (r *recordingReaderAt) ReadAtWithCache(p []byte, off int64, role cache.Role) (n int, err error) {
+	r.reads = append(r.reads, read{len(p), off, role})
 
 	return len(p), nil
 }
