Cache: Read to a sized buffer #3976

Merged: 6 commits, Aug 19, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -52,6 +52,7 @@
* [ENHANCEMENT] Allow compaction disablement per-tenant [#3965](https://github.com/grafana/tempo/pull/3965) (@zalegrala)
* [ENHANCEMENT] Implement polling tenants concurrently [#3647](https://github.com/grafana/tempo/pull/3647) (@zalegrala)
* [ENHANCEMENT] Added new middleware to block matching urls [#3963](https://github.com/grafana/tempo/pull/3963) (@javiermolinar)
* [ENHANCEMENT] Reduce allocs of caching middleware [#3976](https://github.com/grafana/tempo/pull/3976) (@joe-elliott)
* [BUGFIX] Fix panic in certain metrics queries using `rate()` with `by` [#3847](https://github.com/grafana/tempo/pull/3847) (@stoewer)
* [BUGFIX] Fix double appending the primary iterator on second pass with event iterator [#3903](https://github.com/grafana/tempo/pull/3903) (@ie-pham)
* [BUGFIX] Fix metrics queries when grouping by attributes that may not exist [#3734](https://github.com/grafana/tempo/pull/3734) (@mdisibio)
40 changes: 40 additions & 0 deletions modules/cache/config_test.go
@@ -4,6 +4,7 @@ import (
"errors"
"testing"

"github.com/go-kit/log"
"github.com/grafana/tempo/modules/cache/memcached"
"github.com/grafana/tempo/modules/cache/redis"
"github.com/grafana/tempo/pkg/cache"
@@ -148,3 +149,42 @@ func TestCacheConfigName(t *testing.T) {
require.Equal(t, tc.expected, actual)
}
}

func TestMaxItemSize(t *testing.T) {
tcs := []struct {
cfg *CacheConfig
expected int
}{
{
cfg: &CacheConfig{
Role: []cache.Role{cache.RoleBloom, cache.RoleParquetColumnIdx},
MemcachedConfig: &memcached.Config{
ClientConfig: cache.MemcachedClientConfig{
MaxItemSize: 123,
},
},
},
expected: 123,
},
{
cfg: &CacheConfig{
Role: []cache.Role{cache.RoleBloom, cache.RoleFrontendSearch},
RedisConfig: &redis.Config{},
},
expected: 0, // redis does not support max item size
},
}

for _, tc := range tcs {
t.Run("", func(t *testing.T) {
p, err := NewProvider(&Config{
Caches: []CacheConfig{*tc.cfg},
Background: &cache.BackgroundConfig{},
}, log.NewNopLogger())
require.NoError(t, err)

cache := p.CacheFor(cache.RoleBloom)
require.Equal(t, tc.expected, cache.MaxItemSize())
})
}
}
2 changes: 1 addition & 1 deletion modules/cache/memcached/memcached.go
@@ -32,7 +32,7 @@ func NewClient(cfg *Config, cfgBackground *cache.BackgroundConfig, name string,
BatchSize: 0, // we are currently only requesting one key at a time, which is bad. we could restructure Find() to batch request all blooms at once
Parallelism: 0,
}
c := cache.NewMemcached(memcachedCfg, client, name, prometheus.DefaultRegisterer, logger)
c := cache.NewMemcached(memcachedCfg, client, name, cfg.ClientConfig.MaxItemSize, prometheus.DefaultRegisterer, logger)

return cache.NewBackground(name, *cfgBackground, c, prometheus.DefaultRegisterer)
}
25 changes: 16 additions & 9 deletions modules/frontend/pipeline/sync_handler_cache.go
@@ -3,6 +3,7 @@ package pipeline
import (
"bytes"
"context"
"fmt"
"io"
"net/http"

@@ -75,21 +76,27 @@ func (c cachingWare) RoundTrip(req Request) (*http.Response, error) {
}

if len(key) > 0 {
// don't bother caching if the response is too large
maxItemSize := c.cache.c.MaxItemSize()
if maxItemSize > 0 && resp.ContentLength > int64(maxItemSize) {
return resp, nil
}

buffer, err := api.ReadBodyToBuffer(resp)
Review comment (Member): if maxItemSize is set to 0, then we will always cache, no matter the size. Is my understanding correct?

Review comment (Member): should we handle the case where maxItemSize is a negative value?
if err != nil {
return resp, fmt.Errorf("failed to cache: %w", err)
}

// reset the body so the caller can read it
resp.Body = io.NopCloser(buffer)

// cache the response
// todo: currently this is blindly caching any 200 status codes. it would be a bug, but it's possible for a querier
// to return a 200 status code with a response that does not parse as the expected type in the combiner.
// technically this should never happen...
// long term we should migrate the sync part of the pipeline to use generics so we can do the parsing early in the pipeline
// and be confident it's cacheable.
b, err := io.ReadAll(resp.Body)

// reset the body so the caller can read it
resp.Body = io.NopCloser(bytes.NewBuffer(b))
if err != nil {
return resp, nil
}

c.cache.store(req.Context(), key, b)
c.cache.store(req.Context(), key, buffer.Bytes())
}

return resp, nil
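A minimal sketch of the guard's semantics as merged, which also addresses both review questions above. The helper name is hypothetical, for illustration only:

```go
package main

import "fmt"

// shouldCache mirrors the size check in cachingWare.RoundTrip; it is a
// hypothetical helper, not part of the PR.
func shouldCache(contentLength int64, maxItemSize int) bool {
	// maxItemSize <= 0 disables the limit entirely: 0 (the first review
	// question, and what the Redis backend reports) and negative values
	// (the second question) both fail the `> 0` check, so everything is
	// cached. A ContentLength of -1 (unknown length) also passes the
	// comparison and is cached.
	return !(maxItemSize > 0 && contentLength > int64(maxItemSize))
}

func main() {
	fmt.Println(shouldCache(1024, 512)) // false: response larger than the limit
	fmt.Println(shouldCache(1024, 0))   // true: limit disabled
	fmt.Println(shouldCache(-1, 512))   // true: unknown length is still cached
}
```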
2 changes: 1 addition & 1 deletion modules/frontend/search_handlers_test.go
@@ -564,7 +564,7 @@ func TestSearchAccessesCache(t *testing.T) {
}

// setup mock cache
c := cache.NewMockCache()
c := test.NewMockClient()
p := test.NewMockProvider()
err := p.AddCache(cache.RoleFrontendSearch, c)
require.NoError(t, err)
2 changes: 1 addition & 1 deletion modules/frontend/tag_handlers_test.go
@@ -451,7 +451,7 @@ func TestSearchTagsV2AccessesCache(t *testing.T) {
}

// setup mock cache
c := cache.NewMockCache()
c := test.NewMockClient()
p := test.NewMockProvider()
err := p.AddCache(cache.RoleFrontendSearch, c)
require.NoError(t, err)
21 changes: 21 additions & 0 deletions pkg/api/http.go
@@ -1,6 +1,7 @@
package api

import (
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -806,3 +807,23 @@ func ValidateAndSanitizeRequest(r *http.Request) (string, string, string, int64,
}
return blockStart, blockEnd, queryMode, startTime, endTime, nil
}

func ReadBodyToBuffer(resp *http.Response) (*bytes.Buffer, error) {
length := resp.ContentLength
// ContentLength is -1 if the length is unknown; default to bytes.MinRead (it's what buffer.ReadFrom does)
if length < 0 {
length = bytes.MinRead
}
// buffer.ReadFrom always allocates at least bytes.MinRead past the end of the actual required length because of how io.EOF is handled.
// Pre-sizing here prevents extending the internal slice unnecessarily. https://github.com/golang/go/issues/21852
length += bytes.MinRead

// alloc a buffer to store the response body
buffer := bytes.NewBuffer(make([]byte, 0, length))
_, err := buffer.ReadFrom(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}

return buffer, nil
}
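A short usage sketch of the new helper, assuming the pkg/api import path shown in this diff. The buffer is pre-sized to ContentLength plus bytes.MinRead, so bytes.Buffer.ReadFrom can hit io.EOF without growing the backing slice:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"

	"github.com/grafana/tempo/pkg/api"
)

func main() {
	// A response with a known length; ReadBodyToBuffer allocates
	// ContentLength+bytes.MinRead of capacity up front, so ReadFrom
	// never has to reallocate.
	resp := &http.Response{
		ContentLength: 11,
		Body:          io.NopCloser(strings.NewReader("hello world")),
	}

	buf, err := api.ReadBodyToBuffer(resp)
	if err != nil {
		panic(err)
	}
	fmt.Println(buf.String()) // hello world

	// As the caching middleware does above, the body can be handed
	// back to the caller after it has been read into the buffer.
	resp.Body = io.NopCloser(buf)
}
```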
3 changes: 2 additions & 1 deletion pkg/cache/background_test.go
@@ -8,14 +8,15 @@ import (
"testing"

"github.com/grafana/tempo/pkg/cache"
"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/require"
)

func TestBackground(t *testing.T) {
c := cache.NewBackground("mock", cache.BackgroundConfig{
WriteBackGoroutines: 1,
WriteBackBuffer: 100,
}, cache.NewMockCache(), nil)
}, test.NewMockClient(), nil)

keys, chunks := fillCache(c)
cache.Flush(c)
1 change: 1 addition & 0 deletions pkg/cache/cache.go
@@ -37,6 +37,7 @@ type Provider interface {
// when they returned an error.
type Cache interface {
Store(ctx context.Context, key []string, buf [][]byte)
MaxItemSize() int
// TODO: both cached backend clients support deletion. Should we implement?
// Remove(ctx context.Context, key []string)
Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string)
22 changes: 14 additions & 8 deletions pkg/cache/memcached.go
@@ -34,9 +34,10 @@ func (cfg *MemcachedConfig) RegisterFlagsWithPrefix(prefix, description string,

// Memcached type caches chunks in memcached
type Memcached struct {
cfg MemcachedConfig
memcache MemcachedClient
name string
cfg MemcachedConfig
memcache MemcachedClient
name string
maxItemSize int

requestDuration *instr.HistogramCollector

@@ -48,12 +49,13 @@
}

// NewMemcached makes a new Memcached.
func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, reg prometheus.Registerer, logger log.Logger) *Memcached {
func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, maxItemSize int, reg prometheus.Registerer, logger log.Logger) *Memcached {
c := &Memcached{
cfg: cfg,
memcache: client,
name: name,
logger: logger,
cfg: cfg,
memcache: client,
name: name,
maxItemSize: maxItemSize,
logger: logger,
requestDuration: instr.NewHistogramCollector(
promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tempo",
@@ -261,3 +263,7 @@ func (c *Memcached) Stop() {
}
c.wg.Wait()
}

func (c *Memcached) MaxItemSize() int {
return c.maxItemSize
}
12 changes: 6 additions & 6 deletions pkg/cache/memcached_test.go
@@ -18,7 +18,7 @@ func TestMemcached(t *testing.T) {
t.Run("unbatched", func(t *testing.T) {
client := newMockMemcache()
memcache := cache.NewMemcached(cache.MemcachedConfig{}, client,
"test", nil, log.NewNopLogger())
"test", 0, nil, log.NewNopLogger())

testMemcache(t, memcache)
})
@@ -28,7 +28,7 @@
memcache := cache.NewMemcached(cache.MemcachedConfig{
BatchSize: 10,
Parallelism: 5,
}, client, "test", nil, log.NewNopLogger())
}, client, "test", 0, nil, log.NewNopLogger())

testMemcache(t, memcache)
})
@@ -94,7 +94,7 @@ func TestMemcacheFailure(t *testing.T) {
t.Run("unbatched", func(t *testing.T) {
client := newMockMemcacheFailing()
memcache := cache.NewMemcached(cache.MemcachedConfig{}, client,
"test", nil, log.NewNopLogger())
"test", 0, nil, log.NewNopLogger())

testMemcacheFailing(t, memcache)
})
@@ -104,7 +104,7 @@
memcache := cache.NewMemcached(cache.MemcachedConfig{
BatchSize: 10,
Parallelism: 5,
}, client, "test", nil, log.NewNopLogger())
}, client, "test", 0, nil, log.NewNopLogger())

testMemcacheFailing(t, memcache)
})
@@ -161,7 +161,7 @@ func TestMemcacheStop(t *testing.T) {
t.Run("unbatched", func(_ *testing.T) {
client := newMockMemcacheFailing()
memcache := cache.NewMemcached(cache.MemcachedConfig{}, client,
"test", nil, log.NewNopLogger())
"test", 0, nil, log.NewNopLogger())

testMemcachedStopping(memcache)
})
@@ -171,7 +171,7 @@
memcache := cache.NewMemcached(cache.MemcachedConfig{
BatchSize: 10,
Parallelism: 5,
}, client, "test", nil, log.NewNopLogger())
}, client, "test", 0, nil, log.NewNopLogger())

testMemcachedStopping(memcache)
})
44 changes: 0 additions & 44 deletions pkg/cache/mock.go

This file was deleted.

5 changes: 5 additions & 0 deletions pkg/cache/redis_cache.go
@@ -110,3 +110,8 @@ func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) {
func (c *RedisCache) Stop() {
_ = c.redis.Close()
}

// redis doesn't have a max item size. todo: add
func (c *RedisCache) MaxItemSize() int {
return 0
}
36 changes: 25 additions & 11 deletions pkg/util/test/cache.go
@@ -2,37 +2,51 @@ package test

import (
"context"
"sync"

"github.com/grafana/dskit/services"
"github.com/grafana/tempo/pkg/cache"
)

type mockClient struct {
client map[string][]byte
sync.Mutex
cache map[string][]byte
}

func (m *mockClient) Store(_ context.Context, key []string, val [][]byte) {
m.client[key[0]] = val[0]
func (m *mockClient) Store(_ context.Context, keys []string, bufs [][]byte) {
m.Lock()
defer m.Unlock()
for i := range keys {
m.cache[keys[i]] = bufs[i]
}
}

func (m *mockClient) Fetch(_ context.Context, key []string) (found []string, bufs [][]byte, missing []string) {
val, ok := m.client[key[0]]
if ok {
found = append(found, key[0])
bufs = append(bufs, val)
} else {
missing = append(missing, key[0])
func (m *mockClient) Fetch(_ context.Context, keys []string) (found []string, bufs [][]byte, missing []string) {
m.Lock()
defer m.Unlock()
for _, key := range keys {
buf, ok := m.cache[key]
if ok {
found = append(found, key)
bufs = append(bufs, buf)
} else {
missing = append(missing, key)
}
}
return
}

func (m *mockClient) MaxItemSize() int {
return 0
}

func (m *mockClient) Stop() {
}

// NewMockClient makes a new mockClient.
func NewMockClient() cache.Cache {
return &mockClient{
client: map[string][]byte{},
cache: map[string][]byte{},
}
}
