[TraceQL Metrics] Step align query_range time range (grafana#3490)
* Step align query_range time range

* Time range error: improve message and fix format for prom format.

* oops remove printlns

* lint

* changelog
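
The mechanics are simple: both the top-level request and each sub-request are floored to multiples of the step via integer division, so two refreshes of a "last 1 hour" query land on the same bucket boundaries. A minimal, standalone sketch of that arithmetic (the `align` helper and the timestamps are illustrative only, not part of this commit):

```go
package main

import (
	"fmt"
	"time"
)

// align floors a nanosecond timestamp to the nearest step boundary,
// the same integer-division trick used by alignTimeRange in the diff below.
func align(ts, step uint64) uint64 {
	return ts / step * step
}

func main() {
	step := uint64(time.Minute.Nanoseconds()) // 60s step, in nanoseconds

	// Two refreshes of a "last 1 hour" query, issued 9 seconds apart.
	a := uint64(time.Date(2024, 3, 19, 10, 0, 37, 0, time.UTC).UnixNano())
	b := uint64(time.Date(2024, 3, 19, 10, 0, 46, 0, time.UTC).UnixNano())

	// Both floor to the same boundary, so the series buckets line up.
	fmt.Println(time.Unix(0, int64(align(a, step))).UTC()) // 2024-03-19 10:00:00 +0000 UTC
	fmt.Println(time.Unix(0, int64(align(b, step))).UTC()) // 2024-03-19 10:00:00 +0000 UTC
}
```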
mdisibio authored and joe-elliott committed Mar 19, 2024
1 parent c45de36 commit ae9f03b
Showing 2 changed files with 64 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
## main / unreleased

* [FEATURE] Added gRPC streaming endpoints for all tag queries. [#3460](https://github.com/grafana/tempo/pull/3460) (@joe-elliott)
+* [CHANGE] Align metrics query time ranges to the step parameter [#3490](https://github.com/grafana/tempo/pull/3490) (@mdisibio)
* [ENHANCEMENT] Add string interning to TraceQL queries [#3411](https://github.com/grafana/tempo/pull/3411) (@mapno)
* [ENHANCEMENT] Add new (unsafe) query hints for metrics queries [#3396](https://github.com/grafana/tempo/pull/3396) (@mdisibio)
* [BUGFIX] Fix metrics query results when filtering and rating on the same attribute [#3428](https://github.com/grafana/tempo/issues/3428) (@mdisibio)
114 changes: 63 additions & 51 deletions modules/frontend/query_range_sharding.go
@@ -85,12 +85,12 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
now = time.Unix(now.Unix(), 0)
}

-queryRangeReq, err := api.ParseQueryRangeRequest(r)
+req, err := api.ParseQueryRangeRequest(r)
if err != nil {
return s.respErrHandler(isProm, err)
}

-expr, err := traceql.Parse(queryRangeReq.Query)
+expr, err := traceql.Parse(req.Query)
if err != nil {
return s.respErrHandler(isProm, err)
}
@@ -103,22 +103,23 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
subCtx, subCancel := context.WithCancel(ctx)
defer subCancel()

+alignTimeRange(req)

// calculate and enforce max search duration
maxDuration := s.maxDuration(tenantID)
-if maxDuration != 0 && time.Duration(queryRangeReq.End-queryRangeReq.Start)*time.Nanosecond > maxDuration {
-return &http.Response{
-StatusCode: http.StatusBadRequest,
-Body: io.NopCloser(strings.NewReader(fmt.Sprintf("range specified by start and end exceeds %s. received start=%d end=%d", maxDuration, queryRangeReq.Start, queryRangeReq.End))),
-}, nil
+if maxDuration != 0 && time.Duration(req.End-req.Start)*time.Nanosecond > maxDuration {
+err = fmt.Errorf(fmt.Sprintf("range specified by start and end (%s) exceeds %s. received start=%d end=%d", time.Duration(req.End-req.Start), maxDuration, req.Start, req.End))
+return s.respErrHandler(isProm, err)
}

-allowUnsafe := s.overrides.UnsafeQueryHints(tenantID)
-samplingRate := s.samplingRate(expr, allowUnsafe)
-targetBytesPerRequest := s.jobSize(expr, samplingRate, allowUnsafe)
-interval := s.jobInterval(expr, allowUnsafe)
+var (
+allowUnsafe = s.overrides.UnsafeQueryHints(tenantID)
+samplingRate = s.samplingRate(expr, allowUnsafe)
+targetBytesPerRequest = s.jobSize(expr, samplingRate, allowUnsafe)
+interval = s.jobInterval(expr, allowUnsafe)
+)

-generatorReq = s.generatorRequest(*queryRangeReq, samplingRate)
+generatorReq = s.generatorRequest(*req, now, samplingRate)
reqCh := make(chan *queryRangeJob, 1) // buffer of 1 allows us to insert ingestReq if it exists
stopCh := make(chan struct{})
defer close(stopCh)
@@ -127,14 +128,16 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
reqCh <- generatorReq
}

-totalBlocks, totalBlockBytes := s.backendRequests(tenantID, queryRangeReq, now, samplingRate, targetBytesPerRequest, interval, reqCh, stopCh)
+totalBlocks, totalBlockBytes := s.backendRequests(tenantID, *req, now, samplingRate, targetBytesPerRequest, interval, reqCh, stopCh)

-wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests))
-jobErr := atomic.Error{}
-c := traceql.QueryRangeCombiner{}
-mtx := sync.Mutex{}
+var (
+wg = boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests))
+jobErr = atomic.Error{}
+c = traceql.QueryRangeCombiner{}
+mtx = sync.Mutex{}
+startedReqs = 0
+)

-startedReqs := 0
for job := range reqCh {
if job.err != nil {
jobErr.Store(fmt.Errorf("unexpected err building reqs: %w", job.err))
@@ -163,7 +166,6 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
}

_ = level.Error(s.logger).Log("msg", "error executing sharded query", "url", innerR.RequestURI, "err", err)
-// progress.setError(err)
return
}

@@ -175,7 +177,6 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
}
statusMsg := fmt.Sprintf("upstream: (%d) %s", resp.StatusCode, string(bytesMsg))
jobErr.Store(fmt.Errorf(statusMsg))
-/* progress.setStatus(statusCode, statusMsg) */
return
}

@@ -184,7 +185,6 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
err = (&jsonpb.Unmarshaler{AllowUnknownFields: true}).Unmarshal(resp.Body, results)
if err != nil {
_ = level.Error(s.logger).Log("msg", "error reading response body status == ok", "url", innerR.RequestURI, "err", err)
-// progress.setError(err)
return
}

@@ -212,14 +212,21 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
res.Metrics.TotalBlocks = uint32(totalBlocks)
res.Metrics.TotalBlockBytes = uint64(totalBlockBytes)

-// Sort series alphabetically so they are stable in the UI
+// Sort all output, series alphabetically, samples by time
sort.SliceStable(res.Series, func(i, j int) bool {
return strings.Compare(res.Series[i].PromLabels, res.Series[j].PromLabels) == -1
})
+for _, series := range res.Series {
+sort.Slice(series.Samples, func(i, j int) bool {
+return series.Samples[i].TimestampMs < series.Samples[j].TimestampMs
+})
+}

-reqTime := time.Since(now)
-throughput := math.Round(float64(res.Metrics.InspectedBytes) / reqTime.Seconds())
-spanThroughput := math.Round(float64(res.Metrics.InspectedSpans) / reqTime.Seconds())
+var (
+reqTime = time.Since(now)
+throughput = math.Round(float64(res.Metrics.InspectedBytes) / reqTime.Seconds())
+spanThroughput = math.Round(float64(res.Metrics.InspectedSpans) / reqTime.Seconds())
+)

span.SetTag("totalBlocks", res.Metrics.TotalBlocks)
span.SetTag("inspectedBytes", res.Metrics.InspectedBytes)
@@ -278,30 +285,27 @@ func (s *queryRangeSharder) blockMetas(start, end int64, tenantID string) []*bac
return metas
}

-func (s *queryRangeSharder) backendRequests(tenantID string, searchReq *tempopb.QueryRangeRequest, now time.Time, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *queryRangeJob, stopCh <-chan struct{}) (totalBlocks, totalBlockBytes int) {
+func (s *queryRangeSharder) backendRequests(tenantID string, searchReq tempopb.QueryRangeRequest, now time.Time, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *queryRangeJob, stopCh <-chan struct{}) (totalBlocks, totalBlockBytes int) {
// request without start or end, search only in generator
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
return
}

-// calculate duration (start and end) to search the backend blocks
-start, end := s.backendRange(now, searchReq.Start, searchReq.End, s.cfg.QueryBackendAfter)
-
-fmt.Println("Backend request range:",
-"reqStart", time.Unix(0, int64(searchReq.Start)), "reqEnd", time.Unix(0, int64(searchReq.End)),
-"start", time.Unix(0, int64(start)), "end", time.Unix(0, int64(end)),
-)
+// Make a copy and limit to backend time range.
+backendReq := searchReq
+backendReq.Start, backendReq.End = s.backendRange(now, backendReq.Start, backendReq.End, s.cfg.QueryBackendAfter)
+alignTimeRange(&backendReq)

-// no need to search backend
-if start == end {
+// If empty window then no need to search backend
+if backendReq.Start == backendReq.End {
close(reqCh)
return
}

// Blocks within overall time range. This is just for instrumentation, more precise time
// range is checked for each window.
-blocks := s.blockMetas(int64(start), int64(end), tenantID)
+blocks := s.blockMetas(int64(backendReq.Start), int64(backendReq.End), tenantID)
if len(blocks) == 0 {
// no need to search backend
close(reqCh)
@@ -314,16 +318,20 @@ func (s *queryRangeSharder) backendRequests(tenantID string, searchReq *tempopb.
}

go func() {
-s.buildBackendRequests(tenantID, searchReq, start, end, samplingRate, targetBytesPerRequest, interval, reqCh, stopCh)
+s.buildBackendRequests(tenantID, backendReq, samplingRate, targetBytesPerRequest, interval, reqCh, stopCh)
}()

return
}

-func (s *queryRangeSharder) buildBackendRequests(tenantID string, searchReq *tempopb.QueryRangeRequest, start, end uint64, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *queryRangeJob, stopCh <-chan struct{}) {
+func (s *queryRangeSharder) buildBackendRequests(tenantID string, searchReq tempopb.QueryRangeRequest, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *queryRangeJob, stopCh <-chan struct{}) {
defer close(reqCh)

-timeWindowSize := uint64(interval.Nanoseconds())
+var (
+start = searchReq.Start
+end = searchReq.End
+timeWindowSize = uint64(interval.Nanoseconds())
+)

for start < end {

@@ -347,7 +355,7 @@ func (s *queryRangeSharder) buildBackendRequests(tenantID string, searchReq *tem
shards := uint32(math.Ceil(float64(totalBlockSize) / float64(targetBytesPerRequest)))

for i := uint32(1); i <= shards; i++ {
-shardR := *searchReq
+shardR := searchReq
shardR.Start = thisStart
shardR.End = thisEnd
shardR.ShardID = i
@@ -387,8 +395,7 @@ func (s *queryRangeSharder) backendRange(now time.Time, start, end uint64, query
return start, end
}

-func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, samplingRate float64) *queryRangeJob {
-now := time.Now()
+func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, now time.Time, samplingRate float64) *queryRangeJob {
cutoff := uint64(now.Add(-s.cfg.QueryBackendAfter).UnixNano())

// if there's no overlap between the query and ingester range just return nil
@@ -400,7 +407,9 @@ func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest
searchReq.Start = cutoff
}

-// if ingester start == ingester end then we don't need to query it
+alignTimeRange(&searchReq)
+
+// if start == end then we don't need to query it
if searchReq.Start == searchReq.End {
return nil
}
@@ -431,6 +440,16 @@ func (s *queryRangeSharder) toUpstreamRequest(ctx context.Context, req tempopb.Q
return subR
}

+// alignTimeRange shifts the start and end times of the request to align with the step
+// interval. This gives more consistent results across refreshes of queries like "last 1 hour".
+// Without alignment each refresh is shifted by seconds or even milliseconds and the time series
+// calculations are subtly different each time. It's not wrong, but less preferred behavior.
+func alignTimeRange(req *tempopb.QueryRangeRequest) {
+// It doesn't really matter but the request fields are expected to be in nanoseconds.
+req.Start = req.Start / req.Step * req.Step
+req.End = req.End / req.Step * req.Step
+}

// maxDuration returns the max search duration allowed for this tenant.
func (s *queryRangeSharder) maxDuration(tenantID string) time.Duration {
// check overrides first, if no overrides then grab from our config
@@ -488,13 +507,6 @@ func (s *queryRangeSharder) jobInterval(expr *traceql.RootExpr, allowUnsafe bool
}

func (s *queryRangeSharder) convertToPromFormat(resp *tempopb.QueryRangeResponse) PromResponse {
-// Sort in increasing timestamp so that lines are drawn correctly
-for _, series := range resp.Series {
-sort.Slice(series.Samples, func(i, j int) bool {
-return series.Samples[i].TimestampMs < series.Samples[j].TimestampMs
-})
-}
-
promResp := PromResponse{
Status: "success",
Data: &PromData{ResultType: "matrix"},
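A note for readers following the sharder changes: buildBackendRequests now takes its start/end from the already-aligned request copy and walks that range in fixed-size windows, with each window further sharded by block size. A simplified sketch of the windowing walk (the `windows` helper is hypothetical, not the commit's code; the real loop also snaps window edges and emits one job per shard):

```go
package main

import "fmt"

// windows splits [start, end) into consecutive spans of at most windowSize,
// mirroring the loop shape in buildBackendRequests.
func windows(start, end, windowSize uint64) [][2]uint64 {
	var out [][2]uint64
	for start < end {
		thisEnd := start + windowSize
		if thisEnd > end {
			thisEnd = end
		}
		out = append(out, [2]uint64{start, thisEnd})
		start = thisEnd
	}
	return out
}

func main() {
	// A 25-unit range with 10-unit windows yields [0,10) [10,20) [20,25).
	fmt.Println(windows(0, 25, 10))
}
```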
