Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert Zipkin receiver and exporter to use OTLP and fix translation bugs #1446

Merged
merged 37 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4fd5004
fixed all Jaeger translation diffs from correctness tests
kbrockhoff Jun 27, 2020
9d8e891
fixed Jaeger translations where Resource is missing or empty
kbrockhoff Jun 27, 2020
23e5237
improve test coverage
kbrockhoff Jun 29, 2020
36d90b5
Merge branch 'master' into fix-jaeger-translations
kbrockhoff Jul 3, 2020
fd565c9
Merge branch 'master' into fix-zipkin-translations
kbrockhoff Jul 10, 2020
3d39683
initial dev of otlp to zipkin translation
kbrockhoff Jul 10, 2020
cab9be8
initial dev of otlp to zipkin translation
kbrockhoff Jul 11, 2020
76451ec
initial dev of zipkin v2 to otlp translation
kbrockhoff Jul 15, 2020
140bb5f
initial dev of zipkin v2 to otlp translation
kbrockhoff Jul 18, 2020
149ba26
Merge branch 'master' into fix-zipkin-translations
kbrockhoff Jul 22, 2020
6739625
initial dev of zipkin v2 to otlp translation
kbrockhoff Jul 22, 2020
b08ee12
initial dev of zipkin v2 to otlp translation
kbrockhoff Jul 25, 2020
bde1771
initial dev of zipkin v1 to otlp translation
kbrockhoff Jul 25, 2020
bd76ad3
change zipkin receiver and exporter to use internal traces
kbrockhoff Jul 25, 2020
85c3391
improve event/annotation translations
kbrockhoff Jul 25, 2020
e59740e
fix broken zipkin tests
kbrockhoff Jul 27, 2020
c43074f
Merge branch 'master' into fix-zipkin-translations
kbrockhoff Jul 28, 2020
65bffa2
convert zipken receiver/exporter to new factory signature
kbrockhoff Jul 28, 2020
1fb224b
fixed receiver config
kbrockhoff Jul 28, 2020
3101bc7
Merge branch 'master' into fix-zipkin-translations
kbrockhoff Aug 1, 2020
f3b7b75
add code from master
kbrockhoff Aug 1, 2020
8c536aa
fix broken test
kbrockhoff Aug 1, 2020
95168a1
fix PR requested changes
kbrockhoff Aug 1, 2020
c228643
translation fixes
kbrockhoff Aug 1, 2020
972ddac
improve perf
kbrockhoff Aug 3, 2020
2f19c3a
added more test coverage
kbrockhoff Aug 3, 2020
98d4a8b
changed timestamp validations to only check to nearest millisecond
kbrockhoff Aug 7, 2020
2cc7420
Merge branch 'master' into fix-zipkin-translations
kbrockhoff Aug 7, 2020
86cfabd
fix new lint issues and PR comments
kbrockhoff Aug 8, 2020
8b98e7b
improve timestamp validation
kbrockhoff Aug 8, 2020
f989072
improve timestamp validation
kbrockhoff Aug 8, 2020
d6a7585
fix missed timestamp validation
kbrockhoff Aug 8, 2020
daa24ae
added missing goldendata examples
kbrockhoff Aug 8, 2020
f749072
fix generator tests
kbrockhoff Aug 8, 2020
acdce2b
Merge branch 'master' into fix-zipkin-translations
kbrockhoff Aug 11, 2020
325ffbe
fix PR comments
kbrockhoff Aug 11, 2020
50616c8
improve test coverage
kbrockhoff Aug 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions exporter/zipkinexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter {
}

