Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GetMetrics use second pass #2765

Merged
merged 11 commits into from
Aug 9, 2023
35 changes: 19 additions & 16 deletions pkg/parquetquery/iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,12 +471,15 @@ var columnIteratorResultPool = sync.Pool{
},
}

func columnIteratorResultPoolGet() *IteratorResult {
// GetResult returns a buffer struct from the internal memory pool. It should
// be released by calling ReleaseResult() when done.
func GetResult() *IteratorResult {
res := columnIteratorResultPool.Get().(*IteratorResult)
return res
}

func columnIteratorResultPoolPut(r *IteratorResult) {
// ReleaseResult returns the buffer struct back to the internal memory pool.
func ReleaseResult(r *IteratorResult) {
if r != nil {
r.Reset()
columnIteratorResultPool.Put(r)
Expand Down Expand Up @@ -837,7 +840,7 @@ func (c *SyncIterator) closeCurrRowGroup() {
}

func (c *SyncIterator) makeResult(t RowNumber, v *pq.Value) *IteratorResult {
r := columnIteratorResultPoolGet()
r := GetResult()
r.RowNumber = t
if c.selectAs != "" {
r.AppendValue(c.selectAs, v.Clone())
Expand Down Expand Up @@ -1152,7 +1155,7 @@ func (c *ColumnIterator) SeekTo(to RowNumber, d int) (*IteratorResult, error) {
}

func (c *ColumnIterator) makeResult(t RowNumber, v pq.Value) *IteratorResult {
r := columnIteratorResultPoolGet()
r := GetResult()
r.RowNumber = t
if c.selectAs != "" {
r.AppendValue(c.selectAs, v)
Expand Down Expand Up @@ -1262,7 +1265,7 @@ func (j *JoinIterator) Next() (*IteratorResult, error) {
}

// Result discarded
columnIteratorResultPoolPut(result)
ReleaseResult(result)
}

// Skip all iterators to the highest row seen, it's impossible
Expand All @@ -1287,7 +1290,7 @@ func (j *JoinIterator) seekAll(t RowNumber, d int) error {
t = TruncateRowNumber(d, t)
for iterNum, iter := range j.iters {
if j.peeks[iterNum] == nil || CompareRowNumbers(d, j.peeks[iterNum].RowNumber, t) == -1 {
columnIteratorResultPoolPut(j.peeks[iterNum])
ReleaseResult(j.peeks[iterNum])
j.peeks[iterNum], err = iter.SeekTo(t, d)
if err != nil {
return err
Expand All @@ -1314,15 +1317,15 @@ func (j *JoinIterator) peek(iterNum int) (*IteratorResult, error) {
func (j *JoinIterator) collect(rowNumber RowNumber) (*IteratorResult, error) {
var err error

result := columnIteratorResultPoolGet()
result := GetResult()
result.RowNumber = rowNumber

for i := range j.iters {
for j.peeks[i] != nil && CompareRowNumbers(j.definitionLevel, j.peeks[i].RowNumber, rowNumber) == 0 {

result.Append(j.peeks[i])

columnIteratorResultPoolPut(j.peeks[i])
ReleaseResult(j.peeks[i])

j.peeks[i], err = j.iters[i].Next()
if err != nil {
Expand Down Expand Up @@ -1436,7 +1439,7 @@ func (j *LeftJoinIterator) Next() (*IteratorResult, error) {
}

// Result discarded
columnIteratorResultPoolPut(result)
ReleaseResult(result)
}

// Skip all iterators to the highest row seen, it's impossible
Expand All @@ -1461,7 +1464,7 @@ func (j *LeftJoinIterator) seekAll(t RowNumber, d int) (err error) {
t = TruncateRowNumber(d, t)
for iterNum, iter := range j.required {
if j.peeksRequired[iterNum] == nil || CompareRowNumbers(d, j.peeksRequired[iterNum].RowNumber, t) == -1 {
columnIteratorResultPoolPut(j.peeksRequired[iterNum])
ReleaseResult(j.peeksRequired[iterNum])
j.peeksRequired[iterNum], err = iter.SeekTo(t, d)
if err != nil {
return
Expand All @@ -1470,7 +1473,7 @@ func (j *LeftJoinIterator) seekAll(t RowNumber, d int) (err error) {
}
for iterNum, iter := range j.optional {
if j.peeksOptional[iterNum] == nil || CompareRowNumbers(d, j.peeksOptional[iterNum].RowNumber, t) == -1 {
columnIteratorResultPoolPut(j.peeksOptional[iterNum])
ReleaseResult(j.peeksOptional[iterNum])
j.peeksOptional[iterNum], err = iter.SeekTo(t, d)
if err != nil {
return
Expand All @@ -1496,15 +1499,15 @@ func (j *LeftJoinIterator) peek(iterNum int) (*IteratorResult, error) {
// or are exhausted.
func (j *LeftJoinIterator) collect(rowNumber RowNumber) (*IteratorResult, error) {
var err error
result := columnIteratorResultPoolGet()
result := GetResult()
result.RowNumber = rowNumber

collect := func(iters []Iterator, peeks []*IteratorResult) {
for i := range iters {
// Collect matches
for peeks[i] != nil && CompareRowNumbers(j.definitionLevel, peeks[i].RowNumber, rowNumber) == 0 {
result.Append(peeks[i])
columnIteratorResultPoolPut(peeks[i])
ReleaseResult(peeks[i])
peeks[i], err = iters[i].Next()
if err != nil {
return
Expand Down Expand Up @@ -1616,7 +1619,7 @@ func (u *UnionIterator) Next() (*IteratorResult, error) {
// from at least one iterator, or all are exhausted
if len(u.lowestIters) > 0 {
if u.pred != nil && !u.pred.KeepGroup(result) {
columnIteratorResultPoolPut(result)
ReleaseResult(result)
continue
}

Expand Down Expand Up @@ -1659,15 +1662,15 @@ func (u *UnionIterator) peek(iterNum int) (*IteratorResult, error) {
func (u *UnionIterator) collect(iterNums []int, rowNumber RowNumber) (*IteratorResult, error) {
var err error

result := columnIteratorResultPoolGet()
result := GetResult()
result.RowNumber = rowNumber

for _, iterNum := range iterNums {
for u.peeks[iterNum] != nil && CompareRowNumbers(u.definitionLevel, u.peeks[iterNum].RowNumber, rowNumber) == 0 {

result.Append(u.peeks[iterNum])

columnIteratorResultPoolPut(u.peeks[iterNum])
ReleaseResult(u.peeks[iterNum])

u.peeks[iterNum], err = u.iters[iterNum].Next()
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/traceql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,26 @@ type Spanset struct {
StartTimeUnixNanos uint64
DurationNanos uint64
Attributes []*SpansetAttribute

// Set this function to provide upstream callers with a method to
// release this spanset and all its spans when finished. The function is
// called with the spanset itself as the argument, which yields a worthwhile
// memory saving because the same function pointer can then be reused across
// spansets.
ReleaseFn func(*Spanset)
}

// AddAttribute appends a new SpansetAttribute with the given name and value
// to the end of the spanset's Attributes slice.
func (s *Spanset) AddAttribute(key string, value Static) {
	attr := &SpansetAttribute{
		Name: key,
		Val:  value,
	}
	s.Attributes = append(s.Attributes, attr)
}

// Release the spanset and all its spans. This is just a wrapper of ReleaseFn
// that performs nil checks: it is a no-op when the receiver is nil or when no
// ReleaseFn has been set. Otherwise ReleaseFn is called with the spanset
// itself as the argument.
func (s *Spanset) Release() {
	// Guard the receiver too: reading s.ReleaseFn through a nil pointer
	// would panic.
	if s == nil || s.ReleaseFn == nil {
		return
	}
	s.ReleaseFn(s)
}

func (s *Spanset) clone() *Spanset {
ss := *s
return &ss
Expand Down
74 changes: 32 additions & 42 deletions pkg/traceqlmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,63 +229,24 @@ func GetMetrics(ctx context.Context, query string, groupBy string, spanLimit int
}

// Ensure that we select the span duration, status, and group-by attributes
// if they are not already included in the query.
// in the second pass if they are not already part of the first pass.
addConditionIfNotPresent := func(a traceql.Attribute) {
for _, c := range req.Conditions {
if c.Attribute == a {
return
}
}

req.Conditions = append(req.Conditions, traceql.Condition{Attribute: a})
req.SecondPassConditions = append(req.SecondPassConditions, traceql.Condition{Attribute: a})
}
addConditionIfNotPresent(status)
addConditionIfNotPresent(duration)
for _, g := range groupBys {
addConditionIfNotPresent(g[0])
}

// Read the spans in the second pass callback and return nil to discard them.
// We do this because it lets the fetch layer repool the spans because it
// knows we discarded them.
// TODO - Add span.Release() or something that we could use in the loop
// at the bottom to repool the spans?
req.SecondPass = func(s *traceql.Spanset) ([]*traceql.Spanset, error) {
out, err := eval([]*traceql.Spanset{s})
if err != nil {
return nil, err
}

for _, ss := range out {
for _, s := range ss.Spans {

if start > 0 && s.StartTimeUnixNanos() < start {
continue
}
if end > 0 && s.StartTimeUnixNanos() >= end {
continue
}

var (
attrs = s.Attributes()
series = MetricSeries{}
err = attrs[status] == statusErr
)

for i, g := range groupBys {
series[i] = KeyValue{Key: groupByKeys[i], Value: lookup(g, attrs)}
}

results.Record(series, s.DurationNanos(), err)

spanCount++
if spanLimit > 0 && spanCount >= spanLimit {
return nil, io.EOF
}
}
}

return nil, err
return eval([]*traceql.Spanset{s})
}

// Perform the fetch and process the results inside the SecondPass
Expand All @@ -309,6 +270,35 @@ func GetMetrics(ctx context.Context, query string, groupBy string, spanLimit int
if ss == nil {
break
}

for _, s := range ss.Spans {

if start > 0 && s.StartTimeUnixNanos() < start {
continue
}
if end > 0 && s.StartTimeUnixNanos() >= end {
continue
}

var (
attrs = s.Attributes()
series = MetricSeries{}
err = attrs[status] == statusErr
)

for i, g := range groupBys {
series[i] = KeyValue{Key: groupByKeys[i], Value: lookup(g, attrs)}
}

results.Record(series, s.DurationNanos(), err)

spanCount++
if spanLimit > 0 && spanCount >= spanLimit {
return nil, io.EOF
}
}

ss.Release()
}

// The results are estimated if we bailed early due to limit being reached, but only if spanLimit has been set.
Expand Down
38 changes: 30 additions & 8 deletions tempodb/encoding/vparquet2/block_traceql.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,17 @@ func getSpan() *span {
return spanPool.Get().(*span)
}

var spansetPool = sync.Pool{
New: func() interface{} {
return &traceql.Spanset{}
},
}
var spansetPool = sync.Pool{}

func getSpanset() *traceql.Spanset {
return spansetPool.Get().(*traceql.Spanset)
ss := spansetPool.Get()
if ss == nil {
return &traceql.Spanset{
ReleaseFn: putSpansetAndSpans,
}
}

return ss.(*traceql.Spanset)
}

// putSpanset back into the pool. Does not repool the spans.
Expand All @@ -182,6 +185,17 @@ func putSpanset(ss *traceql.Spanset) {
spansetPool.Put(ss)
}

// putSpansetAndSpans passes every element of ss.Spans whose concrete type is
// *span to putSpan, and then hands the spanset itself to putSpanset. A nil
// spanset is ignored.
func putSpansetAndSpans(ss *traceql.Spanset) {
	if ss == nil {
		return
	}

	spans := ss.Spans
	for i := range spans {
		sp, ok := spans[i].(*span)
		if !ok {
			// Elements of any other concrete type are left untouched.
			continue
		}
		putSpan(sp)
	}

	putSpanset(ss)
}

// Helper function to create an iterator, that abstracts away
// context like file and rowgroups.
type makeIterFn func(columnName string, predicate parquetquery.Predicate, selectAs string) parquetquery.Iterator
Expand Down Expand Up @@ -428,6 +442,8 @@ func (i *bridgeIterator) Next() (*parquetquery.IteratorResult, error) {
}
}

parquetquery.ReleaseResult(res)

sort.Slice(i.nextSpans, func(j, k int) bool {
return parquetquery.CompareRowNumbers(DefinitionLevelResourceSpans, i.nextSpans[j].rowNum, i.nextSpans[k].rowNum) == -1
})
Expand All @@ -442,7 +458,8 @@ func (i *bridgeIterator) Next() (*parquetquery.IteratorResult, error) {
}

func spanToIteratorResult(s *span) *parquetquery.IteratorResult {
res := &parquetquery.IteratorResult{RowNumber: s.rowNum}
res := parquetquery.GetResult()
res.RowNumber = s.rowNum
res.AppendOtherValue(otherEntrySpanKey, s)

return res
Expand Down Expand Up @@ -547,6 +564,9 @@ func (i *rebatchIterator) Next() (*parquetquery.IteratorResult, error) {
i.nextSpans = append(i.nextSpans, sp)
}

parquetquery.ReleaseResult(res)
putSpanset(ss) // Repool the spanset but not the spans which have been moved to nextSpans as needed.

res = i.resultFromNextSpans()
if res != nil {
return res, nil
Expand All @@ -561,7 +581,7 @@ func (i *rebatchIterator) resultFromNextSpans() *parquetquery.IteratorResult {
i.nextSpans = i.nextSpans[1:]

if ret.cbSpansetFinal && ret.cbSpanset != nil {
res := &parquetquery.IteratorResult{}
res := parquetquery.GetResult()
res.AppendOtherValue(otherEntrySpansetKey, ret.cbSpanset)
return res
}
Expand Down Expand Up @@ -601,6 +621,8 @@ func (i *spansetIterator) Next(context.Context) (*traceql.Spanset, error) {
return nil, nil
}

defer parquetquery.ReleaseResult(res)

// The spanset is in the OtherEntries
iface := res.OtherValueFromKey(otherEntrySpansetKey)
if iface == nil {
Expand Down
1 change: 1 addition & 0 deletions tempodb/encoding/vparquet2/block_traceql_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func TestBackendBlockSearchFetchMetaData(t *testing.T) {
sp.(*span).cbSpansetFinal = false
sp.(*span).rowNum = parquetquery.RowNumber{}
}
s.ReleaseFn = nil
}

require.Equal(t, tc.expectedResults, ss, "search request:", req)
Expand Down
1 change: 1 addition & 0 deletions tempodb/encoding/vparquet2/block_traceql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ func BenchmarkBackendBlockGetMetrics(b *testing.B) {
rr := backend.NewReader(r)
meta, err := rr.BlockMeta(ctx, blockID, tenantID)
require.NoError(b, err)
require.Equal(b, VersionString, meta.Version)

opts := common.DefaultSearchOptions()
opts.StartPage = 10
Expand Down
Loading
Loading