Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add span deduper to support zipkin client/server kind spans #687

Merged
merged 7 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [ENHANCEMENT] Add config option to disable write extension to the ingesters. [#677](https://github.com/grafana/tempo/pull/677)
* [ENHANCEMENT] Preallocate byte slices on ingester request unmarshal. [#679](https://github.com/grafana/tempo/pull/679)
* [ENHANCEMENT] Zipkin Support - CombineTraces. [#688](https://github.com/grafana/tempo/pull/688)
* [ENHANCEMENT] Zipkin support - Dedupe span IDs based on span.Kind (client/server) in Query Frontend. [#687](https://github.com/grafana/tempo/pull/687)

## v0.7.0

Expand Down
1 change: 1 addition & 0 deletions example/docker-compose/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
ports:
- "14268" # jaeger ingest
- "3100" # tempo
- "9411" # zipkin

synthetic-load-generator:
image: omnition/synthetic-load-generator:1.0.25
Expand Down
179 changes: 179 additions & 0 deletions modules/frontend/deduper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package frontend

import (
"bytes"
"encoding/binary"
"fmt"
"io/ioutil"
"net/http"

"github.com/go-kit/kit/log"
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"

"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
)

const (
warningTooManySpans = "cannot assign unique span ID, too many spans in the trace"
)

var (
maxSpanID uint64 = 0xffffffffffffffff
)

func Deduper(logger log.Logger) Middleware {
return MiddlewareFunc(func(next Handler) Handler {
return spanIDDeduper{
next: next,
logger: logger,
}
})
}

// This is copied over from Jaeger and modified to work for OpenTelemetry Trace data structure
// https://github.com/jaegertracing/jaeger/blob/12bba8c9b91cf4a29d314934bc08f4a80e43c042/model/adjuster/span_id_deduper.go
type spanIDDeduper struct {
next Handler
logger log.Logger
trace *tempopb.Trace
spansByID map[uint64][]*v1.Span
maxUsedID uint64
}

// Do implements Handler
func (s spanIDDeduper) Do(req *http.Request) (*http.Response, error) {
ctx := req.Context()
span, _ := opentracing.StartSpanFromContext(ctx, "frontend.DedupeSpanIDs")
defer span.Finish()

resp, err := s.next.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode == http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
return nil, err
}

traceObject := &tempopb.Trace{}
err = proto.Unmarshal(body, traceObject)
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

s.trace = traceObject
s.dedupe()

traceBytes, err := proto.Marshal(s.trace)
if err != nil {
return nil, err
}

return &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(traceBytes)),
Header: http.Header{},
}, nil
}

return resp, nil
}

func (s *spanIDDeduper) dedupe() {
s.groupSpansByID()
s.dedupeSpanIDs()
}

// groupSpansByID groups spans with the same ID returning a map id -> []Span
func (s *spanIDDeduper) groupSpansByID() {
spansByID := make(map[uint64][]*v1.Span)
for _, batch := range s.trace.Batches {
for _, ils := range batch.InstrumentationLibrarySpans {
for _, span := range ils.Spans {
id := binary.BigEndian.Uint64(span.SpanId)
if spans, ok := spansByID[id]; ok {
// TODO maybe return an error if more than 2 spans found
spansByID[id] = append(spans, span)
} else {
spansByID[id] = []*v1.Span{span}
}
}
}
}
s.spansByID = spansByID
}

func (s *spanIDDeduper) isSharedWithClientSpan(spanID uint64) bool {
for _, span := range s.spansByID[spanID] {
if span.GetKind() == v1.Span_SPAN_KIND_CLIENT {
return true
}
}
return false
}

func (s *spanIDDeduper) dedupeSpanIDs() {
oldToNewSpanIDs := make(map[uint64]uint64)
for _, batch := range s.trace.Batches {
for _, ils := range batch.InstrumentationLibrarySpans {
for _, span := range ils.Spans {
id := binary.BigEndian.Uint64(span.SpanId)
// only replace span IDs for server-side spans that share the ID with something else
if span.GetKind() == v1.Span_SPAN_KIND_SERVER && s.isSharedWithClientSpan(id) {
newID, err := s.makeUniqueSpanID()
if err != nil {
// ignore this error condition where we have more than 2^64 unique span IDs
continue
}
oldToNewSpanIDs[id] = newID
if len(span.ParentSpanId) == 0 {
span.ParentSpanId = make([]byte, 8)
}
binary.BigEndian.PutUint64(span.ParentSpanId, id) // previously shared ID is the new parent
binary.BigEndian.PutUint64(span.SpanId, newID)
}
}
}
}
s.swapParentIDs(oldToNewSpanIDs)
}

