Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expansion of Encoding Interfaces and Addition of V2 #1227

Merged
merged 17 commits into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
## main / unreleased

* [FEATURE]: v2 object encoding added. This encoding adds a start/end timestamp to every record to reduce proto marshalling and increase search speed.
**BREAKING CHANGE** After this rollout the distributors will use a new API on the ingesters. As such you must rollout all ingesters before rolling the
distributors. Also, during this period, the ingesters will use considerably more resources and as such should be scaled up (or incoming traffic should be heavily
throttled). Once all distributors and ingesters have rolled performance will return to normal. [#1227](https://github.com/grafana/tempo/pull/1227) (@joe-elliott)
* [ENHANCEMENT] Enterprise jsonnet: add config to create tokengen job explicitly [#1256](https://github.com/grafana/tempo/pull/1256) (@kvrhdn)
* [BUGFIX]: Remove unnecessary PersistentVolumeClaim [#1245](https://github.com/grafana/tempo/issues/1245)
* [BUGFIX] Fixed issue when query-frontend doesn't log request details when request is cancelled [#1136](https://github.com/grafana/tempo/issues/1136) (@adityapwr)


## v1.3.0 / 2022-01-24
* [FEATURE]: Add support for [inline environments](https://tanka.dev/inline-environments). [#1184](https://github.com/grafana/tempo/pull/1184) @irizzant
* [CHANGE] Search: Add new per-tenant limit `max_bytes_per_tag_values_query` to limit the size of tag-values response. [#1068](https://github.com/grafana/tempo/pull/1068) (@annanay25)
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-list-block.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func dumpBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID s

copy(prevID, objID)

trace, err := model.MustNewDecoder(meta.DataEncoding).PrepareForRead(obj)
trace, err := model.MustNewObjectDecoder(meta.DataEncoding).PrepareForRead(obj)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-query-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func queryBlock(ctx context.Context, r backend.Reader, c backend.Compactor, bloc
return nil, nil
}

trace, err := model.MustNewDecoder(meta.DataEncoding).PrepareForRead(obj)
trace, err := model.MustNewObjectDecoder(meta.DataEncoding).PrepareForRead(obj)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-search.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func searchIterator(iter encoding.Iterator, dataEncoding string, name string, va
}

// todo : parallelize unmarshal and search
trace, err := model.MustNewDecoder(dataEncoding).PrepareForRead(obj)
trace, err := model.MustNewObjectDecoder(dataEncoding).PrepareForRead(obj)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo-serverless/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func Handler(w http.ResponseWriter, r *http.Request) {
Metrics: &tempopb.SearchMetrics{},
}

decoder, err := model.NewDecoder(searchReq.DataEncoding)
decoder, err := model.NewObjectDecoder(searchReq.DataEncoding)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
81 changes: 49 additions & 32 deletions modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"strings"
"time"

Expand All @@ -28,6 +29,7 @@ import (
ingester_client "github.com/grafana/tempo/modules/ingester/client"
"github.com/grafana/tempo/modules/overrides"
_ "github.com/grafana/tempo/pkg/gogocodec" // force gogo codec registration
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util"
Expand Down Expand Up @@ -87,6 +89,14 @@ var (
}, []string{discardReasonLabel, "tenant"})
)

// rebatchedTrace is used to more cleanly pass the set of data
type rebatchedTrace struct {
id []byte
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is id used? I see trace.id used, but maybe missed where id is used.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, it's used on line 333 where the distributor creates the batches it sends to the ingesters.

trace *tempopb.Trace
start uint32 // unix epoch seconds
end uint32 // unix epoch seconds
}

// Distributor coordinates replicates and distribution of log streams.
type Distributor struct {
services.Service
Expand All @@ -97,6 +107,7 @@ type Distributor struct {
pool *ring_client.Pool
DistributorRing *ring.Ring
overrides *overrides.Overrides
traceEncoder model.SegmentDecoder

// search
searchEnabled bool
Expand Down Expand Up @@ -169,6 +180,7 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi
searchEnabled: searchEnabled,
globalTagsToDrop: tagsToDrop,
overrides: o,
traceEncoder: model.MustNewSegmentDecoder(model.CurrentEncoding),
}

cfgReceivers := cfg.Receivers
Expand Down Expand Up @@ -258,7 +270,7 @@ func (d *Distributor) PushBatches(ctx context.Context, batches []*v1.ResourceSpa
size)
}

keys, traces, ids, err := requestsByTraceID(batches, userID, spanCount)
keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount)
if err != nil {
metricDiscardedSpans.WithLabelValues(reasonInternalError, userID).Add(float64(spanCount))
return nil, err
Expand All @@ -267,7 +279,7 @@ func (d *Distributor) PushBatches(ctx context.Context, batches []*v1.ResourceSpa
var searchData [][]byte
if d.searchEnabled {
perTenantAllowedTags := d.overrides.SearchTagsAllowList(userID)
searchData = extractSearchDataAll(traces, ids, func(tag string) bool {
searchData = extractSearchDataAll(rebatchedTraces, func(tag string) bool {
// if in per tenant override, extract
if _, ok := perTenantAllowedTags[tag]; ok {
return true
Expand All @@ -281,19 +293,19 @@ func (d *Distributor) PushBatches(ctx context.Context, batches []*v1.ResourceSpa
})
}

err = d.sendToIngestersViaBytes(ctx, userID, traces, searchData, keys, ids)
err = d.sendToIngestersViaBytes(ctx, userID, rebatchedTraces, searchData, keys)
if err != nil {
recordDiscaredSpans(err, userID, spanCount)
}

return nil, err // PushRequest is ignored, so no reason to create one
}

func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string, traces []*tempopb.Trace, searchData [][]byte, keys []uint32, ids [][]byte) error {
func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string, traces []*rebatchedTrace, searchData [][]byte, keys []uint32) error {
// Marshal to bytes once
marshalledTraces := make([][]byte, len(traces))
for i, t := range traces {
b, err := t.Marshal()
b, err := d.traceEncoder.PrepareForWrite(t.trace, t.start, t.end)
if err != nil {
return errors.Wrap(err, "failed to marshal PushRequest")
}
Expand All @@ -318,7 +330,7 @@ func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string

for i, j := range indexes {
req.Traces[i].Slice = marshalledTraces[j][0:]
req.Ids[i].Slice = ids[j]
req.Ids[i].Slice = traces[j].id

// Search data optional
if len(searchData) > j {
Expand All @@ -331,7 +343,7 @@ func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string
return err
}

_, err = c.(tempopb.PusherClient).PushBytes(localCtx, &req)
_, err = c.(tempopb.PusherClient).PushBytesV2(localCtx, &req)
metricIngesterAppends.WithLabelValues(ingester.Addr).Inc()
if err != nil {
metricIngesterAppendFailures.WithLabelValues(ingester.Addr).Inc()
Expand All @@ -349,14 +361,9 @@ func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckReques

// requestsByTraceID takes an incoming tempodb.PushRequest and creates a set of keys for the hash ring
// and traces to pass onto the ingesters.
func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int) ([]uint32, []*tempopb.Trace, [][]byte, error) {
type traceAndID struct {
id []byte
trace *tempopb.Trace
}

func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int) ([]uint32, []*rebatchedTrace, error) {
const tracesPerBatch = 20 // p50 of internal env
tracesByID := make(map[uint32]*traceAndID, tracesPerBatch)
tracesByID := make(map[uint32]*rebatchedTrace, tracesPerBatch)

for _, b := range batches {
spansByILS := make(map[uint32]*v1.InstrumentationLibrarySpans)
Expand All @@ -365,7 +372,7 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int
for _, span := range ils.Spans {
traceID := span.TraceId
if !validation.ValidTraceID(traceID) {
return nil, nil, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit")
return nil, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit")
}

traceKey := util.TokenFor(userID, traceID)
Expand All @@ -375,8 +382,8 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int
ilsKey = fnv1a.AddString32(ilsKey, ils.InstrumentationLibrary.Version)
}

existingILS, ok := spansByILS[ilsKey]
if !ok {
existingILS, ilsAdded := spansByILS[ilsKey]
if !ilsAdded {
existingILS = &v1.InstrumentationLibrarySpans{
InstrumentationLibrary: ils.InstrumentationLibrary,
Spans: make([]*v1.Span, 0, spanCount/tracesPerBatch),
Expand All @@ -385,44 +392,49 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int
}
existingILS.Spans = append(existingILS.Spans, span)

// if we found an ILS we assume its already part of a request and can go to the next span
if ok {
continue
}

// now find and update the rebatchedTrace with a new start and end
existingTrace, ok := tracesByID[traceKey]
if !ok {
existingTrace = &traceAndID{
existingTrace = &rebatchedTrace{
id: traceID,
trace: &tempopb.Trace{
Batches: make([]*v1.ResourceSpans, 0, spanCount/tracesPerBatch),
},
start: math.MaxUint32,
end: 0,
}

tracesByID[traceKey] = existingTrace
}

existingTrace.trace.Batches = append(existingTrace.trace.Batches, &v1.ResourceSpans{
Resource: b.Resource,
InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{existingILS},
})
start, end := startEndFromSpan(span)
if existingTrace.end < end {
existingTrace.end = end
}
if existingTrace.start > start {
existingTrace.start = start
}
if !ilsAdded {
existingTrace.trace.Batches = append(existingTrace.trace.Batches, &v1.ResourceSpans{
Resource: b.Resource,
InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{existingILS},
})
}
}
}
}

metricTracesPerBatch.Observe(float64(len(tracesByID)))

keys := make([]uint32, 0, len(tracesByID))
traces := make([]*tempopb.Trace, 0, len(tracesByID))
ids := make([][]byte, 0, len(tracesByID))
traces := make([]*rebatchedTrace, 0, len(tracesByID))

for k, r := range tracesByID {
keys = append(keys, k)
traces = append(traces, r.trace)
ids = append(ids, r.id)
traces = append(traces, r)
}

return keys, traces, ids, nil
return keys, traces, nil
}

func recordDiscaredSpans(err error, userID string, spanCount int) {
Expand Down Expand Up @@ -450,3 +462,8 @@ func logTraces(batches []*v1.ResourceSpans) {
}
}
}

// startEndFromSpan returns the span's start and end times as unix epoch
// timestamps in seconds, truncating the nanosecond-precision proto fields.
func startEndFromSpan(span *v1.Span) (uint32, uint32) {
	secs := uint64(time.Second)
	start := uint32(span.StartTimeUnixNano / secs)
	end := uint32(span.EndTimeUnixNano / secs)
	return start, end
}
Loading