Skip to content

Commit

Permalink
Add search block headers for wal blocks (#963)
Browse files Browse the repository at this point in the history
* Add search block headers for wal blocks

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* changelog

Signed-off-by: Martin Disibio <mdisibio@gmail.com>
  • Loading branch information
mdisibio committed Sep 21, 2021
1 parent 4def3a0 commit af0f76f
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 85 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [ENHANCEMENT] Add status endpoint to list the available endpoints [#938](https://github.com/grafana/tempo/pull/938) (@zalegrala)
* [ENHANCEMENT] Compression updates: Added s2, improved snappy performance [#961](https://github.com/grafana/tempo/pull/961) (@joe-elliott)
* [ENHANCEMENT] Add search block headers [#943](https://github.com/grafana/tempo/pull/943) (@mdisibio)
* [ENHANCEMENT] Add search block headers for wal blocks [#963](https://github.com/grafana/tempo/pull/963) (@mdisibio)
* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394)
* [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover)
* [CHANGE] Modify generated tag keys in Vulture for easier filtering [#934](https://github.com/grafana/tempo/pull/934) (@zalegrala)
Expand Down
44 changes: 37 additions & 7 deletions pkg/tempofb/SearchBlockHeader_util.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
package tempofb

import flatbuffers "github.com/google/flatbuffers/go"
import (
"strings"

type SearchBlockHeaderBuilder struct {
flatbuffers "github.com/google/flatbuffers/go"
)

// Contains reports whether this block header has tag key k with a value
// matching v. It delegates to ContainsTag, handing it buffer as the reusable
// KeyValues scratch object.
func (s *SearchBlockHeader) Contains(k []byte, v []byte, buffer *KeyValues) bool {
return ContainsTag(s, buffer, k, v)
}

// SearchBlockHeaderMutable is a modifiable, in-memory block header. Tags are
// added through AddEntry/AddTag, and the header can be serialized with ToBytes.
type SearchBlockHeaderMutable struct {
Tags SearchDataMap // tag name -> values seen for that tag
MinDur uint64 // value returned by MinDurationNanos
MaxDur uint64 // value returned by MaxDurationNanos
}

func NewSearchBlockHeaderBuilder() *SearchBlockHeaderBuilder {
return &SearchBlockHeaderBuilder{
// NewSearchBlockHeaderMutable returns an empty header whose Tags map is
// already allocated, so tags can be added to it immediately.
func NewSearchBlockHeaderMutable() *SearchBlockHeaderMutable {
	header := new(SearchBlockHeaderMutable)
	header.Tags = make(SearchDataMap)
	return header
}

func (s *SearchBlockHeaderBuilder) AddEntry(e *SearchEntry) {
func (s *SearchBlockHeaderMutable) AddEntry(e *SearchEntry) {

kv := &KeyValues{} //buffer

Expand All @@ -37,11 +45,33 @@ func (s *SearchBlockHeaderBuilder) AddEntry(e *SearchEntry) {
}

// AddTag adds the unique tag name and value to the search data. No effect if the pair is already present.
func (s *SearchBlockHeaderBuilder) AddTag(k string, v string) {
// AddTag records value v under tag name k by forwarding to Tags.Add.
func (s *SearchBlockHeaderMutable) AddTag(k string, v string) {
s.Tags.Add(k, v)
}

func (s *SearchBlockHeaderBuilder) ToBytes() []byte {
// MinDurationNanos returns the header's MinDur field. Together with
// MaxDurationNanos and Contains it makes this type usable as a Block.
func (s *SearchBlockHeaderMutable) MinDurationNanos() uint64 {
return s.MinDur
}

// MaxDurationNanos returns the header's MaxDur field. Together with
// MinDurationNanos and Contains it makes this type usable as a Block.
func (s *SearchBlockHeaderMutable) MaxDurationNanos() uint64 {
return s.MaxDur
}

// Contains reports whether the header holds tag key k with at least one value
// that contains v as a substring. The key must match exactly; only values are
// matched by substring. The KeyValues buffer is ignored: it exists only so the
// signature matches the other Contains implementations in this package.
func (s *SearchBlockHeaderMutable) Contains(k []byte, v []byte, _ *KeyValues) bool {
	values := s.Tags[string(k)]
	if len(values) == 0 {
		// Unknown key, or a key with no recorded values.
		return false
	}

	// Convert once, outside the loop.
	want := string(v)
	for _, value := range values {
		if strings.Contains(value, want) {
			return true
		}
	}

	return false
}

func (s *SearchBlockHeaderMutable) ToBytes() []byte {
b := flatbuffers.NewBuilder(1024)

tags := s.Tags.WriteToBuilder(b)
Expand Down
5 changes: 5 additions & 0 deletions pkg/tempofb/SearchPage_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package tempofb

// Contains reports whether this page has tag key k with a value matching v.
// It delegates to ContainsTag, handing it buffer as the reusable KeyValues
// scratch object.
func (s *SearchPage) Contains(k []byte, v []byte, buffer *KeyValues) bool {
return ContainsTag(s, buffer, k, v)
}
51 changes: 8 additions & 43 deletions pkg/tempofb/searchdata_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,6 @@ import (
"github.com/grafana/tempo/tempodb/encoding/common"
)

// TagContainer is anything with KeyValues (tags). This is implemented by both
// SearchPage and SearchEntry.
type TagContainer interface {
Tags(obj *KeyValues, j int) bool
TagsLength() int
}

var _ TagContainer = (*SearchPage)(nil)
var _ TagContainer = (*SearchEntry)(nil)

type SearchDataMap map[string][]string

func (s SearchDataMap) Add(k, v string) {
Expand Down Expand Up @@ -219,45 +209,20 @@ func (s *SearchEntry) Get(k string) string {
// Buffer KeyValue object can be passed to reduce allocations. Key and value must be
// already converted to byte slices which match the nature of the flatbuffer data
// which reduces allocations even further.
func (s *SearchEntry) Contains(kv *KeyValues, k []byte, v []byte) bool {

matched := -1

// Binary search for keys. Flatbuffers are written backwards so
// keys are descending (the comparison is reversed).
// TODO - We only want exact matches, sort.Search has to make an
// extra comparison. We should fork it to make use of the full
// tri-state response from bytes.Compare
sort.Search(s.TagsLength(), func(i int) bool {
s.Tags(kv, i)
comparison := bytes.Compare(k, kv.Key())
if comparison == 0 {
matched = i
// TODO it'd be great to exit here and retain the data in kv buffer
}
return comparison >= 0
})

if matched >= 0 {
s.Tags(kv, matched)

// Linear search for matching values
l := kv.ValueLength()
for j := 0; j < l; j++ {
if bytes.Contains(kv.Value(j), v) {
return true
}
}
}

return false
// Contains reports whether this entry has tag key k with a value matching v.
// It delegates to ContainsTag, handing it buffer as the reusable KeyValues
// scratch object.
func (s *SearchEntry) Contains(k []byte, v []byte, buffer *KeyValues) bool {
return ContainsTag(s, buffer, k, v)
}

// SearchEntryFromBytes returns the SearchEntry obtained by calling
// GetRootAsSearchEntry on b with a second argument of 0.
func SearchEntryFromBytes(b []byte) *SearchEntry {
return GetRootAsSearchEntry(b, 0)
}

func ContainsTag(s TagContainer, kv *KeyValues, k []byte, v []byte) bool {
// FBTagContainer is the set of accessors ContainsTag requires from the value
// it searches. SearchEntry, SearchPage and SearchBlockHeader are all passed to
// ContainsTag by their Contains methods.
type FBTagContainer interface {
// Tags takes a KeyValues object and an index j.
// NOTE(review): presumably it loads the j-th tag into obj — confirm against the generated code.
Tags(obj *KeyValues, j int) bool
// TagsLength returns an int; presumably the number of tags — confirm.
TagsLength() int
}

func ContainsTag(s FBTagContainer, kv *KeyValues, k []byte, v []byte) bool {

matched := -1

Expand Down
30 changes: 30 additions & 0 deletions pkg/tempofb/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package tempofb

// TagContainer is anything that can be queried for a tag (key/value) match.
// In this package it is implemented by SearchEntry, SearchPage,
// SearchBlockHeader and SearchBlockHeaderMutable.
type TagContainer interface {
// Contains reports whether the container has tag key k with a value
// matching v. buffer is a reusable KeyValues scratch object; an
// implementation may ignore it (SearchBlockHeaderMutable does).
Contains(k []byte, v []byte, buffer *KeyValues) bool
}

// Trace is a TagContainer that also exposes a start and an end timestamp.
type Trace interface {
TagContainer
// StartTimeUnixNano returns the start timestamp as a uint64.
StartTimeUnixNano() uint64
// EndTimeUnixNano returns the end timestamp as a uint64.
EndTimeUnixNano() uint64
}

var _ Trace = (*SearchEntry)(nil)

// Page is a TagContainer with no additional methods.
type Page interface {
TagContainer
}

var _ Page = (*SearchPage)(nil)

// Block is a TagContainer that also exposes the minimum and maximum duration
// recorded for the block.
type Block interface {
TagContainer
// MinDurationNanos returns the block's minimum duration as a uint64.
MinDurationNanos() uint64
// MaxDurationNanos returns the block's maximum duration as a uint64.
MaxDurationNanos() uint64
}

var _ Block = (*SearchBlockHeader)(nil)
var _ Block = (*SearchBlockHeaderMutable)(nil)
2 changes: 1 addition & 1 deletion tempodb/search/backend_search_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI
pageSizeBytes = defaultBackendSearchBlockPageSize
}

header := tempofb.NewSearchBlockHeaderBuilder()
header := tempofb.NewSearchBlockHeaderMutable()

w, err := newBackendSearchBlockWriter(blockID, tenantID, l, version, enc)
if err != nil {
Expand Down
24 changes: 12 additions & 12 deletions tempodb/search/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

const SecretExhaustiveSearchTag = "x-dbg-exhaustive"

type tracefilter func(entry *tempofb.SearchEntry) (matches bool)
// tracefilter reports whether a single trace matches.
type tracefilter func(entry tempofb.Trace) (matches bool)
// tagfilter reports whether a tag container matches.
type tagfilter func(page tempofb.TagContainer) (matches bool)
type blockfilter func(header *tempofb.SearchBlockHeader) (matches bool)
// blockfilter reports whether a block matches, judged from its header.
type blockfilter func(header tempofb.Block) (matches bool)

type Pipeline struct {
blockfilters []blockfilter
Expand All @@ -26,13 +26,13 @@ func NewSearchPipeline(req *tempopb.SearchRequest) Pipeline {
if req.MinDurationMs > 0 {
minDurationNanos := uint64(time.Duration(req.MinDurationMs) * time.Millisecond)

p.tracefilters = append(p.tracefilters, func(s *tempofb.SearchEntry) bool {
p.tracefilters = append(p.tracefilters, func(s tempofb.Trace) bool {
et := s.EndTimeUnixNano()
st := s.StartTimeUnixNano()
return (et - st) >= minDurationNanos
})

p.blockfilters = append(p.blockfilters, func(s *tempofb.SearchBlockHeader) bool {
p.blockfilters = append(p.blockfilters, func(s tempofb.Block) bool {
max := s.MaxDurationNanos()
return max >= minDurationNanos
})
Expand All @@ -41,13 +41,13 @@ func NewSearchPipeline(req *tempopb.SearchRequest) Pipeline {
if req.MaxDurationMs > 0 {
maxDurationNanos := uint64(time.Duration(req.MaxDurationMs) * time.Millisecond)

p.tracefilters = append(p.tracefilters, func(s *tempofb.SearchEntry) bool {
p.tracefilters = append(p.tracefilters, func(s tempofb.Trace) bool {
et := s.EndTimeUnixNano()
st := s.StartTimeUnixNano()
return (et - st) <= maxDurationNanos
})

p.blockfilters = append(p.blockfilters, func(s *tempofb.SearchBlockHeader) bool {
p.blockfilters = append(p.blockfilters, func(s tempofb.Block) bool {
min := s.MinDurationNanos()
return min <= maxDurationNanos
})
Expand All @@ -64,7 +64,7 @@ func NewSearchPipeline(req *tempopb.SearchRequest) Pipeline {
// * no block or page filters means all blocks and pages match
// * substitute a trace filter that rejects everything, so the search never
// quits early due to enough results
p.tracefilters = append(p.tracefilters, func(s *tempofb.SearchEntry) bool {
p.tracefilters = append(p.tracefilters, func(s tempofb.Trace) bool {
return false
})
continue
Expand All @@ -75,12 +75,12 @@ func NewSearchPipeline(req *tempopb.SearchRequest) Pipeline {
}

p.tagfilters = append(p.tagfilters, func(s tempofb.TagContainer) bool {
// Buffer is allocated here so function is thread-safe
// Buffer is allocated here so pipeline can be used concurrently.
buffer := &tempofb.KeyValues{}

// Must match all
for i := range kb {
if !tempofb.ContainsTag(s, buffer, kb[i], vb[i]) {
if !s.Contains(kb[i], vb[i], buffer) {
return false
}
}
Expand All @@ -91,7 +91,7 @@ func NewSearchPipeline(req *tempopb.SearchRequest) Pipeline {
return p
}

func (p *Pipeline) Matches(e *tempofb.SearchEntry) bool {
func (p *Pipeline) Matches(e tempofb.Trace) bool {

for _, f := range p.tracefilters {
if !f(e) {
Expand All @@ -109,7 +109,7 @@ func (p *Pipeline) Matches(e *tempofb.SearchEntry) bool {
}

// nolint:interfacer
func (p *Pipeline) MatchesPage(pg *tempofb.SearchPage) bool {
func (p *Pipeline) MatchesPage(pg tempofb.Page) bool {
for _, f := range p.tagfilters {
if !f(pg) {
return false
Expand All @@ -119,7 +119,7 @@ func (p *Pipeline) MatchesPage(pg *tempofb.SearchPage) bool {
return true
}

func (p *Pipeline) MatchesBlock(block *tempofb.SearchBlockHeader) bool {
func (p *Pipeline) MatchesBlock(block tempofb.Block) bool {
for _, f := range p.blockfilters {
if !f(block) {
return false
Expand Down
2 changes: 1 addition & 1 deletion tempodb/search/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestPipelineMatchesTraceDuration(t *testing.T) {
func TestPipelineMatchesBlock(t *testing.T) {

// Run all tests against this header
commonBlock := tempofb.NewSearchBlockHeaderBuilder()
commonBlock := tempofb.NewSearchBlockHeaderMutable()
commonBlock.AddTag("tag", "value")
commonBlock.MinDur = uint64(1 * time.Second)
commonBlock.MaxDur = uint64(10 * time.Second)
Expand Down
16 changes: 15 additions & 1 deletion tempodb/search/streaming_search_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type StreamingSearchBlock struct {
appender encoding.Appender
file *os.File
bytesWritten int
header *tempofb.SearchBlockHeaderMutable
}

// Clear deletes the files for this block.
Expand Down Expand Up @@ -56,7 +57,8 @@ func (s *StreamingSearchBlock) Write(id common.ID, obj []byte) (int, error) {
// File must be opened for read/write permissions.
func NewStreamingSearchBlockForFile(f *os.File) (*StreamingSearchBlock, error) {
s := &StreamingSearchBlock{
file: f,
file: f,
header: tempofb.NewSearchBlockHeaderMutable(),
}

// Entries are not paged, use non paged appender.
Expand All @@ -70,12 +72,24 @@ func NewStreamingSearchBlockForFile(f *os.File) (*StreamingSearchBlock, error) {
// the same trace can be passed and are merged into one entry.
func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchData [][]byte) error {
	// Collapse all search-data slices for this trace into one entry. The
	// second result of Combine is intentionally discarded.
	merged, _ := staticCombiner.Combine("", searchData...)
	if len(merged) == 0 {
		// Nothing to record for this trace.
		return nil
	}

	// Fold the entry into the in-memory header before writing it out; Search
	// consults the header through Pipeline.MatchesBlock to skip the block.
	// NOTE(review): s.header is written here and read in Search with no
	// synchronization visible in this view — confirm callers serialize access.
	entry := tempofb.SearchEntryFromBytes(merged)
	s.header.AddEntry(entry)

	return s.appender.Append(id, merged)
}

// Search the streaming block.
func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results) error {

if !p.MatchesBlock(s.header) {
sr.AddBlockSkipped()
return nil
}

var buf []byte

sr.AddBlockInspected()
Expand Down
Loading

0 comments on commit af0f76f

Please sign in to comment.