// swapParentIDs corrects ParentSpanID of all spans that are children of the server
// spans whose IDs we deduped.
func (s *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[uint64]uint64) {
if len(oldToNewSpanIDs) == 0 {
return
}
for _, batch := range s.trace.Batches {
for _, ils := range batch.InstrumentationLibrarySpans {
for _, span := range ils.Spans {
if len(span.GetParentSpanId()) > 0 {
parentSpanID := binary.BigEndian.Uint64(span.GetParentSpanId())
if parentID, ok := oldToNewSpanIDs[parentSpanID]; ok {
if binary.BigEndian.Uint64(span.SpanId) != parentID {
binary.BigEndian.PutUint64(span.SpanId, parentID)
}
}
}
}
}
}
}

// makeUniqueSpanID returns a new ID that is not used in the trace,
// or an error if such ID cannot be generated, which is unlikely,
// given that the whole space of span IDs is 2^64.
func (s *spanIDDeduper) makeUniqueSpanID() (uint64, error) {
for id := s.maxUsedID + 1; id < maxSpanID; id++ {
if _, ok := s.spansByID[id]; !ok {
s.maxUsedID = id
return id, nil
}
}
return 0, fmt.Errorf(warningTooManySpans)
}
155 changes: 155 additions & 0 deletions modules/frontend/deduper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package frontend

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util/test"
)

func TestDedupeSpanIDs(t *testing.T) {
tests := []struct {
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
name string
trace *tempopb.Trace
expectedRes *tempopb.Trace
}{
{
name: "no duplicates",
trace: &tempopb.Trace{
Batches: []*v1.ResourceSpans{
{
InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
{
Spans: []*v1.Span{
{
SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
Kind: v1.Span_SPAN_KIND_CLIENT,
},
{
SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02},
},
{
SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03},
Kind: v1.Span_SPAN_KIND_SERVER,
},
},
},
},
},
},
},
expectedRes: &tempopb.Trace{
Batches: []*v1.ResourceSpans{
{
InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{
{
Spans: []*v1.Span{
{
SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
Kind: v1.Span_SPAN_KIND_CLIENT,
},
{
SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02},
},
{
SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03},
Kind: v1.Span_SPAN_KIND_SERVER,
},
},
},
},
},
},
},
},
{
name: "duplicate span id",
trace: &tempopb.Trace{
Batches: []*v1.ResourceSpans{
{
InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{
{
Spans: []*v1.Span{
{
SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
Kind: v1.Span_SPAN_KIND_CLIENT,
},
{
SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02},
},
{
SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
Kind: v1.Span_SPAN_KIND_SERVER,
},
},
},
},
},
},
},
expectedRes: &tempopb.Trace{
Batches: []*v1.ResourceSpans{
{
InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{
{
Spans: []*v1.Span{
{
SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
Kind: v1.Span_SPAN_KIND_CLIENT,
},
{
SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02},
},
{
SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03},
Kind: v1.Span_SPAN_KIND_SERVER,
ParentSpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
},
},
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &spanIDDeduper{
trace: tt.trace,
}
s.dedupe()
assert.Equal(t, tt.expectedRes, s.trace)
})
}

}

func BenchmarkDeduper100(b *testing.B) {
benchmarkDeduper(b, 100)
}

func BenchmarkDeduper1000(b *testing.B) {
benchmarkDeduper(b, 1000)
}
func BenchmarkDeduper10000(b *testing.B) {
benchmarkDeduper(b, 10000)
}

func BenchmarkDeduper100000(b *testing.B) {
benchmarkDeduper(b, 100000)
}

func benchmarkDeduper(b *testing.B, traceSpanCount int) {
s := &spanIDDeduper{
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
trace: test.MakeTraceWithSpanCount(1, traceSpanCount, []byte{0x00}),
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
s.dedupe()
}
}
Loading