Dedupe search records while replaying WAL (#940)
* Checkpoint: Add first version of search data combiner

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Make combine function variadic

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Remove unused code, reuse buffer in SearchDataReader

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Add Search data iterator

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Guard code, todo comment

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* CHANGELOG

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* fix test

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Reuse buffer

Signed-off-by: Annanay <annanayagarwal@gmail.com>
annanay25 committed Sep 10, 2021
1 parent 6221788 commit ce0b23a
Showing 15 changed files with 185 additions and 80 deletions.
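
The core change running through most of these files is the `ObjectCombiner` interface moving from a fixed two-object signature to a variadic one (see `tempodb/encoding/common/types.go` below); roughly:

```go
// before this commit
Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool)

// after this commit
Combine(dataEncoding string, objs ...[]byte) ([]byte, bool)
```

In Go a variadic parameter must come last, which is why `dataEncoding` moves to the front of the signature; existing two-object call sites only need their argument order swapped.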
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -33,6 +33,7 @@
* [ENHANCEMENT] Include additional detail when searching for traces [#916](https://github.com/grafana/tempo/pull/916) (@zalegrala)
* [ENHANCEMENT] Add `gen index` and `gen bloom` commands to tempo-cli. [#903](https://github.com/grafana/tempo/pull/903) (@annanay25)
* [ENHANCEMENT] Implement trace comparison in Vulture [#904](https://github.com/grafana/tempo/pull/904) (@zalegrala)
* [ENHANCEMENT] Dedupe search records while replaying WAL [#940](https://github.com/grafana/tempo/pull/940) (@annanay25)
* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394)
* [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover)
* [CHANGE] Modify generated tag keys in Vulture for easier filtering [#934](https://github.com/grafana/tempo/pull/934) (@zalegrala)
4 changes: 2 additions & 2 deletions modules/compactor/compactor.go
@@ -157,8 +157,8 @@ func (c *Compactor) Owns(hash string) bool {
}

// Combine implements common.ObjectCombiner
func (c *Compactor) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) {
return model.ObjectCombiner.Combine(objA, objB, dataEncoding)
func (c *Compactor) Combine(dataEncoding string, objs ...[]byte) ([]byte, bool) {
return model.ObjectCombiner.Combine(dataEncoding, objs...)
}

// BlockRetentionForTenant implements CompactorOverrides
7 changes: 5 additions & 2 deletions modules/querier/querier_test.go
@@ -35,8 +35,11 @@ func (m *mockSharder) Owns(hash string) bool {
return true
}

func (m *mockSharder) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) {
combined, wasCombined, _ := model.CombineTraceBytes(objA, objB, dataEncoding, dataEncoding)
func (m *mockSharder) Combine(dataEncoding string, objs ...[]byte) ([]byte, bool) {
if len(objs) != 2 {
return nil, false
}
combined, wasCombined, _ := model.CombineTraceBytes(objs[0], objs[1], dataEncoding, dataEncoding)
return combined, wasCombined
}

29 changes: 25 additions & 4 deletions pkg/model/combine.go
@@ -7,6 +7,8 @@ import (
"hash"
"hash/fnv"

"github.com/grafana/tempo/tempodb/encoding/common"

"github.com/go-kit/kit/log/level"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/pkg/errors"
@@ -18,12 +20,31 @@ type objectCombiner struct{}

var ObjectCombiner = objectCombiner{}

var _ common.ObjectCombiner = (*objectCombiner)(nil)

// Combine implements tempodb/encoding/common.ObjectCombiner
func (o objectCombiner) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) {
combinedTrace, wasCombined, err := CombineTraceBytes(objA, objB, dataEncoding, dataEncoding)
if err != nil {
level.Error(log.Logger).Log("msg", "error combining trace protos", "err", err.Error())
func (o objectCombiner) Combine(dataEncoding string, objs ...[]byte) ([]byte, bool) {
if len(objs) <= 0 {
return nil, false
}

if len(objs) == 1 {
return objs[0], false
}

combinedTrace := objs[0]
var wasCombined bool
var err error
for _, obj := range objs[1:] {
// Todo: Find an efficient way to combine all objs in a single step
// However, this is ok for now because Combine() is never called with len(objs) > 2
combinedTrace, wasCombined, err = CombineTraceBytes(combinedTrace, obj, dataEncoding, dataEncoding)
if err != nil {
level.Error(log.Logger).Log("msg", "error combining trace protos", "err", err.Error())
break
}
}

return combinedTrace, wasCombined
}

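
Not part of this commit, but a minimal test sketch of the new guard clauses in `objectCombiner.Combine` (testify assumed, as used elsewhere in the repo) could look like the following. With two or more objects the function folds them pairwise through `CombineTraceBytes`, as the TODO above notes.

```go
package model_test

import (
	"testing"

	"github.com/grafana/tempo/pkg/model"
	"github.com/stretchr/testify/require"
)

// Sketch only: exercises the guard clauses added in this commit.
func TestCombineVariadicEdgeCases(t *testing.T) {
	// No objects: nothing to combine.
	obj, wasCombined := model.ObjectCombiner.Combine("")
	require.Nil(t, obj)
	require.False(t, wasCombined)

	// A single object is returned as-is, with wasCombined = false.
	single := []byte{0x01, 0x02}
	obj, wasCombined = model.ObjectCombiner.Combine("", single)
	require.Equal(t, single, obj)
	require.False(t, wasCombined)
}
```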
4 changes: 2 additions & 2 deletions tempodb/compactor.go
@@ -314,8 +314,8 @@ type instrumentedObjectCombiner struct {
}

// Combine wraps the inner combiner with combined metrics
func (i instrumentedObjectCombiner) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) {
b, wasCombined := i.inner.Combine(objA, objB, dataEncoding)
func (i instrumentedObjectCombiner) Combine(dataEncoding string, objs ...[]byte) ([]byte, bool) {
b, wasCombined := i.inner.Combine(dataEncoding, objs...)
if wasCombined {
metricCompactionObjectsCombined.WithLabelValues(i.compactionLevelLabel).Inc()
}
18 changes: 11 additions & 7 deletions tempodb/compactor_test.go
@@ -32,18 +32,22 @@ func (m *mockSharder) Owns(hash string) bool {
return true
}

type mockJobSharder struct{}

func (m *mockJobSharder) Owns(_ string) bool { return true }
func (m *mockSharder) Combine(dataEncoding string, objs ...[]byte) ([]byte, bool) {
if len(objs) != 2 {
return nil, false
}

func (m *mockSharder) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) {
if len(objA) > len(objB) {
return objA, true
if len(objs[0]) > len(objs[1]) {
return objs[0], true
}

return objB, true
return objs[1], true
}

type mockJobSharder struct{}

func (m *mockJobSharder) Owns(_ string) bool { return true }

type mockOverrides struct {
blockRetention time.Duration
}
4 changes: 2 additions & 2 deletions tempodb/encoding/common/types.go
@@ -24,10 +24,10 @@ type Record struct {

// ObjectCombiner is used to combine two objects in the backend
type ObjectCombiner interface {
// Combine objA and objB encoded using dataEncoding. The returned object must
// Combine objects encoded using dataEncoding. The returned object must
// use the same dataEncoding. Returns a bool indicating if the objects required combining and
// the combined slice
Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool)
Combine(dataEncoding string, objs ...[]byte) ([]byte, bool)
}

// DataReader returns a slice of pages in the encoding/v0 format referenced by
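
For anyone implementing the updated interface outside this diff, a toy combiner that simply keeps the longest object (a hypothetical example, mirroring the mockSharder in compactor_test.go) would now look like:

```go
package example

import "github.com/grafana/tempo/tempodb/encoding/common"

// longestCombiner is a hypothetical combiner used only to illustrate the
// variadic contract; it is not part of the repository.
type longestCombiner struct{}

var _ common.ObjectCombiner = longestCombiner{}

func (longestCombiner) Combine(_ string, objs ...[]byte) ([]byte, bool) {
	if len(objs) == 0 {
		return nil, false
	}
	longest := objs[0]
	for _, o := range objs[1:] {
		if len(o) > len(longest) {
			longest = o
		}
	}
	// Report combining only when more than one object was supplied.
	return longest, len(objs) > 1
}
```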
2 changes: 1 addition & 1 deletion tempodb/encoding/finder_paged.go
@@ -57,7 +57,7 @@ func (f *pagedFinder) Find(ctx context.Context, id common.ID) ([]byte, error) {
break
}

bytesFound, _ = f.combiner.Combine(bytesFound, bytesOne, f.dataEncoding)
bytesFound, _ = f.combiner.Combine(f.dataEncoding, bytesFound, bytesOne)

// we need to check the next record to see if it also matches our id
i++
2 changes: 1 addition & 1 deletion tempodb/encoding/iterator_deduping.go
@@ -58,7 +58,7 @@ func (i *dedupingIterator) Next(ctx context.Context) (common.ID, []byte, error)
}

i.currentID = id
i.currentObject, _ = i.combiner.Combine(i.currentObject, obj, i.dataEncoding)
i.currentObject, _ = i.combiner.Combine(i.dataEncoding, i.currentObject, obj)
}

return dedupedID, dedupedObject, nil
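
For context, the deduping pattern this combiner call plugs into looks roughly like the standalone sketch below (the `entry` type and `dedupe` function are illustrative, not repo code): consecutive entries sharing an ID are folded together, so duplicates are only merged when the input is already grouped by ID.

```go
package sketch

import "bytes"

// entry stands in for whatever an iterator yields: an ID plus its object.
type entry struct {
	id  []byte
	obj []byte
}

// dedupe folds consecutive entries that share an ID, approximating what
// dedupingIterator does once a combiner is supplied.
func dedupe(in []entry, combine func(objs ...[]byte) ([]byte, bool)) []entry {
	out := make([]entry, 0, len(in))
	for i := 0; i < len(in); {
		id, obj := in[i].id, in[i].obj
		j := i + 1
		for ; j < len(in) && bytes.Equal(in[j].id, id); j++ {
			obj, _ = combine(obj, in[j].obj)
		}
		out = append(out, entry{id: id, obj: obj})
		i = j
	}
	return out
}
```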
2 changes: 1 addition & 1 deletion tempodb/encoding/iterator_multiblock.go
@@ -110,7 +110,7 @@ func (i *multiblockIterator) iterate(ctx context.Context) {
comparison := bytes.Compare(currentID, lowestID)

if comparison == 0 {
lowestObject, _ = i.combiner.Combine(currentObject, lowestObject, i.dataEncoding)
lowestObject, _ = i.combiner.Combine(i.dataEncoding, currentObject, lowestObject)
b.clear()
} else if len(lowestID) == 0 || comparison == -1 {
lowestID = currentID
117 changes: 105 additions & 12 deletions tempodb/search/backend_search_block.go
@@ -3,8 +3,11 @@ package search
import (
"bytes"
"context"
"io"
"os"

"github.com/google/uuid"
"github.com/pkg/errors"

"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/tempodb/backend"
@@ -23,6 +26,78 @@ type BackendSearchBlock struct {
l *local.Backend
}

//nolint:golint
type SearchDataCombiner struct{}

func (*SearchDataCombiner) Combine(_ string, searchData ...[]byte) ([]byte, bool) {

if len(searchData) <= 0 {
return nil, false
}

if len(searchData) == 1 {
return searchData[0], false
}

// Squash all datas into 1
data := tempofb.SearchEntryMutable{}
kv := &tempofb.KeyValues{} // buffer
for _, sb := range searchData {
sd := tempofb.SearchEntryFromBytes(sb)
for i := 0; i < sd.TagsLength(); i++ {
sd.Tags(kv, i)
for j := 0; j < kv.ValueLength(); j++ {
data.AddTag(string(kv.Key()), string(kv.Value(j)))
}
}
data.SetStartTimeUnixNano(sd.StartTimeUnixNano())
data.SetEndTimeUnixNano(sd.EndTimeUnixNano())
data.TraceID = sd.Id()
}

return data.ToBytes(), true
}

var _ common.ObjectCombiner = (*SearchDataCombiner)(nil)

//nolint:golint
type SearchDataIterator struct {
currentIndex int
records []common.Record
file *os.File

buffer []byte
}

func (s *SearchDataIterator) Next(_ context.Context) (common.ID, []byte, error) {
if s.currentIndex >= len(s.records) {
return nil, nil, io.EOF
}

currentRecord := s.records[s.currentIndex]

// resize/extend buffer
if cap(s.buffer) < int(currentRecord.Length) {
s.buffer = make([]byte, currentRecord.Length)
}
s.buffer = s.buffer[:currentRecord.Length]

_, err := s.file.ReadAt(s.buffer, int64(currentRecord.Start))
if err != nil {
return nil, nil, errors.Wrap(err, "error reading search file")
}

s.currentIndex++

return currentRecord.ID, s.buffer, nil
}

func (*SearchDataIterator) Close() {
// file will be closed by StreamingSearchBlock
}

var _ encoding.Iterator = (*SearchDataIterator)(nil)

// NewBackendSearchBlock iterates through the given WAL search data and writes it to the persistent backend
// in a more efficient paged form. Multiple traces are written in the same page to make sure of the flatbuffer
// CreateSharedString feature which dedupes strings across the entire buffer.
@@ -42,25 +117,43 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI
pageSizeBytes = defaultBackendSearchBlockPageSize
}

// Copy records into the appender
w, err := newBackendSearchBlockWriter(blockID, tenantID, l, version, enc)
if err != nil {
return err
}

// set up deduping iterator for streaming search block
combiner := &SearchDataCombiner{}
searchIterator := &SearchDataIterator{
records: input.appender.Records(),
file: input.file,
}
iter, err := encoding.NewDedupingIterator(searchIterator, combiner, "")
if err != nil {
return errors.Wrap(err, "error creating deduping iterator")
}
a := encoding.NewBufferedAppenderGeneric(w, pageSizeBytes)
for _, r := range input.appender.Records() {

// Copy records into the appender
for {

// Read
buf := make([]byte, r.Length)
_, err = input.file.ReadAt(buf, int64(r.Start))
if err != nil {
return err
id, data, err := iter.Next(ctx)
if err != nil && err != io.EOF {
return errors.Wrap(err, "error iterating")
}

if id == nil {
break
}

if len(data) == 0 {
continue
}

s := tempofb.SearchEntryFromBytes(buf)
data := &tempofb.SearchEntryMutable{
TraceID: r.ID,
s := tempofb.SearchEntryFromBytes(data)
entry := &tempofb.SearchEntryMutable{
TraceID: id,
StartTimeUnixNano: s.StartTimeUnixNano(),
EndTimeUnixNano: s.EndTimeUnixNano(),
}
Expand All @@ -69,13 +162,13 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI
for i := 0; i < l; i++ {
s.Tags(kv, i)
for j := 0; j < kv.ValueLength(); j++ {
data.AddTag(string(kv.Key()), string(kv.Value(j)))
entry.AddTag(string(kv.Key()), string(kv.Value(j)))
}
}

err = a.Append(ctx, r.ID, data)
err = a.Append(ctx, id, entry)
if err != nil {
return err
return errors.Wrap(err, "error appending to backend block")
}
}

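
A small usage sketch (not part of the commit; the tag names are made up) of how `SearchDataCombiner` merges two WAL search entries for the same trace:

```go
package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/tempofb"
	"github.com/grafana/tempo/tempodb/search"
)

func main() {
	traceID := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}

	// Two WAL search entries for the same trace, e.g. written by two flushes.
	// Tag keys and values here are illustrative only.
	first := (&tempofb.SearchEntryMutable{
		TraceID: traceID,
		Tags:    tempofb.SearchDataMap{"service.name": {"cart"}},
	}).ToBytes()
	second := (&tempofb.SearchEntryMutable{
		TraceID: traceID,
		Tags:    tempofb.SearchDataMap{"http.status_code": {"500"}},
	}).ToBytes()

	combiner := &search.SearchDataCombiner{}
	merged, wasCombined := combiner.Combine("", first, second)

	// wasCombined is true; merged carries the union of both tag sets.
	fmt.Println(wasCombined, len(merged))
}
```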
27 changes: 14 additions & 13 deletions tempodb/search/backend_search_block_test.go
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path"
"strconv"
"sync"
"testing"
"time"
@@ -18,23 +19,24 @@ import (
"github.com/stretchr/testify/require"
)

func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.Encoding, pageSizeBytes int) *BackendSearchBlock {
id := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} // 16-byte ids required
searchData := [][]byte{(&tempofb.SearchEntryMutable{
func genSearchData(traceID []byte, i int) [][]byte {
return [][]byte{(&tempofb.SearchEntryMutable{
TraceID: traceID,
Tags: tempofb.SearchDataMap{
"key1": {"value10", "value11"},
"key2": {"value20", "value21"},
"key3": {"value30", "value31"},
"key4": {"value40", "value41"},
"key" + strconv.Itoa(i): {"value_A_" + strconv.Itoa(i), "value_B_" + strconv.Itoa(i)},
}}).ToBytes()}
}

func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.Encoding, pageSizeBytes int) *BackendSearchBlock {
id := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} // 16-byte ids required

f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644)
require.NoError(t, err)

b1, err := NewStreamingSearchBlockForFile(f)
require.NoError(t, err)
for i := 0; i < traceCount; i++ {
assert.NoError(t, b1.Append(context.Background(), id, searchData))
assert.NoError(t, b1.Append(context.Background(), id, genSearchData(id, i)))
}

l, err := local.NewBackend(&local.Config{
@@ -52,13 +54,12 @@ }
}

func TestBackendSearchBlockSearch(t *testing.T) {
traceCount := 50_000
traceCount := 1_000

b2 := newBackendSearchBlockWithTraces(t, traceCount, backend.EncNone, 0)

// Matches every trace
p := NewSearchPipeline(&tempopb.SearchRequest{
Tags: map[string]string{"key1": "value10"},
Tags: map[string]string{"key1": "value_A_1", "key20": "value_B_20"},
})

sr := NewResults()
@@ -75,8 +76,8 @@ func TestBackendSearchBlockSearch(t *testing.T) {
for r := range sr.Results() {
results = append(results, r)
}
require.Equal(t, traceCount, len(results))
require.Equal(t, traceCount, int(sr.TracesInspected()))
require.Equal(t, 1, len(results))
require.Equal(t, 1, int(sr.TracesInspected()))
}

func BenchmarkBackendSearchBlockSearch(b *testing.B) {
2 changes: 1 addition & 1 deletion tempodb/search/backend_search_block_writer.go
@@ -12,7 +12,7 @@ import (
)

// backendSearchBlockWriter is a DataWriter for search data. Instead of receiving bytes slices, it
// receives search data objects and combintes them into a single FlatBuffer Builder and
// receives search data objects and combines them into a single FlatBuffer Builder and
// flushes periodically, one page per flush.
type backendSearchBlockWriter struct {
// input