// CreateTraceExporter creates a trace exporter based on this config.
func (f *Factory) CreateTraceExporter(_ *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) {
func (f *Factory) CreateTraceExporter(_ *zap.Logger, config configmodels.Exporter) (component.TraceExporter, error) {
zc := config.(*Config)

if zc.Endpoint == "" {
Expand All @@ -74,6 +74,6 @@ func (f *Factory) CreateTraceExporter(_ *zap.Logger, config configmodels.Exporte
}

// CreateMetricsExporter creates a metrics exporter based on this config.
func (f *Factory) CreateMetricsExporter(_ *zap.Logger, _ configmodels.Exporter) (component.MetricsExporterOld, error) {
func (f *Factory) CreateMetricsExporter(_ *zap.Logger, _ configmodels.Exporter) (component.MetricsExporter, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
30 changes: 11 additions & 19 deletions exporter/zipkinexporter/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ import (
"fmt"
"net/http"

resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
zipkinmodel "github.com/openzipkin/zipkin-go/model"
zipkinproto "github.com/openzipkin/zipkin-go/proto/v2"
zipkinreporter "github.com/openzipkin/zipkin-go/reporter"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/translator/trace/zipkin"
)
Expand All @@ -46,12 +44,12 @@ type zipkinExporter struct {
}

// newTraceExporter creates an zipkin trace exporter.
func newTraceExporter(config *Config) (component.TraceExporterOld, error) {
func newTraceExporter(config *Config) (component.TraceExporter, error) {
ze, err := createZipkinExporter(config)
if err != nil {
return nil, err
}
zexp, err := exporterhelper.NewTraceExporterOld(config, ze.PushTraceData)
zexp, err := exporterhelper.NewTraceExporter(config, ze.PushTraceData)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -83,36 +81,30 @@ func createZipkinExporter(cfg *Config) (*zipkinExporter, error) {
return ze, nil
}

func (ze *zipkinExporter) PushTraceData(ctx context.Context, td consumerdata.TraceData) (int, error) {
tbatch := make([]*zipkinmodel.SpanModel, 0, len(td.Spans))
var resource *resourcepb.Resource = td.Resource

for _, span := range td.Spans {
zs, err := zipkin.OCSpanProtoToZipkin(td.Node, resource, span, ze.defaultServiceName)
if err != nil {
return len(td.Spans), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}
tbatch = append(tbatch, zs)
func (ze *zipkinExporter) PushTraceData(ctx context.Context, td pdata.Traces) (int, error) {
kbrockhoff marked this conversation as resolved.
Show resolved Hide resolved
tbatch, err := zipkin.InternalTracesToZipkinSpans(td)
if err != nil {
return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}

body, err := ze.serializer.Serialize(tbatch)
if err != nil {
return len(td.Spans), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}

req, err := http.NewRequestWithContext(ctx, "POST", ze.url, bytes.NewReader(body))
if err != nil {
return len(td.Spans), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
return td.SpanCount(), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
}
req.Header.Set("Content-Type", ze.serializer.ContentType())

resp, err := ze.client.Do(req)
if err != nil {
return len(td.Spans), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
return td.SpanCount(), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
}
_ = resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return len(td.Spans), fmt.Errorf("failed the request with status code %d", resp.StatusCode)
return td.SpanCount(), fmt.Errorf("failed the request with status code %d", resp.StatusCode)
}
return 0, nil
}
4 changes: 2 additions & 2 deletions exporter/zipkinexporter/zipkin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) {
mzr := newMockZipkinReporter(cst.URL)

// Run the Zipkin receiver to "receive spans upload from a client application"
zexp := processor.NewTraceFanOutConnectorOld([]consumer.TraceConsumerOld{tes})
zexp := processor.NewTraceFanOutConnector([]consumer.TraceConsumer{tes})
kbrockhoff marked this conversation as resolved.
Show resolved Hide resolved
addr := testutil.GetAvailableLocalAddress(t)
cfg := &zipkinreceiver.Config{
ReceiverSettings: configmodels.ReceiverSettings{
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestZipkinExporter_roundtripProto(t *testing.T) {
mzr.serializer = zipkinproto.SpanSerializer{}

// Run the Zipkin receiver to "receive spans upload from a client application"
zexp := processor.NewTraceFanOutConnectorOld([]consumer.TraceConsumerOld{tes})
zexp := processor.NewTraceFanOutConnector([]consumer.TraceConsumer{tes})
kbrockhoff marked this conversation as resolved.
Show resolved Hide resolved
port := testutil.GetAvailablePort(t)
cfg := &zipkinreceiver.Config{
ReceiverSettings: configmodels.ReceiverSettings{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (m *processMetadata) initializeResource(resource pdata.Resource) {
}

func (m *processMetadata) insertPid(attr pdata.AttributeMap) {
attr.InsertInt(conventions.AttributeProcessID, int64(m.pid))
attr.InsertInt(conventions.AttributeProcessPID, int64(m.pid))
kbrockhoff marked this conversation as resolved.
Show resolved Hide resolved
}

func (m *processMetadata) insertExecutable(attr pdata.AttributeMap) {
Expand Down Expand Up @@ -86,7 +86,7 @@ func (m *processMetadata) insertUsername(attr pdata.AttributeMap) {
return
}

attr.InsertString(conventions.AttributeProcessUsername, m.username)
attr.InsertString(conventions.AttributeProcessOwner, m.username)
}

// processHandles provides a wrapper around []*process.Process
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ func TestScrapeMetrics(t *testing.T) {
func assertResourceAttributes(t *testing.T, resourceMetrics pdata.ResourceMetricsSlice) {
for i := 0; i < resourceMetrics.Len(); i++ {
attr := resourceMetrics.At(0).Resource().Attributes()
internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessID)
internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessPID)
kbrockhoff marked this conversation as resolved.
Show resolved Hide resolved
internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessExecutableName)
internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessExecutablePath)
internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessCommand)
internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessCommandLine)
internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessUsername)
internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessOwner)
}
}

Expand Down
4 changes: 2 additions & 2 deletions receiver/zipkinreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ func (f *Factory) CreateTraceReceiver(
ctx context.Context,
logger *zap.Logger,
cfg configmodels.Receiver,
nextConsumer consumer.TraceConsumerOld,
nextConsumer consumer.TraceConsumer,
) (component.TraceReceiver, error) {
rCfg := cfg.(*Config)
return New(rCfg, nextConsumer)
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
func (f *Factory) CreateMetricsReceiver(ctx context.Context, logger *zap.Logger, cfg configmodels.Receiver, nextConsumer consumer.MetricsConsumerOld) (component.MetricsReceiver, error) {
func (f *Factory) CreateMetricsReceiver(ctx context.Context, logger *zap.Logger, cfg configmodels.Receiver, nextConsumer consumer.MetricsConsumer) (component.MetricsReceiver, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
4 changes: 2 additions & 2 deletions receiver/zipkinreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand All @@ -36,7 +36,7 @@ func TestCreateDefaultConfig(t *testing.T) {
type mockTraceConsumer struct {
kbrockhoff marked this conversation as resolved.
Show resolved Hide resolved
}

func (m *mockTraceConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
func (m *mockTraceConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
return nil
}

Expand Down
Loading