Skip to content

Commit

Permalink
Block partial results (#1007)
Browse files Browse the repository at this point in the history
* Return Stats from running jobs in pool

* Return num of failed blocks from tempodb

* Add metrics to TraceByID

* Allow for returning partial results in querier

* Handle partial results in querysharding

* Changelog

* Some improvements + make max configurable

* Better handling in frontend middlewares

* Fix querysharding test

* Measure failed blocks

* Default max failed blocks to 0

* Atomic not necessary

* Rename config param

* Remove unnecessary nil check & allocation

* Pass slice of errors to querier

* Remove failed queries metric

* Please linter

* Document breaking change

* Review suggestions
  • Loading branch information
mapno committed Oct 8, 2021
1 parent d35b874 commit 9cf0e7e
Show file tree
Hide file tree
Showing 19 changed files with 494 additions and 175 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
* [ENHANCEMENT] Jsonnet: add `$._config.memcached.memory_limit_mb` [#987](https://github.com/grafana/tempo/pull/987) (@kvrhdn)
* [ENHANCEMENT] Upgrade jsonnet-libs to 1.19 and update tk examples [#1001](https://github.com/grafana/tempo/pull/1001) (@mapno)
* [ENHANCEMENT] Shard tenant index creation by tenant and add functionality to handle stale indexes. [#1005](https://github.com/grafana/tempo/pull/1005) (@joe-elliott)
* [ENHANCEMENT] **BREAKING CHANGE** Support partial results from failed block queries [#1007](https://github.com/grafana/tempo/pull/1007) (@mapno)
Querier [`GET /querier/api/traces/<traceid>`](https://grafana.com/docs/tempo/latest/api_docs/#query) response's body has been modified
to return `tempopb.TraceByIDResponse` instead of simply `tempopb.Trace`. This will cause a disruption of the read path during rollout of the change.
* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala)
* [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio)
* [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
Expand Down
6 changes: 6 additions & 0 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ query_frontend:
# number of shards to split the query into
# (default: 20)
[query_shards: <int>]
# number of block queries that are tolerated to error before considering the entire query as failed
# numbers greater than 0 make it possible for a read to return partial results
# partial results are indicated with HTTP status code 206
# (default: 0)
[tolerate_failed_blocks: <int>]
```

## Querier
Expand Down
16 changes: 13 additions & 3 deletions docs/tempo/website/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ go run ./cmd/tempo --storage.trace.backend=local --storage.trace.local.path=/tmp

## Complete Configuration

> **Note**: This manifest was generated on 2021-08-25.
> **Note**: This manifest was generated on 2021-10-08.
```yaml
target: all
http_api_prefix: ""
server:
http_listen_network: tcp
http_listen_address: ""
http_listen_port: 80
http_listen_conn_limit: 0
grpc_listen_network: tcp
grpc_listen_address: ""
grpc_listen_port: 9095
grpc_listen_conn_limit: 0
Expand Down Expand Up @@ -100,6 +102,7 @@ distributor:
override_ring_key: distributor
log_received_traces: false
extend_writes: true
search_tags_deny_list: []
ingester_client:
pool_config:
checkinterval: 15s
Expand All @@ -125,6 +128,7 @@ ingester_client:
tls_insecure_skip_verify: false
querier:
query_timeout: 10s
search_query_timeout: 30s
max_concurrent_queries: 5
frontend_worker:
frontend_address: 127.0.0.1:9095
Expand Down Expand Up @@ -305,11 +309,13 @@ storage:
bloom_filter_false_positive: 0.01
bloom_filter_shard_size_bytes: 102400
encoding: zstd
search_encoding: gzip
search_page_size_bytes: 1048576
blocklist_poll: 5m0s
blocklist_poll_concurrency: 50
blocklist_poll_fallback: true
blocklist_poll_tenant_index_builders: 2
blocklist_poll_stale_tenant_index: 0
blocklist_poll_stale_tenant_index: 0s
backend: local
local:
path: /tmp/tempo/traces
Expand Down Expand Up @@ -350,9 +356,11 @@ overrides:
ingestion_rate_strategy: local
ingestion_rate_limit_bytes: 15000000
ingestion_burst_size_bytes: 20000000
search_tags_allow_list: null
max_traces_per_user: 10000
max_global_traces_per_user: 0
max_bytes_per_trace: 5000000
max_bytes_per_trace: 50000
max_search_bytes_per_trace: 0
block_retention: 0s
per_tenant_override_config: ""
per_tenant_override_period: 10s
Expand All @@ -367,6 +375,8 @@ memberlist:
gossip_to_dead_nodes_time: 30s
dead_node_reclaim_time: 0s
compression_enabled: false
advertise_addr: ""
advertise_port: 7946
join_members: []
min_join_backoff: 1s
max_join_backoff: 1m0s
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ require (
go.opentelemetry.io/otel/trace v1.0.0-RC2
go.uber.org/atomic v1.9.0
go.uber.org/goleak v1.1.10
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.17.0
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
Expand Down Expand Up @@ -239,7 +240,6 @@ require (
go.mongodb.org/mongo-driver v1.5.1 // indirect
go.opentelemetry.io/otel/metric v0.21.0 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.21.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.4.2 // indirect
Expand Down
8 changes: 5 additions & 3 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
)

type Config struct {
Config frontend.CombinedFrontendConfig `yaml:",inline"`
MaxRetries int `yaml:"max_retries,omitempty"`
QueryShards int `yaml:"query_shards,omitempty"`
Config frontend.CombinedFrontendConfig `yaml:",inline"`
MaxRetries int `yaml:"max_retries,omitempty"`
QueryShards int `yaml:"query_shards,omitempty"`
TolerateFailedBlocks int `yaml:"tolerate_failed_blocks,omitempty"`
}

func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
Expand All @@ -19,6 +20,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Config.FrontendV1.MaxOutstandingPerTenant = 100
cfg.MaxRetries = 2
cfg.QueryShards = 20
cfg.TolerateFailedBlocks = 0
}

type CortexNoQuerierLimits struct{}
Expand Down
11 changes: 6 additions & 5 deletions modules/frontend/deduper.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,24 @@ func (s spanIDDeduper) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, err
}

traceObject := &tempopb.Trace{}
err = proto.Unmarshal(body, traceObject)
responseObject := &tempopb.TraceByIDResponse{}
err = proto.Unmarshal(body, responseObject)
if err != nil {
return nil, err
}

s.trace = traceObject
s.trace = responseObject.Trace
s.dedupe()

traceBytes, err := proto.Marshal(s.trace)
responseObject.Trace = s.trace
responseBytes, err := proto.Marshal(responseObject)
if err != nil {
return nil, err
}

return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader(traceBytes)),
Body: io.NopCloser(bytes.NewReader(responseBytes)),
Header: http.Header{},
ContentLength: resp.ContentLength,
}, nil
Expand Down
12 changes: 8 additions & 4 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func NewTracesMiddleware(cfg Config, logger log.Logger, registerer prometheus.Re
// - the Deduper dedupes Span IDs for Zipkin support
// - the ShardingWare shards queries by splitting the block ID space
// - the RetryWare retries requests that have failed (error or http status 500)
rt := NewRoundTripper(next, Deduper(logger), ShardingWare(cfg.QueryShards, logger), RetryWare(cfg.MaxRetries, registerer))
rt := NewRoundTripper(next, Deduper(logger), ShardingWare(cfg.QueryShards, cfg.TolerateFailedBlocks, logger), RetryWare(cfg.MaxRetries, registerer))

return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
// validate traceID
Expand Down Expand Up @@ -156,15 +156,19 @@ func NewTracesMiddleware(cfg Config, logger log.Logger, registerer prometheus.Re
if err != nil {
return nil, errors.Wrap(err, "error reading response body at query frontend")
}
traceObject := &tempopb.Trace{}
err = proto.Unmarshal(body, traceObject)
responseObject := &tempopb.TraceByIDResponse{}
err = proto.Unmarshal(body, responseObject)
if err != nil {
return nil, err
}

if responseObject.Metrics.FailedBlocks > 0 {
resp.StatusCode = http.StatusPartialContent
}

var jsonTrace bytes.Buffer
marshaller := &jsonpb.Marshaler{}
err = marshaller.Marshal(&jsonTrace, traceObject)
err = marshaller.Marshal(&jsonTrace, responseObject.Trace)
if err != nil {
return nil, err
}
Expand Down
35 changes: 26 additions & 9 deletions modules/frontend/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"net/http"
"strings"
Expand All @@ -13,12 +14,11 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/log/level"
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/user"

"github.com/grafana/tempo/modules/querier"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/user"
)

const (
Expand All @@ -29,13 +29,14 @@ const (
queryDelimiter = "?"
)

func ShardingWare(queryShards int, logger log.Logger) Middleware {
// ShardingWare builds a Middleware that fans a trace-by-ID query out into
// queryShards sub-queries over the block ID space. Up to maxFailedBlocks
// failed block queries are tolerated before the whole request is treated
// as an error (enabling partial results when greater than zero).
func ShardingWare(queryShards, maxFailedBlocks int, logger log.Logger) Middleware {
	return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
		// One shard is reserved for querying ingesters, so only
		// queryShards-1 boundaries partition the block ID space.
		boundaries := createBlockBoundaries(queryShards - 1)
		sharder := shardQuery{
			next:            next,
			queryShards:     queryShards,
			logger:          logger,
			blockBoundaries: boundaries,
			maxFailedBlocks: uint32(maxFailedBlocks),
		}
		return sharder
	})
}
Expand All @@ -45,6 +46,7 @@ type shardQuery struct {
queryShards int
logger log.Logger
blockBoundaries [][]byte
maxFailedBlocks uint32
}

// RoundTrip implements http.RoundTripper
Expand All @@ -64,8 +66,9 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
wg := sync.WaitGroup{}
mtx := sync.Mutex{}

var overallTrace *tempopb.Trace
var overallError error
var totalFailedBlocks uint32
overallTrace := &tempopb.Trace{}
statusCode := http.StatusNotFound
statusMsg := "trace not found"

Expand Down Expand Up @@ -121,17 +124,25 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
// marshal into a trace to combine.
// todo: better define responsibilities between middleware. the parent middleware in frontend.go actually sets the header
// which forces the body here to be a proto encoded tempopb.Trace{}
trace := &tempopb.Trace{}
err = proto.Unmarshal(buff, trace)
traceResp := &tempopb.TraceByIDResponse{}
err = proto.Unmarshal(buff, traceResp)
if err != nil {
_ = level.Error(s.logger).Log("msg", "error unmarshalling response", "url", innerR.RequestURI, "err", err, "body", string(buff))
overallError = err
return
}

if traceResp.Metrics != nil {
totalFailedBlocks += traceResp.Metrics.FailedBlocks
if totalFailedBlocks > s.maxFailedBlocks {
overallError = fmt.Errorf("too many failed block queries %d (max %d)", totalFailedBlocks, s.maxFailedBlocks)
return
}
}

// happy path
statusCode = http.StatusOK
overallTrace, _, _, _ = model.CombineTraceProtos(overallTrace, trace)
overallTrace, _, _, _ = model.CombineTraceProtos(overallTrace, traceResp.Trace)
}(req)
}
wg.Wait()
Expand All @@ -155,7 +166,12 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
}, nil
}

buff, err := proto.Marshal(overallTrace)
buff, err := proto.Marshal(&tempopb.TraceByIDResponse{
Trace: overallTrace,
Metrics: &tempopb.TraceByIDMetrics{
FailedBlocks: totalFailedBlocks,
},
})
if err != nil {
_ = level.Error(s.logger).Log("msg", "error marshalling response to proto", "err", err)
return nil, err
Expand Down Expand Up @@ -238,5 +254,6 @@ func shouldQuit(ctx context.Context, statusCode int, err error) bool {
if statusCode/100 == 5 { // bail on any 5xx's
return true
}

return false
}
Loading

0 comments on commit 9cf0e7e

Please sign in to comment.