diff --git a/exporter/zipkinexporter/config_test.go b/exporter/zipkinexporter/config_test.go index a8fc34d7658..2c14dba5fac 100644 --- a/exporter/zipkinexporter/config_test.go +++ b/exporter/zipkinexporter/config_test.go @@ -15,12 +15,15 @@ package zipkinexporter import ( + "context" "path" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtest" ) @@ -48,4 +51,7 @@ func TestLoadConfig(t *testing.T) { assert.Equal(t, "zipkin/2", e1.(*Config).Name()) assert.Equal(t, "https://somedest:1234/api/v2/spans", e1.(*Config).Endpoint) assert.Equal(t, "proto", e1.(*Config).Format) + params := component.ExporterCreateParams{Logger: zap.NewNop()} + _, err = factory.CreateTraceExporter(context.Background(), params, e1) + require.NoError(t, err) } diff --git a/exporter/zipkinexporter/factory.go b/exporter/zipkinexporter/factory.go index fab02d6e883..0c5ba563b2d 100644 --- a/exporter/zipkinexporter/factory.go +++ b/exporter/zipkinexporter/factory.go @@ -36,7 +36,7 @@ const ( defaultServiceName string = "" ) -// NewFactory creates a factory for OTLP exporter. +// NewFactory creates a factory for Zipkin exporter. func NewFactory() component.ExporterFactory { return exporterhelper.NewFactory( typeStr, diff --git a/exporter/zipkinexporter/test_utils.go b/exporter/zipkinexporter/test_utils.go new file mode 100644 index 00000000000..5a890cf6a5f --- /dev/null +++ b/exporter/zipkinexporter/test_utils.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkinexporter + +import ( + "encoding/json" + "testing" + + zipkinmodel "github.com/openzipkin/zipkin-go/model" + "github.com/stretchr/testify/require" +) + +func unmarshalZipkinSpanArrayToMap(t *testing.T, jsonStr string) map[zipkinmodel.ID]*zipkinmodel.SpanModel { + var i interface{} + + err := json.Unmarshal([]byte(jsonStr), &i) + require.NoError(t, err) + + results := make(map[zipkinmodel.ID]*zipkinmodel.SpanModel) + + switch x := i.(type) { + case []interface{}: + for _, j := range x { + span := jsonToSpan(t, j) + results[span.ID] = span + } + default: + span := jsonToSpan(t, x) + results[span.ID] = span + } + return results +} + +func jsonToSpan(t *testing.T, j interface{}) *zipkinmodel.SpanModel { + b, err := json.Marshal(j) + require.NoError(t, err) + span := &zipkinmodel.SpanModel{} + err = span.UnmarshalJSON(b) + require.NoError(t, err) + return span +} diff --git a/exporter/zipkinexporter/zipkin.go b/exporter/zipkinexporter/zipkin.go index a6932c790ae..e51fe29841d 100644 --- a/exporter/zipkinexporter/zipkin.go +++ b/exporter/zipkinexporter/zipkin.go @@ -20,7 +20,6 @@ import ( "fmt" "net/http" - zipkinmodel "github.com/openzipkin/zipkin-go/model" zipkinproto "github.com/openzipkin/zipkin-go/proto/v2" zipkinreporter "github.com/openzipkin/zipkin-go/reporter" @@ -28,7 +27,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.opentelemetry.io/collector/translator/internaldata" "go.opentelemetry.io/collector/translator/trace/zipkin" ) @@ -84,38 +82,29 @@ func createZipkinExporter(cfg *Config) (*zipkinExporter, error) { } func (ze *zipkinExporter) pushTraceData(ctx context.Context, td pdata.Traces) (int, error) { - numSpans := td.SpanCount() - octds := internaldata.TraceDataToOC(td) - - tbatch := make([]*zipkinmodel.SpanModel, 0, numSpans) - for _, octd := range octds { - for _, span := range octd.Spans { - zs, err := zipkin.OCSpanProtoToZipkin(octd.Node, octd.Resource, span, ze.defaultServiceName) - if err != nil { - return numSpans, consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)) - } - tbatch = append(tbatch, zs) - } + tbatch, err := zipkin.InternalTracesToZipkinSpans(td) + if err != nil { + return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)) } body, err := ze.serializer.Serialize(tbatch) if err != nil { - return numSpans, consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)) + return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)) } req, err := http.NewRequestWithContext(ctx, "POST", ze.url, bytes.NewReader(body)) if err != nil { - return numSpans, fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err) + return td.SpanCount(), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err) } req.Header.Set("Content-Type", ze.serializer.ContentType()) resp, err := ze.client.Do(req) if err != nil { - return numSpans, fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err) + return td.SpanCount(), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err) } _ = resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode > 299 { - return numSpans, fmt.Errorf("failed the request with status code %d", resp.StatusCode) + return td.SpanCount(), fmt.Errorf("failed the request with status code %d", resp.StatusCode) } return 0, nil } diff --git a/exporter/zipkinexporter/zipkin_test.go b/exporter/zipkinexporter/zipkin_test.go index 8c98cc56e60..ff97212bd92 100644 --- a/exporter/zipkinexporter/zipkin_test.go +++ b/exporter/zipkinexporter/zipkin_test.go @@ -114,7 +114,7 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) { "tags": {"http.path": "/api","clnt/finagle.version": "6.45.0"} }, { - "traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91386","id": "4d1e00c0db9010db", + "traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91386","id": "4d1e00c0db9010dc", "kind": "SERVER","name": "put", "timestamp": 1472470996199000,"duration": 207000, "localEndpoint": {"serviceName": "frontend","ipv6": "7::80:807f"}, @@ -124,12 +124,11 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) { {"timestamp": 1472470996403000,"value": "bar"} ], "tags": {"http.path": "/api","clnt/finagle.version": "6.45.0"} - }] - `, ` - [{ + }, + { "traceId": "4d1e00c0db9010db86154a4ba6e91385", "parentId": "86154a4ba6e91386", - "id": "4d1e00c0db9010db", + "id": "4d1e00c0db9010dd", "kind": "SERVER", "name": "put", "timestamp": 1472470996199000, @@ -137,10 +136,19 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) { }] `} for i, s := range wants { - want := testutil.GenerateNormalizedJSON(t, s) + want := unmarshalZipkinSpanArrayToMap(t, s) gotBytes := buf.Next(int(sizes[i])) - got := testutil.GenerateNormalizedJSON(t, string(gotBytes)) - assert.Equal(t, want, got) + got := unmarshalZipkinSpanArrayToMap(t, string(gotBytes)) + for id, expected := range want { + actual, ok := got[id] + assert.True(t, ok) + assert.Equal(t, expected.ID, actual.ID) + assert.Equal(t, expected.Name, actual.Name) + assert.Equal(t, expected.TraceID, actual.TraceID) + assert.Equal(t, expected.Timestamp, actual.Timestamp) + assert.Equal(t, expected.Duration, actual.Duration) + assert.Equal(t, expected.Kind, actual.Kind) + } } } @@ -151,7 +159,7 @@ type mockZipkinReporter struct { serializer zipkinreporter.SpanSerializer } -var _ zipkinreporter.Reporter = (*mockZipkinReporter)(nil) +var _ (zipkinreporter.Reporter) = (*mockZipkinReporter)(nil) func (r *mockZipkinReporter) Send(span zipkinmodel.SpanModel) { r.batch = append(r.batch, &span) @@ -235,7 +243,7 @@ const zipkinSpansJSONJavaLibrary = ` { "traceId": "4d1e00c0db9010db86154a4ba6e91385", "parentId": "86154a4ba6e91386", - "id": "4d1e00c0db9010db", + "id": "4d1e00c0db9010dc", "kind": "SERVER", "name": "put", "timestamp": 1472470996199000, @@ -267,7 +275,7 @@ const zipkinSpansJSONJavaLibrary = ` { "traceId": "4d1e00c0db9010db86154a4ba6e91385", "parentId": "86154a4ba6e91386", - "id": "4d1e00c0db9010db", + "id": "4d1e00c0db9010dd", "kind": "SERVER", "name": "put", "timestamp": 1472470996199000, @@ -283,7 +291,8 @@ func TestZipkinExporter_invalidFormat(t *testing.T) { Format: "foobar", } f := NewFactory() - _, err := f.CreateTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, config) + params := component.ExporterCreateParams{Logger: zap.NewNop()} + _, err := f.CreateTraceExporter(context.Background(), params, config) require.Error(t, err) } diff --git a/go.sum b/go.sum index 4c3c219de14..0258ddfeaeb 100644 --- a/go.sum +++ b/go.sum @@ -1174,6 +1174,7 @@ golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/internal/goldendataset/pict_tracing_input_defs.go b/internal/goldendataset/pict_tracing_input_defs.go index c818cf0b0b3..8bb204abb46 100644 --- a/internal/goldendataset/pict_tracing_input_defs.go +++ b/internal/goldendataset/pict_tracing_input_defs.go @@ -34,6 +34,7 @@ const ( ResourceK8sOnPrem PICTInputResource = "K8sOnPrem" ResourceK8sCloud PICTInputResource = "K8sCloud" ResourceFaas PICTInputResource = "Faas" + ResourceExec PICTInputResource = "Exec" ) // Enumerates the number and kind of instrumentation library instances that can be generated. diff --git a/internal/goldendataset/resource_generator.go b/internal/goldendataset/resource_generator.go index 01c85b53e0b..24c201bfd30 100644 --- a/internal/goldendataset/resource_generator.go +++ b/internal/goldendataset/resource_generator.go @@ -38,6 +38,8 @@ func GenerateResource(rscID PICTInputResource) *otlpresource.Resource { attrs = generateCloudK8sAttributes() case ResourceFaas: attrs = generateFassAttributes() + case ResourceExec: + attrs = generateExecAttributes() default: attrs = generateEmptyAttributes() } @@ -141,3 +143,14 @@ func generateFassAttributes() map[string]interface{} { attrMap[conventions.AttributeCloudZone] = "us-central1-a" return attrMap } + +func generateExecAttributes() map[string]interface{} { + attrMap := make(map[string]interface{}) + attrMap[conventions.AttributeProcessExecutableName] = "otelcol" + attrMap[conventions.AttributeProcessCommandLine] = + "--config=/etc/otel-collector-config.yaml --mem-ballast-size-mib=683" + attrMap[conventions.AttributeProcessExecutablePath] = "/usr/local/bin/otelcol" + attrMap[conventions.AttributeProcessID] = 2020 + attrMap[conventions.AttributeProcessOwner] = "otel" + return attrMap +} diff --git a/internal/goldendataset/resource_generator_test.go b/internal/goldendataset/resource_generator_test.go index a7962c090fe..984a6a0aba6 100644 --- a/internal/goldendataset/resource_generator_test.go +++ b/internal/goldendataset/resource_generator_test.go @@ -24,7 +24,7 @@ import ( func TestGenerateResource(t *testing.T) { resourceIds := []PICTInputResource{ResourceNil, ResourceEmpty, ResourceVMOnPrem, ResourceVMCloud, ResourceK8sOnPrem, - ResourceK8sCloud, ResourceFaas} + ResourceK8sCloud, ResourceFaas, ResourceExec} for _, rscID := range resourceIds { rsc := GenerateResource(rscID) if rscID == ResourceNil { diff --git a/internal/goldendataset/span_generator.go b/internal/goldendataset/span_generator.go index 35ee9de801e..a42b886ae92 100644 --- a/internal/goldendataset/span_generator.go +++ b/internal/goldendataset/span_generator.go @@ -247,6 +247,8 @@ func generateDatabaseSQLAttributes() map[string]interface{} { attrMap[conventions.AttributeDBSystem] = "mysql" attrMap[conventions.AttributeDBConnectionString] = "Server=shopdb.example.com;Database=ShopDb;Uid=billing_user;TableCache=true;UseCompression=True;MinimumPoolSize=10;MaximumPoolSize=50;" attrMap[conventions.AttributeDBUser] = "billing_user" + attrMap[conventions.AttributeNetHostIP] = "192.0.3.122" + attrMap[conventions.AttributeNetHostPort] = int64(51306) attrMap[conventions.AttributeNetPeerName] = "shopdb.example.com" attrMap[conventions.AttributeNetPeerIP] = "192.0.2.12" attrMap[conventions.AttributeNetPeerPort] = int64(3306) @@ -426,6 +428,7 @@ func generateMaxCountAttributes(includeStatus bool) map[string]interface{} { "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36" attrMap[conventions.AttributeHTTPRoute] = "/blog/posts" attrMap[conventions.AttributeHTTPClientIP] = "2600:1700:1f00:11c0:1ced:afa5:fd77:9d01" + attrMap[conventions.AttributePeerService] = "IdentifyImageService" attrMap[conventions.AttributeNetPeerIP] = "2600:1700:1f00:11c0:1ced:afa5:fd77:9ddc" attrMap[conventions.AttributeNetPeerPort] = int64(39111) attrMap["ai-sampler.weight"] = 0.07 @@ -435,10 +438,9 @@ func generateMaxCountAttributes(includeStatus bool) map[string]interface{} { attrMap["application.svcmap"] = "Blogosphere" attrMap["application.abflags"] = "UIx=false,UI4=true,flow-alt3=false" attrMap["application.thread"] = "proc-pool-14" - attrMap["application.session"] = "233CC260-63A8-4ACA-A1C1-8F97AB4A2C01" + attrMap["application.session"] = "" attrMap["application.persist.size"] = int64(1172184) attrMap["application.queue.size"] = int64(0) - attrMap["application.validation.results"] = "Success" attrMap["application.job.id"] = "0E38800B-9C4C-484E-8F2B-C7864D854321" attrMap["application.service.sla"] = 0.34 attrMap["application.service.slo"] = 0.55 @@ -467,7 +469,7 @@ func generateSpanLinks(linkCnt PICTInputSpanChild, random io.Reader) []*otlptrac listSize := calculateListSize(linkCnt) linkList := make([]*otlptrace.Span_Link, listSize) for i := 0; i < listSize; i++ { - linkList[i] = generateSpanLink(random) + linkList[i] = generateSpanLink(random, i) } return linkList } @@ -498,6 +500,9 @@ func generateSpanEvent(index int) *otlptrace.Span_Event { } func generateEventAttributes(index int) []*otlpcommon.KeyValue { + if index%4 == 2 { + return nil + } attrMap := make(map[string]interface{}) if index%2 == 0 { attrMap[conventions.AttributeMessageType] = "SENT" @@ -507,20 +512,33 @@ func generateEventAttributes(index int) []*otlpcommon.KeyValue { attrMap[conventions.AttributeMessageID] = int64(index) attrMap[conventions.AttributeMessageCompressedSize] = int64(17 * index) attrMap[conventions.AttributeMessageUncompressedSize] = int64(24 * index) + if index%4 == 1 { + attrMap["app.inretry"] = true + attrMap["app.progress"] = 0.6 + attrMap["app.statemap"] = "14|5|202" + } return convertMapToAttributeKeyValues(attrMap) } -func generateSpanLink(random io.Reader) *otlptrace.Span_Link { +func generateSpanLink(random io.Reader, index int) *otlptrace.Span_Link { return &otlptrace.Span_Link{ TraceId: generateTraceID(random), SpanId: generateSpanID(random), TraceState: "", - Attributes: generateLinkAttributes(), + Attributes: generateLinkAttributes(index), DroppedAttributesCount: 0, } } -func generateLinkAttributes() []*otlpcommon.KeyValue { +func generateLinkAttributes(index int) []*otlpcommon.KeyValue { + if index%4 == 2 { + return nil + } attrMap := generateMessagingConsumerAttributes() + if index%4 == 1 { + attrMap["app.inretry"] = true + attrMap["app.progress"] = 0.6 + attrMap["app.statemap"] = "14|5|202" + } return convertMapToAttributeKeyValues(attrMap) } diff --git a/internal/goldendataset/span_generator_test.go b/internal/goldendataset/span_generator_test.go index 9d02101f68b..148895f22a4 100644 --- a/internal/goldendataset/span_generator_test.go +++ b/internal/goldendataset/span_generator_test.go @@ -58,7 +58,7 @@ func TestGenerateChildSpan(t *testing.T) { span := GenerateSpan(traceID, parentID, "get_test_info", spanInputs, random) assert.Equal(t, traceID, span.TraceId) assert.Equal(t, parentID, span.ParentSpanId) - assert.Equal(t, 10, len(span.Attributes)) + assert.Equal(t, 12, len(span.Attributes)) assert.Equal(t, otlptrace.Status_Ok, span.Status.Code) } diff --git a/internal/goldendataset/testdata/generated_pict_pairs_traces.txt b/internal/goldendataset/testdata/generated_pict_pairs_traces.txt index 59706ccec5e..6d3647b966d 100644 --- a/internal/goldendataset/testdata/generated_pict_pairs_traces.txt +++ b/internal/goldendataset/testdata/generated_pict_pairs_traces.txt @@ -1,29 +1,33 @@ Resource InstrumentationLibrary Spans -Nil None Several -Nil One One -Empty One None -K8sCloud None All -VMOnPrem Two All -Faas Two None -K8sOnPrem One Several -K8sOnPrem Two All -VMOnPrem None One -Faas One All -K8sCloud Two Several -Nil None None -K8sOnPrem None None -Faas None One -VMCloud One None +VMOnPrem None None +Nil One None +Exec One Several +Exec None All +Nil Two One +Empty Two Several VMCloud Two All -Nil Two All -K8sCloud One None +K8sOnPrem None One +Empty Two None +Nil None Several +K8sOnPrem One None +K8sCloud One All +VMCloud One One +Nil None All +K8sOnPrem Two Several K8sCloud Two One -VMCloud None One -Empty Two One -Faas One Several -VMOnPrem One None -K8sOnPrem One One +Exec Two None +VMOnPrem Two One +K8sCloud None None +Faas One None +Faas Two Several +Exec One One VMCloud None Several +Faas None All +Empty One One +K8sCloud None Several +VMOnPrem One All +VMOnPrem One Several +K8sOnPrem Two All +VMCloud Two None Empty None All -Empty None Several -VMOnPrem Two Several +Faas One One diff --git a/internal/goldendataset/testdata/pict_input_traces.txt b/internal/goldendataset/testdata/pict_input_traces.txt index 7e342474da7..ea9f40ed8a8 100644 --- a/internal/goldendataset/testdata/pict_input_traces.txt +++ b/internal/goldendataset/testdata/pict_input_traces.txt @@ -1,3 +1,3 @@ -Resource: Nil, Empty, VMOnPrem, VMCloud, K8sOnPrem, K8sCloud, Faas +Resource: Nil, Empty, VMOnPrem, VMCloud, K8sOnPrem, K8sCloud, Faas, Exec InstrumentationLibrary: None, One, Two Spans: None, One, Several, All diff --git a/internal/goldendataset/traces_generator_test.go b/internal/goldendataset/traces_generator_test.go index e742cb506ee..b36d894de52 100644 --- a/internal/goldendataset/traces_generator_test.go +++ b/internal/goldendataset/traces_generator_test.go @@ -27,5 +27,5 @@ func TestGenerateTraces(t *testing.T) { rscSpans, err := GenerateResourceSpans("testdata/generated_pict_pairs_traces.txt", "testdata/generated_pict_pairs_spans.txt", random) assert.Nil(t, err) - assert.Equal(t, 28, len(rscSpans)) + assert.Equal(t, 32, len(rscSpans)) } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process.go index 10b12c39737..c157c255096 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process.go @@ -86,7 +86,7 @@ func (m *processMetadata) insertUsername(attr pdata.AttributeMap) { return } - attr.InsertString(conventions.AttributeProcessUsername, m.username) + attr.InsertString(conventions.AttributeProcessOwner, m.username) } // processHandles provides a wrapper around []*process.Process diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go index 0424e020e54..28443838b91 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go @@ -84,7 +84,7 @@ func assertResourceAttributes(t *testing.T, resourceMetrics pdata.ResourceMetric internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessExecutablePath) internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessCommand) internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessCommandLine) - internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessUsername) + internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessOwner) } } diff --git a/receiver/zipkinreceiver/factory.go b/receiver/zipkinreceiver/factory.go index 20df027fd10..43e79e742f1 100644 --- a/receiver/zipkinreceiver/factory.go +++ b/receiver/zipkinreceiver/factory.go @@ -33,13 +33,16 @@ const ( defaultBindEndpoint = "0.0.0.0:9411" ) +// NewFactory creates a new Zipkin receiver factory func NewFactory() component.ReceiverFactory { return receiverhelper.NewFactory( typeStr, createDefaultConfig, - receiverhelper.WithTraces(createTraceReceiver)) + receiverhelper.WithTraces(createTraceReceiver), + ) } +// createDefaultConfig creates the default configuration for Zipkin receiver. func createDefaultConfig() configmodels.Receiver { return &Config{ ReceiverSettings: configmodels.ReceiverSettings{ @@ -52,8 +55,9 @@ func createDefaultConfig() configmodels.Receiver { } } +// createTraceReceiver creates a trace receiver based on provided config. func createTraceReceiver( - _ context.Context, + ctx context.Context, _ component.ReceiverCreateParams, cfg configmodels.Receiver, nextConsumer consumer.TraceConsumer, diff --git a/receiver/zipkinreceiver/proto_parse_test.go b/receiver/zipkinreceiver/proto_parse_test.go index 87dbe40794a..b665fef4945 100644 --- a/receiver/zipkinreceiver/proto_parse_test.go +++ b/receiver/zipkinreceiver/proto_parse_test.go @@ -19,18 +19,17 @@ import ( "testing" "time" - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/golang/protobuf/proto" //lint:ignore SA1019 golang/protobuf/proto is deprecated - "github.com/google/go-cmp/cmp" - zipkinproto3 "github.com/openzipkin/zipkin-go/proto/v2" + zipkin_proto3 "github.com/openzipkin/zipkin-go/proto/v2" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/testing/protocmp" - "go.opentelemetry.io/collector/consumer/consumerdata" - "go.opentelemetry.io/collector/internal" + "go.opentelemetry.io/collector/consumer/pdata" + otlpcommon "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" + otlpresource "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/resource/v1" + otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/trace/v1" + "go.opentelemetry.io/collector/translator/conventions" tracetranslator "go.opentelemetry.io/collector/translator/trace" - "go.opentelemetry.io/collector/translator/trace/zipkin" ) func TestConvertSpansToTraceSpans_protobuf(t *testing.T) { @@ -45,22 +44,22 @@ func TestConvertSpansToTraceSpans_protobuf(t *testing.T) { minus10hr5ms := cmpTimestamp(now.Add(-(10*time.Hour + 5*time.Millisecond))) // 1. Generate some spans then serialize them with protobuf - payloadFromWild := &zipkinproto3.ListOfSpans{ - Spans: []*zipkinproto3.Span{ + payloadFromWild := &zipkin_proto3.ListOfSpans{ + Spans: []*zipkin_proto3.Span{ { TraceId: []byte{0x7F, 0x6F, 0x5F, 0x4F, 0x3F, 0x2F, 0x1F, 0x0F, 0xF7, 0xF6, 0xF5, 0xF4, 0xF3, 0xF2, 0xF1, 0xF0}, Id: []byte{0xF7, 0xF6, 0xF5, 0xF4, 0xF3, 0xF2, 0xF1, 0xF0}, ParentId: []byte{0xF7, 0xF6, 0xF5, 0xF4, 0xF3, 0xF2, 0xF1, 0xF0}, Name: "ProtoSpan1", - Kind: zipkinproto3.Span_CONSUMER, + Kind: zipkin_proto3.Span_CONSUMER, Timestamp: uint64(now.UnixNano() / 1e3), Duration: 12e6, // 12 seconds - LocalEndpoint: &zipkinproto3.Endpoint{ + LocalEndpoint: &zipkin_proto3.Endpoint{ ServiceName: "svc-1", Ipv4: []byte{0xC0, 0xA8, 0x00, 0x01}, Port: 8009, }, - RemoteEndpoint: &zipkinproto3.Endpoint{ + RemoteEndpoint: &zipkin_proto3.Endpoint{ ServiceName: "memcached", Ipv6: []byte{0xFE, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x14, 0x53, 0xa7, 0x7c, 0xda, 0x4d, 0xd2, 0x1b}, Port: 11211, @@ -71,20 +70,20 @@ func TestConvertSpansToTraceSpans_protobuf(t *testing.T) { Id: []byte{0x67, 0x66, 0x65, 0x64, 0x63, 0x62, 0x61, 0x60}, ParentId: []byte{0x17, 0x16, 0x15, 0x14, 0x13, 0x12, 0x11, 0x10}, Name: "CacheWarmUp", - Kind: zipkinproto3.Span_PRODUCER, + Kind: zipkin_proto3.Span_PRODUCER, Timestamp: uint64(minus10hr5ms.UnixNano() / 1e3), Duration: 7e6, // 7 seconds - LocalEndpoint: &zipkinproto3.Endpoint{ + LocalEndpoint: &zipkin_proto3.Endpoint{ ServiceName: "search", Ipv4: []byte{0x0A, 0x00, 0x00, 0x0D}, Port: 8009, }, - RemoteEndpoint: &zipkinproto3.Endpoint{ + RemoteEndpoint: &zipkin_proto3.Endpoint{ ServiceName: "redis", Ipv6: []byte{0xFE, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x14, 0x53, 0xa7, 0x7c, 0xda, 0x4d, 0xd2, 0x1b}, Port: 6379, }, - Annotations: []*zipkinproto3.Annotation{ + Annotations: []*zipkin_proto3.Annotation{ { Timestamp: uint64(minus10hr5ms.UnixNano() / 1e3), Value: "DB reset", @@ -108,90 +107,197 @@ func TestConvertSpansToTraceSpans_protobuf(t *testing.T) { // 3. Get that payload converted to OpenCensus proto spans. reqs, err := zi.v2ToTraceSpans(protoBlob, hdr) require.NoError(t, err, "Failed to parse convert Zipkin spans in Protobuf to Trace spans: %v", err) - require.Len(t, reqs, 2, "Expecting exactly 2 requests since spans have different node/localEndpoint: %v", len(reqs)) + require.Equal(t, reqs.ResourceSpans().Len(), 2, "Expecting exactly 2 requests since spans have different node/localEndpoint: %v", reqs.ResourceSpans().Len()) - want := []consumerdata.TraceData{ + want := pdata.TracesFromOtlp([]*otlptrace.ResourceSpans{ { - Node: &commonpb.Node{ - ServiceInfo: &commonpb.ServiceInfo{ - Name: "svc-1", + Resource: &otlpresource.Resource{ + Attributes: []*otlpcommon.KeyValue{ + { + Key: conventions.AttributeServiceName, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: "svc-1", + }, + }, + }, }, }, - Spans: []*tracepb.Span{ + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ { - TraceId: []byte{0x7F, 0x6F, 0x5F, 0x4F, 0x3F, 0x2F, 0x1F, 0x0F, 0xF7, 0xF6, 0xF5, 0xF4, 0xF3, 0xF2, 0xF1, 0xF0}, - SpanId: []byte{0xF7, 0xF6, 0xF5, 0xF4, 0xF3, 0xF2, 0xF1, 0xF0}, - ParentSpanId: []byte{0xF7, 0xF6, 0xF5, 0xF4, 0xF3, 0xF2, 0xF1, 0xF0}, - Name: &tracepb.TruncatableString{Value: "ProtoSpan1"}, - StartTime: internal.TimeToTimestamp(now), - EndTime: internal.TimeToTimestamp(now.Add(12 * time.Second)), - Attributes: &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - zipkin.LocalEndpointIPv4: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "192.168.0.1"}}}, - zipkin.LocalEndpointPort: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "8009"}}}, - zipkin.RemoteEndpointServiceName: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "memcached"}}}, - zipkin.RemoteEndpointIPv6: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "fe80::1453:a77c:da4d:d21b"}}}, - zipkin.RemoteEndpointPort: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "11211"}}}, - tracetranslator.TagSpanKind: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: string(tracetranslator.OpenTracingSpanKindConsumer)}}}, + Spans: []*otlptrace.Span{ + { + TraceId: []byte{0x7F, 0x6F, 0x5F, 0x4F, 0x3F, 0x2F, 0x1F, 0x0F, 0xF7, 0xF6, 0xF5, 0xF4, 0xF3, 0xF2, 0xF1, 0xF0}, + SpanId: []byte{0xF7, 0xF6, 0xF5, 0xF4, 0xF3, 0xF2, 0xF1, 0xF0}, + ParentSpanId: []byte{0xF7, 0xF6, 0xF5, 0xF4, 0xF3, 0xF2, 0xF1, 0xF0}, + Name: "ProtoSpan1", + StartTimeUnixNano: uint64(now.UnixNano()), + EndTimeUnixNano: uint64(now.Add(12 * time.Second).UnixNano()), + Attributes: []*otlpcommon.KeyValue{ + { + Key: conventions.AttributeNetHostIP, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: "192.168.0.1", + }, + }, + }, + { + Key: conventions.AttributeNetHostPort, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_IntValue{ + IntValue: 8009, + }, + }, + }, + { + Key: conventions.AttributeNetPeerName, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: "memcached", + }, + }, + }, + { + Key: conventions.AttributeNetPeerIP, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: "fe80::1453:a77c:da4d:d21b", + }, + }, + }, + { + Key: conventions.AttributeNetPeerPort, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_IntValue{ + IntValue: 11211, + }, + }, + }, + { + Key: tracetranslator.TagSpanKind, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: string(tracetranslator.OpenTracingSpanKindConsumer), + }, + }, + }, + }, }, }, }, }, }, { - Node: &commonpb.Node{ - ServiceInfo: &commonpb.ServiceInfo{ - Name: "search", + Resource: &otlpresource.Resource{ + Attributes: []*otlpcommon.KeyValue{ + { + Key: conventions.AttributeServiceName, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: "search", + }, + }, + }, }, }, - Spans: []*tracepb.Span{ + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ { - TraceId: []byte{0x7A, 0x6A, 0x5A, 0x4A, 0x3A, 0x2A, 0x1A, 0x0A, 0xC7, 0xC6, 0xC5, 0xC4, 0xC3, 0xC2, 0xC1, 0xC0}, - SpanId: []byte{0x67, 0x66, 0x65, 0x64, 0x63, 0x62, 0x61, 0x60}, - ParentSpanId: []byte{0x17, 0x16, 0x15, 0x14, 0x13, 0x12, 0x11, 0x10}, - Name: &tracepb.TruncatableString{Value: "CacheWarmUp"}, - StartTime: internal.TimeToTimestamp(now.Add(-10 * time.Hour)), - EndTime: internal.TimeToTimestamp(now.Add(-10 * time.Hour).Add(7 * time.Second)), - Attributes: &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - zipkin.LocalEndpointIPv4: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "10.0.0.13"}}}, - zipkin.LocalEndpointPort: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "8009"}}}, - zipkin.RemoteEndpointServiceName: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "redis"}}}, - zipkin.RemoteEndpointIPv6: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "fe80::1453:a77c:da4d:d21b"}}}, - zipkin.RemoteEndpointPort: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "6379"}}}, - tracetranslator.TagSpanKind: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: string(tracetranslator.OpenTracingSpanKindProducer)}}}, - }, - }, - TimeEvents: &tracepb.Span_TimeEvents{ - TimeEvent: []*tracepb.Span_TimeEvent{ - { - Time: internal.TimeToTimestamp(cmpTimestamp(now.Add(-10 * time.Hour))), - Value: &tracepb.Span_TimeEvent_Annotation_{ - Annotation: &tracepb.Span_TimeEvent_Annotation{ - Description: &tracepb.TruncatableString{ - Value: "DB reset", + Spans: []*otlptrace.Span{ + { + TraceId: []byte{0x7A, 0x6A, 0x5A, 0x4A, 0x3A, 0x2A, 0x1A, 0x0A, 0xC7, 0xC6, 0xC5, 0xC4, 0xC3, 0xC2, 0xC1, 0xC0}, + SpanId: []byte{0x67, 0x66, 0x65, 0x64, 0x63, 0x62, 0x61, 0x60}, + ParentSpanId: []byte{0x17, 0x16, 0x15, 0x14, 0x13, 0x12, 0x11, 0x10}, + Name: "CacheWarmUp", + StartTimeUnixNano: uint64(now.Add(-10 * time.Hour).UnixNano()), + EndTimeUnixNano: uint64(now.Add(-10 * time.Hour).Add(7 * time.Second).UnixNano()), + Attributes: []*otlpcommon.KeyValue{ + { + Key: conventions.AttributeNetHostIP, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: "10.0.0.13", }, }, }, - }, - { - Time: internal.TimeToTimestamp(minus10hr5ms), - Value: &tracepb.Span_TimeEvent_Annotation_{ - Annotation: &tracepb.Span_TimeEvent_Annotation{ - Description: &tracepb.TruncatableString{ - Value: "GC Cycle 39", + { + Key: conventions.AttributeNetHostPort, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_IntValue{ + IntValue: 8009, + }, + }, + }, + { + Key: conventions.AttributeNetPeerName, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: "redis", + }, + }, + }, + { + Key: conventions.AttributeNetPeerIP, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: "fe80::1453:a77c:da4d:d21b", + }, + }, + }, + { + Key: conventions.AttributeNetPeerPort, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_IntValue{ + IntValue: 6379, + }, + }, + }, + { + Key: tracetranslator.TagSpanKind, + Value: &otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: string(tracetranslator.OpenTracingSpanKindProducer), }, }, }, }, + Events: []*otlptrace.Span_Event{ + { + TimeUnixNano: uint64(now.Add(-10 * time.Hour).UnixNano()), + Name: "DB reset", + }, + { + TimeUnixNano: uint64(now.Add(-10 * time.Hour).UnixNano()), + Name: "GC Cycle 39", + }, + }, }, }, }, }, }, - } + }) - if diff := cmp.Diff(want, reqs, protocmp.Transform()); diff != "" { - t.Errorf("Unexpected difference:\n%v", diff) + assert.Equal(t, want.SpanCount(), reqs.SpanCount()) + assert.Equal(t, want.ResourceSpans().Len(), reqs.ResourceSpans().Len()) + for i := 0; i < want.ResourceSpans().Len(); i++ { + wantRS := want.ResourceSpans().At(i) + wSvcName, ok := wantRS.Resource().Attributes().Get(conventions.AttributeServiceName) + assert.True(t, ok) + for j := 0; j < reqs.ResourceSpans().Len(); j++ { + reqsRS := reqs.ResourceSpans().At(j) + rSvcName, ok := reqsRS.Resource().Attributes().Get(conventions.AttributeServiceName) + assert.True(t, ok) + if rSvcName.StringVal() == wSvcName.StringVal() { + compareResourceSpans(t, wantRS, reqsRS) + } + } } } + +func compareResourceSpans(t *testing.T, wantRS pdata.ResourceSpans, reqsRS pdata.ResourceSpans) { + assert.Equal(t, wantRS.InstrumentationLibrarySpans().Len(), reqsRS.InstrumentationLibrarySpans().Len()) + wantIL := wantRS.InstrumentationLibrarySpans().At(0) + reqsIL := reqsRS.InstrumentationLibrarySpans().At(0) + assert.Equal(t, wantIL.Spans().Len(), reqsIL.Spans().Len()) +} diff --git a/receiver/zipkinreceiver/trace_receiver.go b/receiver/zipkinreceiver/trace_receiver.go index ca7f69461e9..80165d66327 100644 --- a/receiver/zipkinreceiver/trace_receiver.go +++ b/receiver/zipkinreceiver/trace_receiver.go @@ -31,15 +31,13 @@ import ( zipkinmodel "github.com/openzipkin/zipkin-go/model" zipkinproto "github.com/openzipkin/zipkin-go/proto/v2" "github.com/pkg/errors" - "go.opencensus.io/trace" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/obsreport" - "go.opentelemetry.io/collector/translator/internaldata" "go.opentelemetry.io/collector/translator/trace/zipkin" ) @@ -85,7 +83,7 @@ func New(config *Config, nextConsumer consumer.TraceConsumer) (*ZipkinReceiver, } // Start spins up the receiver's HTTP server and makes the receiver start its processing. -func (zr *ZipkinReceiver) Start(_ context.Context, host component.Host) error { +func (zr *ZipkinReceiver) Start(ctx context.Context, host component.Host) error { if host == nil { return errors.New("nil host") } @@ -117,16 +115,16 @@ func (zr *ZipkinReceiver) Start(_ context.Context, host component.Host) error { } // v1ToTraceSpans parses Zipkin v1 JSON traces and converts them to OpenCensus Proto spans. -func (zr *ZipkinReceiver) v1ToTraceSpans(blob []byte, hdr http.Header) (reqs []consumerdata.TraceData, err error) { +func (zr *ZipkinReceiver) v1ToTraceSpans(blob []byte, hdr http.Header) (reqs pdata.Traces, err error) { if hdr.Get("Content-Type") == "application/x-thrift" { zSpans, err := deserializeThrift(blob) if err != nil { - return nil, err + return pdata.NewTraces(), err } - return zipkin.V1ThriftBatchToOCProto(zSpans) + return zipkin.V1ThriftBatchToInternalTraces(zSpans) } - return zipkin.V1JSONBatchToOCProto(blob) + return zipkin.V1JSONBatchToInternalTraces(blob) } // deserializeThrift decodes Thrift bytes to a list of spans. @@ -158,7 +156,7 @@ func deserializeThrift(b []byte) ([]*zipkincore.Span, error) { } // v2ToTraceSpans parses Zipkin v2 JSON or Protobuf traces and converts them to OpenCensus Proto spans. -func (zr *ZipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs []consumerdata.TraceData, err error) { +func (zr *ZipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs pdata.Traces, err error) { // This flag's reference is from: // https://github.com/openzipkin/zipkin-go/blob/3793c981d4f621c0e3eb1457acffa2c1cc591384/proto/v2/zipkin.proto#L154 debugWasSet := hdr.Get("X-B3-Flags") == "1" @@ -172,17 +170,17 @@ func (zr *ZipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs []c zipkinSpans, err = zipkinproto.ParseSpans(blob, debugWasSet) default: // By default, we'll assume using JSON - zipkinSpans, err = zr.deserializeFromJSON(blob) + zipkinSpans, err = zr.deserializeFromJSON(blob, debugWasSet) } if err != nil { - return nil, err + return pdata.Traces{}, err } - return zipkin.V2BatchToOCProto(zipkinSpans) + return zipkin.V2SpansToInternalTraces(zipkinSpans) } -func (zr *ZipkinReceiver) deserializeFromJSON(jsonBlob []byte) (zs []*zipkinmodel.SpanModel, err error) { +func (zr *ZipkinReceiver) deserializeFromJSON(jsonBlob []byte, debugWasSet bool) (zs []*zipkinmodel.SpanModel, err error) { if err = json.Unmarshal(jsonBlob, &zs); err != nil { return nil, err } @@ -271,37 +269,22 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { } _ = r.Body.Close() - var tds []consumerdata.TraceData + var td pdata.Traces var err error if asZipkinv1 { - tds, err = zr.v1ToTraceSpans(slurp, r.Header) + td, err = zr.v1ToTraceSpans(slurp, r.Header) } else { - tds, err = zr.v2ToTraceSpans(slurp, r.Header) + td, err = zr.v2ToTraceSpans(slurp, r.Header) } if err != nil { - trace.FromContext(ctx).SetStatus(trace.Status{ - Code: trace.StatusCodeInvalidArgument, - Message: err.Error(), - }) http.Error(w, err.Error(), http.StatusBadRequest) return } - var consumerErr error - tdsSize := 0 - for _, td := range tds { - tdsSize += len(td.Spans) - if consumerErr != nil { - // Do not attempt the remaining data, continue on the loop just to - // count all the data on the request. - continue - } - td.SourceFormat = "zipkin" - consumerErr = zr.nextConsumer.ConsumeTraces(ctx, internaldata.OCToTraceData(td)) - } + consumerErr := zr.nextConsumer.ConsumeTraces(ctx, td) - obsreport.EndTraceDataReceiveOp(ctx, receiverTagValue, tdsSize, consumerErr) + obsreport.EndTraceDataReceiveOp(ctx, receiverTagValue, td.SpanCount(), consumerErr) if consumerErr != nil { // Transient error, due to some internal condition. diff --git a/receiver/zipkinreceiver/trace_receiver_test.go b/receiver/zipkinreceiver/trace_receiver_test.go index f249df8f1e5..1175cc7dda1 100644 --- a/receiver/zipkinreceiver/trace_receiver_test.go +++ b/receiver/zipkinreceiver/trace_receiver_test.go @@ -30,16 +30,11 @@ import ( "testing" "time" - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" - "github.com/google/go-cmp/cmp" zipkin2 "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/testing/protocmp" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenterror" @@ -47,14 +42,11 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerdata" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/zipkinexporter" - "go.opentelemetry.io/collector/internal" "go.opentelemetry.io/collector/testutil" - "go.opentelemetry.io/collector/translator/internaldata" - "go.opentelemetry.io/collector/translator/trace/zipkin" + "go.opentelemetry.io/collector/translator/conventions" ) const zipkinReceiverName = "zipkin_receiver_test" @@ -130,24 +122,14 @@ func TestConvertSpansToTraceSpans_json(t *testing.T) { reqs, err := zi.v2ToTraceSpans(blob, nil) require.NoError(t, err, "Failed to parse convert Zipkin spans in JSON to Trace spans: %v", err) - require.Len(t, reqs, 1, "Expecting only one request since all spans share same node/localEndpoint: %v", len(reqs)) + require.Equal(t, reqs.ResourceSpans().Len(), 1, "Expecting only one request since all spans share same node/localEndpoint: %v", reqs.ResourceSpans().Len()) - req := reqs[0] - wantNode := &commonpb.Node{ - ServiceInfo: &commonpb.ServiceInfo{ - Name: "frontend", - }, - } - assert.True(t, proto.Equal(wantNode, req.Node)) + req := reqs.ResourceSpans().At(0) + sn, _ := req.Resource().Attributes().Get(conventions.AttributeServiceName) + assert.Equal(t, "frontend", sn.StringVal()) - nonNilSpans := 0 - for _, span := range req.Spans { - if span != nil { - nonNilSpans++ - } - } // Expecting 9 non-nil spans - require.Equal(t, 9, nonNilSpans, "Incorrect non-nil spans count") + require.Equal(t, 9, reqs.SpanCount(), "Incorrect non-nil spans count") } func TestConversionRoundtrip(t *testing.T) { @@ -226,98 +208,7 @@ func TestConversionRoundtrip(t *testing.T) { ereqs, err := zi.v2ToTraceSpans(receiverInputJSON, nil) require.NoError(t, err) - wantProtoRequests := []consumerdata.TraceData{ - { - Node: &commonpb.Node{ - ServiceInfo: &commonpb.ServiceInfo{Name: "frontend"}, - }, - - Spans: []*tracepb.Span{ - { - TraceId: []byte{0x4d, 0x1e, 0x00, 0xc0, 0xdb, 0x90, 0x10, 0xdb, 0x86, 0x15, 0x4a, 0x4b, 0xa6, 0xe9, 0x13, 0x85}, - ParentSpanId: []byte{0x86, 0x15, 0x4a, 0x4b, 0xa6, 0xe9, 0x13, 0x85}, - SpanId: []byte{0x4d, 0x1e, 0x00, 0xc0, 0xdb, 0x90, 0x10, 0xdb}, - Kind: tracepb.Span_CLIENT, - Name: &tracepb.TruncatableString{Value: "get"}, - StartTime: internal.TimeToTimestamp(time.Unix(int64(1472470996199000)/1e6, 1e3*(int64(1472470996199000)%1e6))), - EndTime: internal.TimeToTimestamp(time.Unix(int64(1472470996199000+207000)/1e6, 1e3*(int64(1472470996199000+207000)%1e6))), - TimeEvents: &tracepb.Span_TimeEvents{ - TimeEvent: []*tracepb.Span_TimeEvent{ - { - Time: internal.TimeToTimestamp(time.Unix(int64(1472470996238000)/1e6, 1e3*(int64(1472470996238000)%1e6))), - Value: &tracepb.Span_TimeEvent_Annotation_{ - Annotation: &tracepb.Span_TimeEvent_Annotation{ - Description: &tracepb.TruncatableString{Value: "foo"}, - }, - }, - }, - { - Time: internal.TimeToTimestamp(time.Unix(int64(1472470996403000)/1e6, 1e3*(int64(1472470996403000)%1e6))), - Value: &tracepb.Span_TimeEvent_Annotation_{ - Annotation: &tracepb.Span_TimeEvent_Annotation{ - Description: &tracepb.TruncatableString{Value: "bar"}, - }, - }, - }, - }, - }, - Attributes: &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - zipkin.LocalEndpointIPv6: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "7::80:807f"}}}, - zipkin.RemoteEndpointServiceName: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "backend"}}}, - zipkin.RemoteEndpointIPv4: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "192.168.99.101"}}}, - zipkin.RemoteEndpointPort: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "9000"}}}, - "http.path": {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "/api"}}}, - "clnt/finagle.version": {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "6.45.0"}}}, - }, - }, - }, - { - TraceId: []byte{0x4d, 0x1e, 0x00, 0xc0, 0xdb, 0x90, 0x10, 0xdb, 0x86, 0x15, 0x4a, 0x4b, 0xa6, 0xe9, 0x13, 0x85}, - SpanId: []byte{0x4d, 0x1e, 0x00, 0xc0, 0xdb, 0x90, 0x10, 0xdb}, - ParentSpanId: []byte{0x86, 0x15, 0x4a, 0x4b, 0xa6, 0xe9, 0x13, 0x86}, - Kind: tracepb.Span_SERVER, - Name: &tracepb.TruncatableString{Value: "put"}, - StartTime: internal.TimeToTimestamp(time.Unix(int64(1472470996199000)/1e6, 1e3*(int64(1472470996199000)%1e6))), - EndTime: internal.TimeToTimestamp(time.Unix(int64(1472470996199000+207000)/1e6, 1e3*(int64(1472470996199000+207000)%1e6))), - TimeEvents: &tracepb.Span_TimeEvents{ - TimeEvent: []*tracepb.Span_TimeEvent{ - { - Time: internal.TimeToTimestamp(time.Unix(int64(1472470996238000)/1e6, 1e3*(int64(1472470996238000)%1e6))), - Value: &tracepb.Span_TimeEvent_Annotation_{ - Annotation: &tracepb.Span_TimeEvent_Annotation{ - Description: &tracepb.TruncatableString{Value: "foo"}, - }, - }, - }, - { - Time: internal.TimeToTimestamp(time.Unix(int64(1472470996403000)/1e6, 1e3*(int64(1472470996403000)%1e6))), - Value: &tracepb.Span_TimeEvent_Annotation_{ - Annotation: &tracepb.Span_TimeEvent_Annotation{ - Description: &tracepb.TruncatableString{Value: "bar"}, - }, - }, - }, - }, - }, - Attributes: &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - zipkin.LocalEndpointIPv6: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "7::80:807f"}}}, - zipkin.RemoteEndpointServiceName: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "frontend"}}}, - zipkin.RemoteEndpointIPv4: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "192.168.99.101"}}}, - zipkin.RemoteEndpointPort: {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "9000"}}}, - "http.path": {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "/api"}}}, - "clnt/finagle.version": {Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "6.45.0"}}}, - }, - }, - }, - }, - }, - } - - if diff := cmp.Diff(wantProtoRequests, ereqs, protocmp.Transform()); diff != "" { - t.Errorf("Unexpected difference:\n%v", diff) - } + require.Equal(t, 2, ereqs.SpanCount()) // Now the last phase is to transmit them over the wire and then compare the JSONs @@ -332,14 +223,13 @@ func TestConversionRoundtrip(t *testing.T) { factory := zipkinexporter.NewFactory() config := factory.CreateDefaultConfig().(*zipkinexporter.Config) config.Endpoint = backend.URL - ze, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, config) + params := component.ExporterCreateParams{Logger: zap.NewNop()} + ze, err := factory.CreateTraceExporter(context.Background(), params, config) require.NoError(t, err) require.NotNil(t, ze) require.NoError(t, ze.Start(context.Background(), componenttest.NewNopHost())) - for _, treq := range ereqs { - require.NoError(t, ze.ConsumeTraces(context.Background(), internaldata.OCToTraceData(treq))) - } + require.NoError(t, ze.ConsumeTraces(context.Background(), ereqs)) // Shutdown the exporter so it can flush any remaining data. assert.NoError(t, ze.Shutdown(context.Background())) @@ -351,7 +241,27 @@ func TestConversionRoundtrip(t *testing.T) { accumulatedJSONMsgs := strings.Replace(buf.String(), "][", ",", -1) gj := testutil.GenerateNormalizedJSON(t, accumulatedJSONMsgs) wj := testutil.GenerateNormalizedJSON(t, string(receiverInputJSON)) - assert.Equal(t, wj, gj) + // translation to OTLP sorts spans so do a span-by-span comparison + gj = gj[1 : len(gj)-1] + wj = wj[1 : len(wj)-1] + gjSpans := strings.Split(gj, "{\"annotations\":") + wjSpans := strings.Split(wj, "{\"annotations\":") + assert.Equal(t, len(wjSpans), len(gjSpans)) + for _, wjspan := range wjSpans { + if len(wjspan) > 3 && wjspan[len(wjspan)-1:] == "," { + wjspan = wjspan[0 : len(wjspan)-1] + } + matchFound := false + for _, gjspan := range gjSpans { + if len(gjspan) > 3 && gjspan[len(gjspan)-1:] == "," { + gjspan = gjspan[0 : len(gjspan)-1] + } + if wjspan == gjspan { + matchFound = true + } + } + assert.True(t, matchFound, fmt.Sprintf("no match found for {\"annotations\":%s", wjspan)) + } } func TestStartTraceReception(t *testing.T) { @@ -530,8 +440,7 @@ func TestReceiverConsumerError(t *testing.T) { body, err := ioutil.ReadFile("../../translator/trace/zipkin/testdata/zipkin_v2_single.json") require.NoError(t, err) - r := httptest.NewRequest("POST", "/api/v2/spans", - bytes.NewBuffer(body)) + r := httptest.NewRequest("POST", "/api/v2/spans", bytes.NewBuffer(body)) r.Header.Add("content-type", "application/json") next := &zipkinMockTraceConsumer{ @@ -613,7 +522,7 @@ type zipkinMockTraceConsumer struct { err error } -func (m *zipkinMockTraceConsumer) ConsumeTraces(_ context.Context, td pdata.Traces) error { +func (m *zipkinMockTraceConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error { m.ch <- td return m.err } diff --git a/testbed/testbed/senders.go b/testbed/testbed/senders.go index a03ff5bc738..1fd2f1a18d9 100644 --- a/testbed/testbed/senders.go +++ b/testbed/testbed/senders.go @@ -404,8 +404,8 @@ func (zs *ZipkinDataSender) Start() error { cfg := factory.CreateDefaultConfig().(*zipkinexporter.Config) cfg.Endpoint = fmt.Sprintf("http://localhost:%d/api/v2/spans", zs.Port) - creationParams := component.ExporterCreateParams{Logger: zap.L()} - exporter, err := factory.CreateTraceExporter(context.Background(), creationParams, cfg) + params := component.ExporterCreateParams{Logger: zap.L()} + exporter, err := factory.CreateTraceExporter(context.Background(), params, cfg) if err != nil { return err } diff --git a/testbed/testbed/validator.go b/testbed/testbed/validator.go index 8170d4f398a..6706c83d05a 100644 --- a/testbed/testbed/validator.go +++ b/testbed/testbed/validator.go @@ -104,8 +104,7 @@ func (v *CorrectnessTestValidator) Validate(tc *TestCase) { } v.assertSentRecdTracingDataEqual(tracesList) } - // TODO enable once identified problems are fixed - //assert.EqualValues(tc.t, 0, len(v.assertionFailures), "There are span data mismatches.") + assert.EqualValues(tc.t, 0, len(v.assertionFailures), "There are span data mismatches.") } func (v *CorrectnessTestValidator) RecordResults(tc *TestCase) { @@ -245,7 +244,7 @@ func (v *CorrectnessTestValidator) diffSpanKind(sentSpan *otlptrace.Span, recdSp } func (v *CorrectnessTestValidator) diffSpanTimestamps(sentSpan *otlptrace.Span, recdSpan *otlptrace.Span) { - if sentSpan.StartTimeUnixNano != recdSpan.StartTimeUnixNano { + if notWithinOneMillisecond(sentSpan.StartTimeUnixNano, recdSpan.StartTimeUnixNano) { af := &AssertionFailure{ typeName: "Span", dataComboName: sentSpan.Name, @@ -255,11 +254,11 @@ func (v *CorrectnessTestValidator) diffSpanTimestamps(sentSpan *otlptrace.Span, } v.assertionFailures = append(v.assertionFailures, af) } - if sentSpan.EndTimeUnixNano != recdSpan.EndTimeUnixNano { + if notWithinOneMillisecond(sentSpan.EndTimeUnixNano, recdSpan.EndTimeUnixNano) { af := &AssertionFailure{ typeName: "Span", dataComboName: sentSpan.Name, - fieldPath: "StartTimeUnixNano", + fieldPath: "EndTimeUnixNano", expectedValue: sentSpan.EndTimeUnixNano, actualValue: recdSpan.EndTimeUnixNano, } @@ -324,7 +323,7 @@ func (v *CorrectnessTestValidator) diffSpanEvents(sentSpan *otlptrace.Span, recd } else { for i, sentEvent := range sentEvents { recdEvent := recdEvents[i] - if sentEvent.TimeUnixNano != recdEvent.TimeUnixNano { + if notWithinOneMillisecond(sentEvent.TimeUnixNano, recdEvent.TimeUnixNano) { af := &AssertionFailure{ typeName: "Span", dataComboName: sentSpan.Name, @@ -428,14 +427,18 @@ func (v *CorrectnessTestValidator) diffAttributesSlice(spanName string, recdAttr sentVal := retrieveAttributeValue(sentAttr) recdVal := retrieveAttributeValue(recdAttr) if !reflect.DeepEqual(sentVal, recdVal) { - af := &AssertionFailure{ - typeName: "Span", - dataComboName: spanName, - fieldPath: fmt.Sprintf(fmtStr, sentAttr.Key), - expectedValue: sentVal, - actualValue: recdVal, + sentStr := fmt.Sprintf("%v", sentVal) + recdStr := fmt.Sprintf("%v", recdVal) + if sentStr != recdStr { + af := &AssertionFailure{ + typeName: "Span", + dataComboName: spanName, + fieldPath: fmt.Sprintf(fmtStr, sentAttr.Key), + expectedValue: sentVal, + actualValue: recdVal, + } + v.assertionFailures = append(v.assertionFailures, af) } - v.assertionFailures = append(v.assertionFailures, af) } } else { af := &AssertionFailure{ @@ -509,3 +512,13 @@ func convertLinksSliceToMap(links []*otlptrace.Span_Link) map[string]*otlptrace. } return eventMap } + +func notWithinOneMillisecond(sentNs uint64, recdNs uint64) bool { + var diff uint64 + if sentNs > recdNs { + diff = sentNs - recdNs + } else { + diff = recdNs - sentNs + } + return diff > uint64(1100000) +} diff --git a/translator/conventions/opentelemetry.go b/translator/conventions/opentelemetry.go index 41daf89fe69..dd2ffb8f1ca 100644 --- a/translator/conventions/opentelemetry.go +++ b/translator/conventions/opentelemetry.go @@ -56,18 +56,59 @@ const ( AttributeHostImageName = "host.image.name" AttributeHostImageID = "host.image.id" AttributeHostImageVersion = "host.image.version" - AttributeProcessID = "process.id" + AttributeProcessID = "process.pid" AttributeProcessExecutableName = "process.executable.name" AttributeProcessExecutablePath = "process.executable.path" AttributeProcessCommand = "process.command" AttributeProcessCommandLine = "process.command_line" - AttributeProcessUsername = "process.username" + AttributeProcessOwner = "process.owner" AttributeCloudProvider = "cloud.provider" AttributeCloudAccount = "cloud.account.id" AttributeCloudRegion = "cloud.region" AttributeCloudZone = "cloud.zone" ) +// GetResourceSemanticConventionAttributeNames a slice with all the Resource Semantic Conventions attribute names. +func GetResourceSemanticConventionAttributeNames() []string { + return []string{ + AttributeServiceName, + AttributeServiceNamespace, + AttributeServiceInstance, + AttributeServiceVersion, + AttributeTelemetrySDKName, + AttributeTelemetrySDKLanguage, + AttributeTelemetrySDKVersion, + AttributeContainerName, + AttributeContainerImage, + AttributeContainerTag, + AttributeFaasName, + AttributeFaasID, + AttributeFaasVersion, + AttributeFaasInstance, + AttributeK8sCluster, + AttributeK8sNamespace, + AttributeK8sPod, + AttributeK8sDeployment, + AttributeHostHostname, + AttributeHostID, + AttributeHostName, + AttributeHostType, + AttributeHostImageName, + AttributeHostImageID, + AttributeHostImageVersion, + AttributeProcessID, + AttributeProcessExecutableName, + AttributeProcessExecutablePath, + AttributeProcessCommand, + AttributeProcessCommandLine, + AttributeProcessOwner, + AttributeCloudProvider, + AttributeCloudAccount, + AttributeCloudRegion, + AttributeCloudZone, + } +} + // OpenTelemetry Semantic Convention values for general Span attribute names. // See: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/span-general.md const ( diff --git a/translator/trace/protospan_translation.go b/translator/trace/protospan_translation.go index 116bab83f18..7394e3486e6 100644 --- a/translator/trace/protospan_translation.go +++ b/translator/trace/protospan_translation.go @@ -14,6 +14,14 @@ package tracetranslator +import ( + "fmt" + "strconv" + "strings" + + "go.opentelemetry.io/collector/consumer/pdata" +) + // Some of the keys used to represent OTLP constructs as tags or annotations in other formats. const ( AnnotationDescriptionKey = "description" @@ -36,14 +44,18 @@ const ( TagZipkinCensusMsg = "census.status_description" TagZipkinOpenCensusMsg = "opencensus.status_description" - TagW3CTraceState = "w3c.tracestate" + TagW3CTraceState = "w3c.tracestate" + TagServiceNameSource = "otlp.service.name.source" + TagInstrumentationName = "otlp.instrumentation.library.name" + TagInstrumentationVersion = "otlp.instrumentation.library.version" ) // Constants used for signifying batch-level attribute values where not supplied by OTLP data but required // by other protocols. const ( - ResourceNotSet = "OTLPResourceNotSet" - ResourceNoAttrs = "OTLPResourceNoAttributes" + ResourceNotSet = "OTLPResourceNotSet" + ResourceNoAttrs = "OTLPResourceNoAttributes" + ResourceNoServiceName = "OTLPResourceNoServiceName" ) // OpenTracingSpanKind are possible values for TagSpanKind and match the OpenTracing @@ -60,3 +72,55 @@ const ( OpenTracingSpanKindProducer OpenTracingSpanKind = "producer" OpenTracingSpanKindInternal OpenTracingSpanKind = "internal" ) + +const ( + SpanLinkDataFormat = "%s|%s|%s|%s|%d" + SpanEventDataFormat = "%s|%s|%d" +) + +// AttributeValueToString converts an OTLP AttributeValue object to its equivalent string representation +func AttributeValueToString(attr pdata.AttributeValue, jsonLike bool) string { + switch attr.Type() { + case pdata.AttributeValueNULL: + if jsonLike { + return "null" + } + return "" + case pdata.AttributeValueSTRING: + if jsonLike { + return fmt.Sprintf("%q", attr.StringVal()) + } + return attr.StringVal() + + case pdata.AttributeValueBOOL: + return strconv.FormatBool(attr.BoolVal()) + + case pdata.AttributeValueDOUBLE: + return strconv.FormatFloat(attr.DoubleVal(), 'f', -1, 64) + + case pdata.AttributeValueINT: + return strconv.FormatInt(attr.IntVal(), 10) + + case pdata.AttributeValueMAP: + // OpenCensus attributes cannot represent maps natively. Convert the + // map to a JSON-like string. + var sb strings.Builder + sb.WriteString("{") + m := attr.MapVal() + first := true + m.ForEach(func(k string, v pdata.AttributeValue) { + if !first { + sb.WriteString(",") + } + first = false + sb.WriteString(fmt.Sprintf("%q:%s", k, AttributeValueToString(v, true))) + }) + sb.WriteString("}") + return sb.String() + + default: + return fmt.Sprintf("", attr.Type()) + } + + // TODO: Add support for ARRAY type. +} diff --git a/translator/trace/zipkin/testdata/zipkin_v1_error_batch.json b/translator/trace/zipkin/testdata/zipkin_v1_error_batch.json new file mode 100644 index 00000000000..372289ee72a --- /dev/null +++ b/translator/trace/zipkin/testdata/zipkin_v1_error_batch.json @@ -0,0 +1,64 @@ +[ + { + "traceId": "0ed2e63cbe71f5a8", + "name": "checkAvailability", + "id": "0ed2e63cbe71f5a8", + "annotations": [ + { + "timestamp": 1544805927448081, + "value": "sr", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + }, + { + "timestamp": 1544805927450000, + "value": "custom time event", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + }, + { + "timestamp": 1544805927460102, + "value": "ss", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + } + ] + }, + { + "traceId": "0ed2e63cbe71f5a8", + "name": "checkStock", + "id": "f9ebb6e64880612a", + "parentId": "BADID", + "timestamp": 1544805927453923, + "duration": 3740, + "annotations": [ + { + "timestamp": 1544805927453923, + "value": "cs", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + }, + { + "timestamp": 1544805927457717, + "value": "cr", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + } + ] + } +] diff --git a/translator/trace/zipkin/traces_to_zipkinv2.go b/translator/trace/zipkin/traces_to_zipkinv2.go new file mode 100644 index 00000000000..5bababeacd4 --- /dev/null +++ b/translator/trace/zipkin/traces_to_zipkinv2.go @@ -0,0 +1,407 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "encoding/json" + "fmt" + "net" + "strconv" + "time" + + zipkinmodel "github.com/openzipkin/zipkin-go/model" + + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal" + "go.opentelemetry.io/collector/translator/conventions" + tracetranslator "go.opentelemetry.io/collector/translator/trace" +) + +var sampled = true + +// InternalTracesToZipkinSpans translates internal trace data into Zipkin v2 spans. +// Returns a slice of Zipkin SpanModel's. +func InternalTracesToZipkinSpans(td pdata.Traces) ([]*zipkinmodel.SpanModel, error) { + + resourceSpans := td.ResourceSpans() + if resourceSpans.Len() == 0 { + return nil, nil + } + + zSpans := make([]*zipkinmodel.SpanModel, 0, td.SpanCount()) + + for i := 0; i < resourceSpans.Len(); i++ { + rs := resourceSpans.At(i) + if rs.IsNil() { + continue + } + + batch, err := resourceSpansToZipkinSpans(rs, td.SpanCount()/resourceSpans.Len()) + if err != nil { + return zSpans, err + } + if batch != nil { + zSpans = append(zSpans, batch...) + } + } + + return zSpans, nil +} + +func resourceSpansToZipkinSpans(rs pdata.ResourceSpans, estSpanCount int) ([]*zipkinmodel.SpanModel, error) { + resource := rs.Resource() + ilss := rs.InstrumentationLibrarySpans() + + if resource.IsNil() && ilss.Len() == 0 { + return nil, nil + } + + localServiceName, zTags := resourceToZipkinEndpointServiceNameAndAttributeMap(resource) + + zSpans := make([]*zipkinmodel.SpanModel, 0, estSpanCount) + for i := 0; i < ilss.Len(); i++ { + ils := ilss.At(i) + if ils.IsNil() { + continue + } + extractInstrumentationLibraryTags(ils.InstrumentationLibrary(), zTags) + spans := ils.Spans() + for j := 0; j < spans.Len(); j++ { + span, err := spanToZipkinSpan(spans.At(j), localServiceName, zTags) + if err != nil { + return zSpans, err + } + zSpans = append(zSpans, span) + } + } + + return zSpans, nil +} + +func extractInstrumentationLibraryTags(il pdata.InstrumentationLibrary, zTags map[string]string) { + if il.IsNil() { + return + } + if ilName := il.Name(); ilName != "" { + zTags[tracetranslator.TagInstrumentationName] = ilName + } + if ilVer := il.Version(); ilVer != "" { + zTags[tracetranslator.TagInstrumentationVersion] = ilVer + } +} + +func spanToZipkinSpan( + span pdata.Span, + localServiceName string, + zTags map[string]string, +) (*zipkinmodel.SpanModel, error) { + + tags := aggregateSpanTags(span, zTags) + + zs := &zipkinmodel.SpanModel{} + + hi, lo, err := tracetranslator.BytesToUInt64TraceID(span.TraceID().Bytes()) + if err != nil { + return nil, err + } + zs.TraceID = zipkinmodel.TraceID{ + High: hi, + Low: lo, + } + + idVal, err := tracetranslator.BytesToUInt64SpanID(span.SpanID().Bytes()) + if err != nil { + return nil, err + } + zs.ID = zipkinmodel.ID(idVal) + + if len(span.TraceState()) > 0 { + tags[tracetranslator.TagW3CTraceState] = string(span.TraceState()) + } + + if len(span.ParentSpanID().Bytes()) > 0 { + idVal, err := tracetranslator.BytesToUInt64SpanID(span.ParentSpanID().Bytes()) + if err != nil { + return nil, err + } + id := zipkinmodel.ID(idVal) + zs.ParentID = &id + } + + zs.Sampled = &sampled + zs.Name = span.Name() + zs.Timestamp = internal.UnixNanoToTime(span.StartTime()) + if span.EndTime() != 0 { + zs.Duration = time.Duration(span.EndTime() - span.StartTime()) + } + zs.Kind = spanKindToZipkinKind(span.Kind()) + if span.Kind() == pdata.SpanKindINTERNAL { + tags[tracetranslator.TagSpanKind] = "internal" + } + + redundantKeys := make(map[string]bool, 8) + zs.LocalEndpoint = zipkinEndpointFromTags(tags, localServiceName, false, redundantKeys) + zs.RemoteEndpoint = zipkinEndpointFromTags(tags, "", true, redundantKeys) + + removeRedundentTags(redundantKeys, tags) + + status := span.Status() + if !status.IsNil() { + tags[tracetranslator.TagStatusCode] = status.Code().String() + if status.Message() != "" { + tags[tracetranslator.TagStatusMsg] = status.Message() + if int32(status.Code()) > 0 { + zs.Err = fmt.Errorf("%s", status.Message()) + } + } + } + + if err := spanEventsToZipkinAnnotations(span.Events(), zs); err != nil { + return nil, err + } + if err := spanLinksToZipkinTags(span.Links(), tags); err != nil { + return nil, err + } + + zs.Tags = tags + + return zs, nil +} + +func aggregateSpanTags(span pdata.Span, zTags map[string]string) map[string]string { + tags := make(map[string]string) + for key, val := range zTags { + tags[key] = val + } + spanTags := attributeMapToStringMap(span.Attributes()) + for key, val := range spanTags { + tags[key] = val + } + return tags +} + +func spanEventsToZipkinAnnotations(events pdata.SpanEventSlice, zs *zipkinmodel.SpanModel) error { + if events.Len() > 0 { + zAnnos := make([]zipkinmodel.Annotation, events.Len()) + for i := 0; i < events.Len(); i++ { + event := events.At(i) + if event.IsNil() { + continue + } + if event.Attributes().Len() == 0 && event.DroppedAttributesCount() == 0 { + zAnnos[i] = zipkinmodel.Annotation{ + Timestamp: internal.UnixNanoToTime(event.Timestamp()), + Value: event.Name(), + } + } else { + rawMap := attributeMapToMap(event.Attributes()) + jsonStr, err := json.Marshal(rawMap) + if err != nil { + return err + } + zAnnos[i] = zipkinmodel.Annotation{ + Timestamp: internal.UnixNanoToTime(event.Timestamp()), + Value: fmt.Sprintf(tracetranslator.SpanEventDataFormat, event.Name(), jsonStr, + event.DroppedAttributesCount()), + } + } + } + zs.Annotations = zAnnos + } + return nil +} + +func spanLinksToZipkinTags(links pdata.SpanLinkSlice, zTags map[string]string) error { + for i := 0; i < links.Len(); i++ { + link := links.At(i) + if !link.IsNil() { + key := fmt.Sprintf("otlp.link.%d", i) + rawMap := attributeMapToMap(link.Attributes()) + jsonStr, err := json.Marshal(rawMap) + if err != nil { + return err + } + zTags[key] = fmt.Sprintf(tracetranslator.SpanLinkDataFormat, link.TraceID().String(), + link.SpanID().String(), link.TraceState(), jsonStr, link.DroppedAttributesCount()) + } + } + return nil +} + +func attributeMapToMap(attrMap pdata.AttributeMap) map[string]interface{} { + rawMap := make(map[string]interface{}) + attrMap.ForEach(func(k string, v pdata.AttributeValue) { + switch v.Type() { + case pdata.AttributeValueSTRING: + rawMap[k] = v.StringVal() + case pdata.AttributeValueINT: + rawMap[k] = v.IntVal() + case pdata.AttributeValueDOUBLE: + rawMap[k] = v.DoubleVal() + case pdata.AttributeValueBOOL: + rawMap[k] = v.BoolVal() + case pdata.AttributeValueNULL: + rawMap[k] = nil + } + }) + return rawMap +} + +func attributeMapToStringMap(attrMap pdata.AttributeMap) map[string]string { + rawMap := make(map[string]string) + attrMap.ForEach(func(k string, v pdata.AttributeValue) { + switch v.Type() { + case pdata.AttributeValueSTRING: + rawMap[k] = v.StringVal() + case pdata.AttributeValueINT: + rawMap[k] = strconv.FormatInt(v.IntVal(), 10) + case pdata.AttributeValueDOUBLE: + rawMap[k] = strconv.FormatFloat(v.DoubleVal(), 'f', -1, 64) + case pdata.AttributeValueBOOL: + rawMap[k] = strconv.FormatBool(v.BoolVal()) + case pdata.AttributeValueNULL: + rawMap[k] = "" + } + }) + return rawMap +} + +func removeRedundentTags(redundantKeys map[string]bool, zTags map[string]string) { + for k, v := range redundantKeys { + if v { + delete(zTags, k) + } + } +} + +func resourceToZipkinEndpointServiceNameAndAttributeMap( + resource pdata.Resource, +) (serviceName string, zTags map[string]string) { + + zTags = make(map[string]string) + if resource.IsNil() { + return tracetranslator.ResourceNotSet, zTags + } + + attrs := resource.Attributes() + if attrs.Len() == 0 { + return tracetranslator.ResourceNoAttrs, zTags + } + + attrs.ForEach(func(k string, v pdata.AttributeValue) { + zTags[k] = tracetranslator.AttributeValueToString(v, false) + }) + + serviceName = extractZipkinServiceName(zTags) + return serviceName, zTags +} + +func extractZipkinServiceName(zTags map[string]string) string { + var serviceName string + if sn, ok := zTags[conventions.AttributeServiceName]; ok { + serviceName = sn + delete(zTags, conventions.AttributeServiceName) + } else if fn, ok := zTags[conventions.AttributeFaasName]; ok { + serviceName = fn + delete(zTags, conventions.AttributeFaasName) + zTags[tracetranslator.TagServiceNameSource] = conventions.AttributeFaasName + } else if fn, ok := zTags[conventions.AttributeK8sDeployment]; ok { + serviceName = fn + delete(zTags, conventions.AttributeK8sDeployment) + zTags[tracetranslator.TagServiceNameSource] = conventions.AttributeK8sDeployment + } else if fn, ok := zTags[conventions.AttributeProcessExecutableName]; ok { + serviceName = fn + delete(zTags, conventions.AttributeProcessExecutableName) + zTags[tracetranslator.TagServiceNameSource] = conventions.AttributeProcessExecutableName + } else { + serviceName = tracetranslator.ResourceNoServiceName + } + return serviceName +} + +func spanKindToZipkinKind(kind pdata.SpanKind) zipkinmodel.Kind { + switch kind { + case pdata.SpanKindCLIENT: + return zipkinmodel.Client + case pdata.SpanKindSERVER: + return zipkinmodel.Server + case pdata.SpanKindPRODUCER: + return zipkinmodel.Producer + case pdata.SpanKindCONSUMER: + return zipkinmodel.Consumer + default: + return zipkinmodel.Undetermined + } +} + +func zipkinEndpointFromTags( + zTags map[string]string, + localServiceName string, + remoteEndpoint bool, + redundantKeys map[string]bool, +) (endpoint *zipkinmodel.Endpoint) { + + serviceName := localServiceName + if peerSvc, ok := zTags[conventions.AttributePeerService]; ok && remoteEndpoint { + serviceName = peerSvc + redundantKeys[conventions.AttributePeerService] = true + } + + var ipKey, portKey string + if remoteEndpoint { + ipKey, portKey = conventions.AttributeNetPeerIP, conventions.AttributeNetPeerPort + } else { + ipKey, portKey = conventions.AttributeNetHostIP, conventions.AttributeNetHostPort + } + + var ip net.IP + ipv6Selected := false + if ipStr, ok := zTags[ipKey]; ok { + ipv6Selected = isIPv6Address(ipStr) + ip = net.ParseIP(ipStr) + redundantKeys[ipKey] = true + } + + var port uint64 + if portStr, ok := zTags[portKey]; ok { + port, _ = strconv.ParseUint(portStr, 10, 16) + redundantKeys[portKey] = true + } + + if serviceName == "" && ip == nil { + return nil + } + + zEndpoint := &zipkinmodel.Endpoint{ + ServiceName: serviceName, + Port: uint16(port), + } + if ipv6Selected { + zEndpoint.IPv6 = ip + } else { + zEndpoint.IPv4 = ip + } + + return zEndpoint +} + +func isIPv6Address(ipStr string) bool { + for i := 0; i < len(ipStr); i++ { + if ipStr[i] == ':' { + return true + } + } + return false +} diff --git a/translator/trace/zipkin/traces_to_zipkinv2_test.go b/translator/trace/zipkin/traces_to_zipkinv2_test.go new file mode 100644 index 00000000000..32c4a562756 --- /dev/null +++ b/translator/trace/zipkin/traces_to_zipkinv2_test.go @@ -0,0 +1,113 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "errors" + "io" + "math/rand" + "testing" + + zipkinmodel "github.com/openzipkin/zipkin-go/model" + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/consumer/pdata" + otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/trace/v1" + "go.opentelemetry.io/collector/internal/data/testdata" + "go.opentelemetry.io/collector/internal/goldendataset" +) + +func TestInternalTracesToZipkinSpans(t *testing.T) { + tests := []struct { + name string + td pdata.Traces + zs []*zipkinmodel.SpanModel + err error + }{ + { + name: "empty", + td: testdata.GenerateTraceDataEmpty(), + err: nil, + }, + { + name: "oneEmpty", + td: testdata.GenerateTraceDataOneEmptyResourceSpans(), + zs: make([]*zipkinmodel.SpanModel, 0), + err: nil, + }, + { + name: "oneEmptyOneNil", + td: testdata.GenerateTraceDataOneEmptyOneNilResourceSpans(), + zs: make([]*zipkinmodel.SpanModel, 0), + err: nil, + }, + { + name: "noLibs", + td: testdata.GenerateTraceDataNoLibraries(), + zs: make([]*zipkinmodel.SpanModel, 0), + err: nil, + }, + { + name: "oneEmptyLib", + td: testdata.GenerateTraceDataOneEmptyInstrumentationLibrary(), + zs: make([]*zipkinmodel.SpanModel, 0), + err: nil, + }, + { + name: "oneEmptyLibOneNilLib", + td: testdata.GenerateTraceDataOneEmptyOneNilInstrumentationLibrary(), + zs: make([]*zipkinmodel.SpanModel, 0), + err: nil, + }, + { + name: "oneSpanNoResrouce", + td: testdata.GenerateTraceDataOneSpanNoResource(), + zs: make([]*zipkinmodel.SpanModel, 0), + err: errors.New("TraceID is nil"), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + zss, err := InternalTracesToZipkinSpans(test.td) + assert.EqualValues(t, test.err, err) + if test.name == "empty" { + assert.Nil(t, zss) + } else { + assert.Equal(t, len(test.zs), len(zss)) + assert.EqualValues(t, test.zs, zss) + } + }) + } +} + +func TestInternalTracesToZipkinSpansAndBack(t *testing.T) { + rscSpans, err := goldendataset.GenerateResourceSpans( + "../../../internal/goldendataset/testdata/generated_pict_pairs_traces.txt", + "../../../internal/goldendataset/testdata/generated_pict_pairs_spans.txt", + io.Reader(rand.New(rand.NewSource(2004)))) + assert.NoError(t, err) + for _, rs := range rscSpans { + orig := make([]*otlptrace.ResourceSpans, 1) + orig[0] = rs + td := pdata.TracesFromOtlp(orig) + zipkinSpans, err := InternalTracesToZipkinSpans(td) + assert.NoError(t, err) + assert.Equal(t, td.SpanCount(), len(zipkinSpans)) + tdFromZS, zErr := V2SpansToInternalTraces(zipkinSpans) + assert.NoError(t, zErr) + assert.NotNil(t, tdFromZS) + assert.Equal(t, td.SpanCount(), tdFromZS.SpanCount()) + } +} diff --git a/translator/trace/zipkin/zipkinv1_thrift_to_traces.go b/translator/trace/zipkin/zipkinv1_thrift_to_traces.go new file mode 100644 index 00000000000..65399c2b2e0 --- /dev/null +++ b/translator/trace/zipkin/zipkinv1_thrift_to_traces.go @@ -0,0 +1,34 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" + + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/translator/internaldata" +) + +func V1ThriftBatchToInternalTraces(zSpans []*zipkincore.Span) (pdata.Traces, error) { + traces := pdata.NewTraces() + + ocTraces, _ := V1ThriftBatchToOCProto(zSpans) + + for _, td := range ocTraces { + tmp := internaldata.OCToTraceData(td) + tmp.ResourceSpans().MoveAndAppendTo(traces.ResourceSpans()) + } + return traces, nil +} diff --git a/translator/trace/zipkin/zipkinv1_thrift_to_traces_test.go b/translator/trace/zipkin/zipkinv1_thrift_to_traces_test.go new file mode 100644 index 00000000000..e353c0d5a9c --- /dev/null +++ b/translator/trace/zipkin/zipkinv1_thrift_to_traces_test.go @@ -0,0 +1,39 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "encoding/json" + "io/ioutil" + "testing" + + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestV1ThriftToTraces(t *testing.T) { + blob, err := ioutil.ReadFile("./testdata/zipkin_v1_thrift_single_batch.json") + require.NoError(t, err, "Failed to load test data") + + var ztSpans []*zipkincore.Span + err = json.Unmarshal(blob, &ztSpans) + require.NoError(t, err, "Failed to unmarshal json into zipkin v1 thrift") + + got, err := V1ThriftBatchToInternalTraces(ztSpans) + require.NoError(t, err, "Failed to translate zipkinv1 thrift to OC proto") + + assert.Equal(t, 5, got.SpanCount()) +} diff --git a/translator/trace/zipkin/zipkinv1_to_protospan.go b/translator/trace/zipkin/zipkinv1_to_protospan.go index b65e576984d..0aae16f2276 100644 --- a/translator/trace/zipkin/zipkinv1_to_protospan.go +++ b/translator/trace/zipkin/zipkinv1_to_protospan.go @@ -28,6 +28,7 @@ import ( "github.com/pkg/errors" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal" tracetranslator "go.opentelemetry.io/collector/translator/trace" ) @@ -253,12 +254,17 @@ func zipkinV1BinAnnotationsToOCAttributes(binAnnotations []*binaryAnnotation) (a func parseAnnotationValue(value string) *tracepb.AttributeValue { pbAttrib := &tracepb.AttributeValue{} - if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { + switch determineValueType(value) { + case pdata.AttributeValueINT: + iValue, _ := strconv.ParseInt(value, 10, 64) pbAttrib.Value = &tracepb.AttributeValue_IntValue{IntValue: iValue} - } else if bValue, err := strconv.ParseBool(value); err == nil { + case pdata.AttributeValueDOUBLE: + fValue, _ := strconv.ParseFloat(value, 64) + pbAttrib.Value = &tracepb.AttributeValue_DoubleValue{DoubleValue: fValue} + case pdata.AttributeValueBOOL: + bValue, _ := strconv.ParseBool(value) pbAttrib.Value = &tracepb.AttributeValue_BoolValue{BoolValue: bValue} - } else { - // For now all else go to string + default: pbAttrib.Value = &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: value}} } diff --git a/translator/trace/zipkin/zipkinv1_to_traces.go b/translator/trace/zipkin/zipkinv1_to_traces.go new file mode 100644 index 00000000000..c37b831f7e4 --- /dev/null +++ b/translator/trace/zipkin/zipkinv1_to_traces.go @@ -0,0 +1,35 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/translator/internaldata" +) + +func V1JSONBatchToInternalTraces(blob []byte) (pdata.Traces, error) { + traces := pdata.NewTraces() + + ocTraces, err := V1JSONBatchToOCProto(blob) + if err != nil { + return traces, err + } + + for _, td := range ocTraces { + tmp := internaldata.OCToTraceData(td) + tmp.ResourceSpans().MoveAndAppendTo(traces.ResourceSpans()) + } + return traces, nil +} diff --git a/translator/trace/zipkin/zipkinv1_to_traces_test.go b/translator/trace/zipkin/zipkinv1_to_traces_test.go new file mode 100644 index 00000000000..e9ae35aea97 --- /dev/null +++ b/translator/trace/zipkin/zipkinv1_to_traces_test.go @@ -0,0 +1,42 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "io/ioutil" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSingleJSONV1BatchToTraces(t *testing.T) { + blob, err := ioutil.ReadFile("./testdata/zipkin_v1_single_batch.json") + require.NoError(t, err, "Failed to load test data") + + got, err := V1JSONBatchToInternalTraces(blob) + require.NoError(t, err, "Failed to translate zipkinv1 to OC proto") + + assert.Equal(t, 5, got.SpanCount()) +} + +func TestErrorSpanToTraces(t *testing.T) { + blob, err := ioutil.ReadFile("./testdata/zipkin_v1_error_batch.json") + require.NoError(t, err, "Failed to load test data") + + td, err := V1JSONBatchToInternalTraces(blob) + assert.Error(t, err, "Should have generated error") + assert.NotNil(t, td) +} diff --git a/translator/trace/zipkin/zipkinv2_to_protospan.go b/translator/trace/zipkin/zipkinv2_to_protospan.go deleted file mode 100644 index 1aee6f98c9a..00000000000 --- a/translator/trace/zipkin/zipkinv2_to_protospan.go +++ /dev/null @@ -1,266 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package zipkin - -import ( - "net" - "strconv" - "strings" - - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" - zipkinmodel "github.com/openzipkin/zipkin-go/model" - - "go.opentelemetry.io/collector/consumer/consumerdata" - "go.opentelemetry.io/collector/internal" - tracetranslator "go.opentelemetry.io/collector/translator/trace" -) - -func V2BatchToOCProto(zipkinSpans []*zipkinmodel.SpanModel) (reqs []consumerdata.TraceData, err error) { - // *commonpb.Node instances have unique addresses hence - // for grouping within a map, we'll use the .String() value - byNodeGrouping := make(map[string][]*tracepb.Span) - uniqueNodes := make([]*commonpb.Node, 0, len(zipkinSpans)) - // Now translate them into tracepb.Span - for _, zspan := range zipkinSpans { - if zspan == nil { - continue - } - span, node := zipkinSpanToTraceSpan(zspan) - key := node.String() - if _, alreadyAdded := byNodeGrouping[key]; !alreadyAdded { - uniqueNodes = append(uniqueNodes, node) - } - byNodeGrouping[key] = append(byNodeGrouping[key], span) - } - - for _, node := range uniqueNodes { - key := node.String() - spans := byNodeGrouping[key] - if len(spans) == 0 { - // Should never happen but nonetheless be cautious - // not to send blank spans. - continue - } - reqs = append(reqs, consumerdata.TraceData{ - Node: node, - Spans: spans, - }) - delete(byNodeGrouping, key) - } - - return reqs, nil -} - -func zipkinSpanToTraceSpan(zs *zipkinmodel.SpanModel) (*tracepb.Span, *commonpb.Node) { - traceID := tracetranslator.UInt64ToByteTraceID(zs.TraceID.High, zs.TraceID.Low) - var parentSpanID []byte - if zs.ParentID != nil { - parentSpanID = tracetranslator.UInt64ToByteSpanID(uint64(*zs.ParentID)) - } - - attributes, ocStatus := zipkinTagsToTraceAttributes(zs.Tags, zs.Kind) - - pbs := &tracepb.Span{ - TraceId: traceID, - SpanId: tracetranslator.UInt64ToByteSpanID(uint64(zs.ID)), - ParentSpanId: parentSpanID, - Name: &tracepb.TruncatableString{Value: zs.Name}, - StartTime: internal.TimeToTimestamp(zs.Timestamp), - EndTime: internal.TimeToTimestamp(zs.Timestamp.Add(zs.Duration)), - Kind: zipkinSpanKindToProtoSpanKind(zs.Kind), - Status: ocStatus, - Attributes: attributes, - TimeEvents: zipkinAnnotationsToProtoTimeEvents(zs.Annotations), - } - - node := nodeFromZipkinEndpoints(zs, pbs) - setTimestampsIfUnset(pbs) - - return pbs, node -} - -func nodeFromZipkinEndpoints(zs *zipkinmodel.SpanModel, pbs *tracepb.Span) *commonpb.Node { - if zs.LocalEndpoint == nil && zs.RemoteEndpoint == nil { - return nil - } - - node := new(commonpb.Node) - var endpointMap map[string]string - - // Retrieve and make use of the local endpoint - if lep := zs.LocalEndpoint; lep != nil { - node.ServiceInfo = &commonpb.ServiceInfo{ - Name: lep.ServiceName, - } - endpointMap = zipkinEndpointIntoAttributes(lep, endpointMap, isLocalEndpoint) - } - - // Retrieve and make use of the remote endpoint - if rep := zs.RemoteEndpoint; rep != nil { - endpointMap = zipkinEndpointIntoAttributes(rep, endpointMap, isRemoteEndpoint) - } - - if endpointMap != nil { - if pbs.Attributes == nil { - pbs.Attributes = &tracepb.Span_Attributes{} - } - if pbs.Attributes.AttributeMap == nil { - pbs.Attributes.AttributeMap = make( - map[string]*tracepb.AttributeValue, len(endpointMap)) - } - - // Delete the redundant serviceName key since it is already on the node. - delete(endpointMap, LocalEndpointServiceName) - attrbMap := pbs.Attributes.AttributeMap - for key, value := range endpointMap { - attrbMap[key] = &tracepb.AttributeValue{ - Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: value}, - }, - } - } - } - - return node -} - -var blankIP net.IP - -// zipkinEndpointIntoAttributes extracts information from s zipkin endpoint struct -// and puts it into a map with pre-defined keys. -func zipkinEndpointIntoAttributes( - ep *zipkinmodel.Endpoint, - into map[string]string, - endpointType zipkinDirection, -) map[string]string { - - if into == nil { - into = make(map[string]string) - } - - var ipv4Key, ipv6Key, portKey, serviceNameKey string - if endpointType == isLocalEndpoint { - ipv4Key, ipv6Key = LocalEndpointIPv4, LocalEndpointIPv6 - portKey, serviceNameKey = LocalEndpointPort, LocalEndpointServiceName - } else { - ipv4Key, ipv6Key = RemoteEndpointIPv4, RemoteEndpointIPv6 - portKey, serviceNameKey = RemoteEndpointPort, RemoteEndpointServiceName - } - if ep.IPv4 != nil && !ep.IPv4.Equal(blankIP) { - into[ipv4Key] = ep.IPv4.String() - } - if ep.IPv6 != nil && !ep.IPv6.Equal(blankIP) { - into[ipv6Key] = ep.IPv6.String() - } - if ep.Port > 0 { - into[portKey] = strconv.Itoa(int(ep.Port)) - } - if serviceName := ep.ServiceName; serviceName != "" { - into[serviceNameKey] = serviceName - } - return into -} - -func zipkinSpanKindToProtoSpanKind(skind zipkinmodel.Kind) tracepb.Span_SpanKind { - switch strings.ToUpper(string(skind)) { - case "CLIENT": - return tracepb.Span_CLIENT - case "SERVER": - return tracepb.Span_SERVER - default: - return tracepb.Span_SPAN_KIND_UNSPECIFIED - } -} - -func zipkinAnnotationsToProtoTimeEvents(zas []zipkinmodel.Annotation) *tracepb.Span_TimeEvents { - if len(zas) == 0 { - return nil - } - tevs := make([]*tracepb.Span_TimeEvent, 0, len(zas)) - for _, za := range zas { - if tev := zipkinAnnotationToProtoAnnotation(za); tev != nil { - tevs = append(tevs, tev) - } - } - if len(tevs) == 0 { - return nil - } - return &tracepb.Span_TimeEvents{ - TimeEvent: tevs, - } -} - -var blankAnnotation zipkinmodel.Annotation - -func zipkinAnnotationToProtoAnnotation(zas zipkinmodel.Annotation) *tracepb.Span_TimeEvent { - if zas == blankAnnotation { - return nil - } - return &tracepb.Span_TimeEvent{ - Time: internal.TimeToTimestamp(zas.Timestamp), - Value: &tracepb.Span_TimeEvent_Annotation_{ - Annotation: &tracepb.Span_TimeEvent_Annotation{ - Description: &tracepb.TruncatableString{Value: zas.Value}, - }, - }, - } -} - -func zipkinTagsToTraceAttributes(tags map[string]string, skind zipkinmodel.Kind) (*tracepb.Span_Attributes, *tracepb.Status) { - // Produce and Consumer span kinds are not representable in OpenCensus format. - // We will represent them using TagSpanKind attribute, according to OpenTracing - // conventions. Check if it is one of those span kinds. - var spanKindTagVal tracetranslator.OpenTracingSpanKind - switch skind { - case zipkinmodel.Producer: - spanKindTagVal = tracetranslator.OpenTracingSpanKindProducer - case zipkinmodel.Consumer: - spanKindTagVal = tracetranslator.OpenTracingSpanKindConsumer - } - - if len(tags) == 0 && spanKindTagVal == "" { - // No input tags and no need to add a span kind tag. Keep attributes map empty. - return nil, nil - } - - sMapper := &statusMapper{} - amap := make(map[string]*tracepb.AttributeValue, len(tags)) - for key, value := range tags { - - pbAttrib := parseAnnotationValue(value) - - if drop := sMapper.fromAttribute(key, pbAttrib); drop { - continue - } - - amap[key] = pbAttrib - } - - status := sMapper.ocStatus() - - if spanKindTagVal != "" { - // Set the previously translated span kind attribute (see top of this function). - // We do this after the "tags" map is translated so that we will overwrite - // the attribute if it exists. - amap[tracetranslator.TagSpanKind] = &tracepb.AttributeValue{ - Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: string(spanKindTagVal)}, - }, - } - } - - return &tracepb.Span_Attributes{AttributeMap: amap}, status -} diff --git a/translator/trace/zipkin/zipkinv2_to_protospan_test.go b/translator/trace/zipkin/zipkinv2_to_protospan_test.go deleted file mode 100644 index 3bfb9900acc..00000000000 --- a/translator/trace/zipkin/zipkinv2_to_protospan_test.go +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package zipkin - -import ( - "encoding/json" - "io/ioutil" - "net" - "testing" - "time" - - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" - zipkinmodel "github.com/openzipkin/zipkin-go/model" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - tracetranslator "go.opentelemetry.io/collector/translator/trace" -) - -func TestShortIDSpanConversion(t *testing.T) { - shortID, _ := zipkinmodel.TraceIDFromHex("0102030405060708") - assert.Equal(t, uint64(0), shortID.High, "wanted 64bit traceID, so TraceID.High must be zero") - - zc := zipkinmodel.SpanContext{ - TraceID: shortID, - ID: zipkinmodel.ID(shortID.Low), - } - zs := zipkinmodel.SpanModel{ - SpanContext: zc, - } - - ocSpan, _ := zipkinSpanToTraceSpan(&zs) - require.Len(t, ocSpan.TraceId, 16, "incorrect OC proto trace id length") - - want := []byte{0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8} - assert.Equal(t, want, ocSpan.TraceId) -} - -func TestV2SpanKindTranslation(t *testing.T) { - tests := []struct { - zipkinKind zipkinmodel.Kind - ocKind tracepb.Span_SpanKind - otKind tracetranslator.OpenTracingSpanKind - }{ - { - zipkinKind: zipkinmodel.Client, - ocKind: tracepb.Span_CLIENT, - otKind: "", - }, - { - zipkinKind: zipkinmodel.Server, - ocKind: tracepb.Span_SERVER, - otKind: "", - }, - { - zipkinKind: zipkinmodel.Producer, - ocKind: tracepb.Span_SPAN_KIND_UNSPECIFIED, - otKind: tracetranslator.OpenTracingSpanKindProducer, - }, - { - zipkinKind: zipkinmodel.Consumer, - ocKind: tracepb.Span_SPAN_KIND_UNSPECIFIED, - otKind: tracetranslator.OpenTracingSpanKindConsumer, - }, - } - - for _, tt := range tests { - t.Run(string(tt.zipkinKind), func(t *testing.T) { - zs := &zipkinmodel.SpanModel{ - SpanContext: zipkinmodel.SpanContext{ - TraceID: zipkinmodel.TraceID{Low: 123}, - ID: 456, - }, - Kind: tt.zipkinKind, - Timestamp: time.Now(), - } - ocSpan, _ := zipkinSpanToTraceSpan(zs) - assert.EqualValues(t, tt.ocKind, ocSpan.Kind) - if tt.otKind != "" { - otSpanKind := ocSpan.Attributes.AttributeMap[tracetranslator.TagSpanKind] - assert.EqualValues(t, tt.otKind, otSpanKind.GetStringValue().Value) - } else { - assert.True(t, ocSpan.Attributes == nil) - } - }) - } -} - -func TestV2ParsesTags(t *testing.T) { - jsonBlob, err := ioutil.ReadFile("./testdata/zipkin_v2_single.json") - require.NoError(t, err, "Failed to read sample JSON file: %v", err) - - var zs []*zipkinmodel.SpanModel - require.NoError(t, json.Unmarshal(jsonBlob, &zs), "Failed to unmarshal zipkin spans") - - reqs, err := V2BatchToOCProto(zs) - require.NoError(t, err, "Failed to convert Zipkin spans to Trace spans: %v", err) - require.Len(t, reqs, 1, "Expecting only one request", len(reqs)) - require.Len(t, reqs, 1, "Expecting only one span", len(reqs[0].Spans)) - - var expected = &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - "http.path": {Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: "/api"}, - }}, - "http.status_code": {Value: &tracepb.AttributeValue_IntValue{IntValue: 500}}, - "cache_hit": {Value: &tracepb.AttributeValue_BoolValue{BoolValue: true}}, - "ping_count": {Value: &tracepb.AttributeValue_IntValue{IntValue: 25}}, - "timeout": {Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: "12.3"}, - }}, - "ipv6": {Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: "7::80:807f"}}, - }, - "clnt/finagle.version": {Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: "6.45.0"}}, - }, - "zipkin.remoteEndpoint.ipv4": {Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: "192.168.99.101"}}, - }, - "zipkin.remoteEndpoint.port": {Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: "9000"}}, - }, - "zipkin.remoteEndpoint.serviceName": {Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: "backend"}}, - }, - }, - } - - var span = reqs[0].Spans[0] - assert.EqualValues(t, expected, span.Attributes) - - var expectedStatus = &tracepb.Status{ - Code: tracetranslator.OCInternal, - } - assert.EqualValues(t, expectedStatus, span.Status) -} - -func TestZipkinTagsToTraceAttributesDropTag(t *testing.T) { - tags := map[string]string{ - "status.code": "13", - "status.message": "a message", - "http.path": "/api", - } - - attrs, status := zipkinTagsToTraceAttributes(tags, zipkinmodel.Client) - - var expected = &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - "http.path": {Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: "/api"}, - }}, - }, - } - assert.EqualValues(t, expected, attrs) - assert.EqualValues(t, status, &tracepb.Status{Code: 13, Message: "a message"}) -} - -func TestNodeFromZipkinEndpointsSetsAttributeOnNode(t *testing.T) { - zc := zipkinmodel.SpanContext{ - TraceID: zipkinmodel.TraceID{ - High: 0x0001020304050607, - Low: 0x08090A0B0C0D0E0F}, - ID: zipkinmodel.ID(uint64(0xF1F2F3F4F5F6F7F8)), - } - zs := &zipkinmodel.SpanModel{ - SpanContext: zc, - LocalEndpoint: &zipkinmodel.Endpoint{ - ServiceName: "my-service", - IPv4: net.IPv4(1, 2, 3, 4), - }, - } - - pbs := &tracepb.Span{} - _ = nodeFromZipkinEndpoints(zs, pbs) - - var expected = &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - "ipv4": {Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: "1.2.3.4"}, - }}, - }, - } - - assert.EqualValues(t, expected, pbs.Attributes) -} - -func TestV2ParsesTagsHandleNil(t *testing.T) { - zs := []*zipkinmodel.SpanModel{ - nil, - } - - reqs, err := V2BatchToOCProto(zs) - assert.Nil(t, err) - assert.EqualValues(t, 0, len(reqs)) -} diff --git a/translator/trace/zipkin/zipkinv2_to_traces.go b/translator/trace/zipkin/zipkinv2_to_traces.go new file mode 100644 index 00000000000..23b8988877d --- /dev/null +++ b/translator/trace/zipkin/zipkinv2_to_traces.go @@ -0,0 +1,463 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "math" + "regexp" + "sort" + "strconv" + "strings" + + zipkinmodel "github.com/openzipkin/zipkin-go/model" + + "go.opentelemetry.io/collector/consumer/pdata" + otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/trace/v1" + "go.opentelemetry.io/collector/translator/conventions" + tracetranslator "go.opentelemetry.io/collector/translator/trace" +) + +var nonSpanAttributes = getNonSpanAttributes() + +func getNonSpanAttributes() map[string]struct{} { + attrs := make(map[string]struct{}) + for _, key := range conventions.GetResourceSemanticConventionAttributeNames() { + attrs[key] = struct{}{} + } + attrs[tracetranslator.TagServiceNameSource] = struct{}{} + attrs[tracetranslator.TagInstrumentationName] = struct{}{} + attrs[tracetranslator.TagInstrumentationVersion] = struct{}{} + attrs[conventions.OCAttributeProcessStartTime] = struct{}{} + attrs[conventions.OCAttributeExporterVersion] = struct{}{} + attrs[conventions.OCAttributeProcessID] = struct{}{} + attrs[conventions.OCAttributeResourceType] = struct{}{} + return attrs +} + +type AttrValDescript struct { + regex *regexp.Regexp + attrType pdata.AttributeValueType +} + +var attrValDescriptions = getAttrValDescripts() + +func getAttrValDescripts() []*AttrValDescript { + descriptions := make([]*AttrValDescript, 0, 5) + descriptions = append(descriptions, constructAttrValDescript("^$", pdata.AttributeValueNULL)) + descriptions = append(descriptions, constructAttrValDescript(`^-?\d+$`, pdata.AttributeValueINT)) + descriptions = append(descriptions, constructAttrValDescript(`^-?\d+\.\d+$`, pdata.AttributeValueDOUBLE)) + descriptions = append(descriptions, constructAttrValDescript(`^(true|false)$`, pdata.AttributeValueBOOL)) + return descriptions +} + +func constructAttrValDescript(regex string, attrType pdata.AttributeValueType) *AttrValDescript { + regexc, _ := regexp.Compile(regex) + return &AttrValDescript{ + regex: regexc, + attrType: attrType, + } +} + +// Custom Sort on +type byOTLPTypes []*zipkinmodel.SpanModel + +func (b byOTLPTypes) Len() int { + return len(b) +} + +func (b byOTLPTypes) Less(i, j int) bool { + diff := strings.Compare(extractLocalServiceName(b[i]), extractLocalServiceName(b[j])) + if diff != 0 { + return diff <= 0 + } + diff = strings.Compare(extractInstrumentationLibrary(b[i]), extractInstrumentationLibrary(b[j])) + return diff <= 0 +} + +func (b byOTLPTypes) Swap(i, j int) { + b[i], b[j] = b[j], b[i] +} + +// V2SpansToInternalTraces translates Zipkin v2 spans into internal trace data. +func V2SpansToInternalTraces(zipkinSpans []*zipkinmodel.SpanModel) (pdata.Traces, error) { + traceData := pdata.NewTraces() + if len(zipkinSpans) == 0 { + return traceData, nil + } + + sort.Sort(byOTLPTypes(zipkinSpans)) + + rss := traceData.ResourceSpans() + prevServiceName := "" + prevInstrLibName := "" + rsCount := rss.Len() + ilsCount := 0 + spanCount := 0 + var curRscSpans pdata.ResourceSpans + var curILSpans pdata.InstrumentationLibrarySpans + var curSpans pdata.SpanSlice + for _, zspan := range zipkinSpans { + if zspan == nil { + continue + } + tags := copySpanTags(zspan.Tags) + localServiceName := extractLocalServiceName(zspan) + if localServiceName != prevServiceName { + prevServiceName = localServiceName + rss.Resize(rsCount + 1) + curRscSpans = rss.At(rsCount) + curRscSpans.InitEmpty() + rsCount++ + populateResourceFromZipkinSpan(tags, localServiceName, curRscSpans.Resource()) + prevInstrLibName = "" + ilsCount = 0 + } + instrLibName := extractInstrumentationLibrary(zspan) + if instrLibName != prevInstrLibName || ilsCount == 0 { + prevInstrLibName = instrLibName + curRscSpans.InstrumentationLibrarySpans().Resize(ilsCount + 1) + curILSpans = curRscSpans.InstrumentationLibrarySpans().At(ilsCount) + curILSpans.InitEmpty() + ilsCount++ + populateILFromZipkinSpan(tags, instrLibName, curILSpans.InstrumentationLibrary()) + spanCount = 0 + curSpans = curILSpans.Spans() + } + curSpans.Resize(spanCount + 1) + err := zSpanToInternal(zspan, tags, curSpans.At(spanCount)) + if err != nil { + return traceData, err + } + spanCount++ + } + + return traceData, nil +} + +func zSpanToInternal(zspan *zipkinmodel.SpanModel, tags map[string]string, dest pdata.Span) error { + dest.InitEmpty() + + dest.SetTraceID(pdata.TraceID(tracetranslator.UInt64ToByteTraceID(zspan.TraceID.High, zspan.TraceID.Low))) + dest.SetSpanID(pdata.SpanID(tracetranslator.UInt64ToByteSpanID(uint64(zspan.ID)))) + if value, ok := tags[tracetranslator.TagW3CTraceState]; ok { + dest.SetTraceState(pdata.TraceState(value)) + delete(tags, tracetranslator.TagW3CTraceState) + } + parentID := zspan.ParentID + if parentID != nil && *parentID != zspan.ID { + dest.SetParentSpanID(pdata.SpanID(tracetranslator.UInt64ToByteSpanID(uint64(*parentID)))) + } + + dest.SetName(zspan.Name) + startNano := zspan.Timestamp.UnixNano() + dest.SetStartTime(pdata.TimestampUnixNano(startNano)) + if zspan.Duration.Nanoseconds() > 0 { + dest.SetEndTime(pdata.TimestampUnixNano(startNano + zspan.Duration.Nanoseconds())) + } + dest.SetKind(zipkinKindToSpanKind(zspan.Kind, tags)) + + populateSpanStatus(tags, dest.Status()) + if err := zTagsToSpanLinks(tags, dest.Links()); err != nil { + return err + } + + attrs := dest.Attributes() + attrs.InitEmptyWithCapacity(len(tags)) + if err := zTagsToInternalAttrs(zspan, tags, attrs); err != nil { + return err + } + + err := populateSpanEvents(zspan, dest.Events()) + return err +} + +func populateSpanStatus(tags map[string]string, status pdata.SpanStatus) { + if value, ok := tags[tracetranslator.TagStatusCode]; ok { + status.InitEmpty() + status.SetCode(pdata.StatusCode(otlptrace.Status_StatusCode_value[value])) + delete(tags, tracetranslator.TagStatusCode) + if value, ok := tags[tracetranslator.TagStatusMsg]; ok { + status.SetMessage(value) + delete(tags, tracetranslator.TagStatusMsg) + } + } +} + +func zipkinKindToSpanKind(kind zipkinmodel.Kind, tags map[string]string) pdata.SpanKind { + switch kind { + case zipkinmodel.Client: + return pdata.SpanKindCLIENT + case zipkinmodel.Server: + return pdata.SpanKindSERVER + case zipkinmodel.Producer: + return pdata.SpanKindPRODUCER + case zipkinmodel.Consumer: + return pdata.SpanKindCONSUMER + default: + if value, ok := tags[tracetranslator.TagSpanKind]; ok { + delete(tags, tracetranslator.TagSpanKind) + if value == "internal" { + return pdata.SpanKindINTERNAL + } + } + return pdata.SpanKindUNSPECIFIED + } +} + +func zTagsToSpanLinks(tags map[string]string, dest pdata.SpanLinkSlice) error { + index := 0 + for i := 0; i < 128; i++ { + key := fmt.Sprintf("otlp.link.%d", i) + val, ok := tags[key] + if !ok { + return nil + } + delete(tags, key) + + parts := strings.Split(val, "|") + partCnt := len(parts) + if partCnt < 5 { + continue + } + dest.Resize(index + 1) + link := dest.At(index) + index++ + link.InitEmpty() + rawTrace, errTrace := hex.DecodeString(parts[0]) + if errTrace != nil { + return errTrace + } + link.SetTraceID(pdata.NewTraceID(rawTrace)) + rawSpan, errSpan := hex.DecodeString(parts[1]) + if errSpan != nil { + return errSpan + } + link.SetSpanID(pdata.NewSpanID(rawSpan)) + link.SetTraceState(pdata.TraceState(parts[2])) + + var jsonStr string + if partCnt == 5 { + jsonStr = parts[3] + } else { + jsonParts := parts[3 : partCnt-1] + jsonStr = strings.Join(jsonParts, "|") + } + var attrs map[string]interface{} + if err := json.Unmarshal([]byte(jsonStr), &attrs); err != nil { + return err + } + if err := jsonMapToAttributeMap(attrs, link.Attributes()); err != nil { + return err + } + + dropped, errDropped := strconv.ParseUint(parts[partCnt-1], 10, 32) + if errDropped != nil { + return errDropped + } + link.SetDroppedAttributesCount(uint32(dropped)) + } + return nil +} + +func populateSpanEvents(zspan *zipkinmodel.SpanModel, events pdata.SpanEventSlice) error { + events.Resize(len(zspan.Annotations)) + for ix, anno := range zspan.Annotations { + event := events.At(ix) + startNano := anno.Timestamp.UnixNano() + event.SetTimestamp(pdata.TimestampUnixNano(startNano)) + + parts := strings.Split(anno.Value, "|") + partCnt := len(parts) + event.SetName(parts[0]) + if partCnt < 3 { + continue + } + + var jsonStr string + if partCnt == 3 { + jsonStr = parts[1] + } else { + jsonParts := parts[1 : partCnt-1] + jsonStr = strings.Join(jsonParts, "|") + } + var attrs map[string]interface{} + if err := json.Unmarshal([]byte(jsonStr), &attrs); err != nil { + return err + } + if err := jsonMapToAttributeMap(attrs, event.Attributes()); err != nil { + return err + } + + dropped, errDropped := strconv.ParseUint(parts[partCnt-1], 10, 32) + if errDropped != nil { + return errDropped + } + event.SetDroppedAttributesCount(uint32(dropped)) + } + return nil +} + +func jsonMapToAttributeMap(attrs map[string]interface{}, dest pdata.AttributeMap) error { + for key, val := range attrs { + if s, ok := val.(string); ok { + dest.InsertString(key, s) + } else if d, ok := val.(float64); ok { + if math.Mod(d, 1.0) == 0.0 { + dest.InsertInt(key, int64(d)) + } else { + dest.InsertDouble(key, d) + } + } else if b, ok := val.(bool); ok { + dest.InsertBool(key, b) + } + } + return nil +} + +func zTagsToInternalAttrs(zspan *zipkinmodel.SpanModel, tags map[string]string, dest pdata.AttributeMap) error { + parseErr := tagsToAttributeMap(tags, dest) + if zspan.LocalEndpoint != nil { + if zspan.LocalEndpoint.IPv4 != nil { + dest.InsertString(conventions.AttributeNetHostIP, zspan.LocalEndpoint.IPv4.String()) + } + if zspan.LocalEndpoint.IPv6 != nil { + dest.InsertString(conventions.AttributeNetHostIP, zspan.LocalEndpoint.IPv6.String()) + } + if zspan.LocalEndpoint.Port > 0 { + dest.UpsertInt(conventions.AttributeNetHostPort, int64(zspan.LocalEndpoint.Port)) + } + } + if zspan.RemoteEndpoint != nil { + if zspan.RemoteEndpoint.ServiceName != "" { + dest.InsertString(conventions.AttributePeerService, zspan.RemoteEndpoint.ServiceName) + } + if zspan.RemoteEndpoint.IPv4 != nil { + dest.InsertString(conventions.AttributeNetPeerIP, zspan.RemoteEndpoint.IPv4.String()) + } + if zspan.RemoteEndpoint.IPv6 != nil { + dest.InsertString(conventions.AttributeNetPeerIP, zspan.RemoteEndpoint.IPv6.String()) + } + if zspan.RemoteEndpoint.Port > 0 { + dest.UpsertInt(conventions.AttributeNetPeerPort, int64(zspan.RemoteEndpoint.Port)) + } + } + return parseErr +} + +func tagsToAttributeMap(tags map[string]string, dest pdata.AttributeMap) error { + var parseErr error + for key, val := range tags { + if _, ok := nonSpanAttributes[key]; ok { + continue + } + switch determineValueType(val) { + case pdata.AttributeValueINT: + iVal, _ := strconv.ParseInt(val, 10, 64) + dest.UpsertInt(key, iVal) + case pdata.AttributeValueDOUBLE: + fVal, _ := strconv.ParseFloat(val, 64) + dest.UpsertDouble(key, fVal) + case pdata.AttributeValueBOOL: + bVal, _ := strconv.ParseBool(val) + dest.UpsertBool(key, bVal) + default: + dest.UpsertString(key, val) + } + } + return parseErr +} + +func determineValueType(value string) pdata.AttributeValueType { + for _, desc := range attrValDescriptions { + if desc.regex.MatchString(value) { + return desc.attrType + } + } + return pdata.AttributeValueSTRING +} + +func populateResourceFromZipkinSpan(tags map[string]string, localServiceName string, resource pdata.Resource) { + if tracetranslator.ResourceNotSet == localServiceName { + return + } + + resource.InitEmpty() + if tracetranslator.ResourceNoAttrs == localServiceName { + return + } + + if len(tags) == 0 { + resource.Attributes().InsertString(conventions.AttributeServiceName, localServiceName) + return + } + + snSource := tags[tracetranslator.TagServiceNameSource] + if snSource == "" { + resource.Attributes().InsertString(conventions.AttributeServiceName, localServiceName) + } else { + resource.Attributes().InsertString(snSource, localServiceName) + } + delete(tags, tracetranslator.TagServiceNameSource) + + for key := range getNonSpanAttributes() { + if key == tracetranslator.TagInstrumentationName || key == tracetranslator.TagInstrumentationVersion { + continue + } + if value, ok := tags[key]; ok { + resource.Attributes().UpsertString(key, value) + delete(tags, key) + } + } +} + +func populateILFromZipkinSpan(tags map[string]string, instrLibName string, library pdata.InstrumentationLibrary) { + if instrLibName == "" { + return + } + library.InitEmpty() + if value, ok := tags[tracetranslator.TagInstrumentationName]; ok { + library.SetName(value) + delete(tags, tracetranslator.TagInstrumentationName) + } + if value, ok := tags[tracetranslator.TagInstrumentationVersion]; ok { + library.SetVersion(value) + delete(tags, tracetranslator.TagInstrumentationVersion) + } +} + +func copySpanTags(tags map[string]string) map[string]string { + dest := make(map[string]string, len(tags)) + for key, val := range tags { + dest[key] = val + } + return dest +} + +func extractLocalServiceName(zspan *zipkinmodel.SpanModel) string { + if zspan == nil || zspan.LocalEndpoint == nil { + return tracetranslator.ResourceNotSet + } + return zspan.LocalEndpoint.ServiceName +} + +func extractInstrumentationLibrary(zspan *zipkinmodel.SpanModel) string { + if zspan == nil || len(zspan.Tags) == 0 { + return "" + } + return zspan.Tags[tracetranslator.TagInstrumentationName] +} diff --git a/translator/trace/zipkin/zipkinv2_to_traces_test.go b/translator/trace/zipkin/zipkinv2_to_traces_test.go new file mode 100644 index 00000000000..9d9e943bf4a --- /dev/null +++ b/translator/trace/zipkin/zipkinv2_to_traces_test.go @@ -0,0 +1,133 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "testing" + "time" + + zipkinmodel "github.com/openzipkin/zipkin-go/model" + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/data/testdata" + "go.opentelemetry.io/collector/translator/conventions" +) + +func TestZipkinSpansToInternalTraces(t *testing.T) { + tests := []struct { + name string + zs []*zipkinmodel.SpanModel + td pdata.Traces + err error + }{ + { + name: "empty", + zs: make([]*zipkinmodel.SpanModel, 0), + td: testdata.GenerateTraceDataEmpty(), + err: nil, + }, + { + name: "nilSpan", + zs: generateNilSpan(), + td: testdata.GenerateTraceDataEmpty(), + err: nil, + }, + { + name: "minimalSpan", + zs: generateSpanNoEndpoints(), + td: generateTraceSingleSpanNoResourceOrInstrLibrary(), + err: nil, + }, + { + name: "onlyLocalEndpointSpan", + zs: generateSpanNoTags(), + td: generateTraceSingleSpanMinmalResource(), + err: nil, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + td, err := V2SpansToInternalTraces(test.zs) + assert.EqualValues(t, test.err, err) + if test.name != "nilSpan" { + assert.Equal(t, len(test.zs), td.SpanCount()) + } + assert.EqualValues(t, test.td, td) + }) + } +} + +func generateNilSpan() []*zipkinmodel.SpanModel { + return make([]*zipkinmodel.SpanModel, 1) +} + +func generateSpanNoEndpoints() []*zipkinmodel.SpanModel { + spans := make([]*zipkinmodel.SpanModel, 1) + spans[0] = &zipkinmodel.SpanModel{ + SpanContext: zipkinmodel.SpanContext{ + TraceID: convertTraceID( + []byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80}), + ID: convertSpanID([]byte{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8}), + }, + Name: "MinimalData", + Kind: zipkinmodel.Client, + Timestamp: time.Unix(1596911098, 294000000), + Duration: 1000000, + Shared: false, + LocalEndpoint: nil, + RemoteEndpoint: nil, + Annotations: nil, + Tags: nil, + } + return spans +} + +func generateSpanNoTags() []*zipkinmodel.SpanModel { + spans := generateSpanNoEndpoints() + spans[0].LocalEndpoint = &zipkinmodel.Endpoint{ServiceName: "SoleAttr"} + return spans +} + +func generateTraceSingleSpanNoResourceOrInstrLibrary() pdata.Traces { + td := pdata.NewTraces() + td.ResourceSpans().Resize(1) + rs := td.ResourceSpans().At(0) + rs.InitEmpty() + rs.InstrumentationLibrarySpans().Resize(1) + ils := rs.InstrumentationLibrarySpans().At(0) + ils.InitEmpty() + ils.Spans().Resize(1) + span := ils.Spans().At(0) + span.SetTraceID( + []byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80}) + span.SetSpanID([]byte{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8}) + span.SetName("MinimalData") + span.SetKind(pdata.SpanKindCLIENT) + span.SetStartTime(1596911098294000000) + span.SetEndTime(1596911098295000000) + span.Attributes().InitEmptyWithCapacity(0) + return td +} + +func generateTraceSingleSpanMinmalResource() pdata.Traces { + td := generateTraceSingleSpanNoResourceOrInstrLibrary() + rs := td.ResourceSpans().At(0) + rsc := rs.Resource() + rsc.InitEmpty() + rsc.Attributes().InitEmptyWithCapacity(1) + rsc.Attributes().UpsertString(conventions.AttributeServiceName, "SoleAttr") + return td +}