Skip to content
This repository has been archived by the owner on Jul 1, 2022. It is now read-only.

Commit

Permalink
Add Zipkin V2 reporter adapter (#399)
Browse files Browse the repository at this point in the history
* Add ZipkinReporter to support Zipkin v2

This adds a new class called Zipkin2Reporter that adapts a Zipkin v2
reporter and converts Jaeger spans to Zipkin v2 spans.  This enables us
to send spans to a Zipkin v2 server using the newer protocol.

This also moves the existing Thrift conversion logic to a new package
`io.jaegertracing.zipkin` where it exists alongside the new v2
conversion logic.

Signed-off-by: Ben Keith <bkeith@signalfx.com>

* Reverting ThriftSpanConverter to origin location

Also removing Zipkin v1 special tag handling from V2SpanConverter
because they are not necessary

Signed-off-by: Ben Keith <bkeith@signalfx.com>

* Misc changes from code review

Signed-off-by: Ben Keith <bkeith@signalfx.com>
  • Loading branch information
keitwb authored and pavolloffay committed May 18, 2018
1 parent 8e237f8 commit 07df0b8
Show file tree
Hide file tree
Showing 10 changed files with 626 additions and 27 deletions.
31 changes: 29 additions & 2 deletions jaeger-zipkin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,39 @@ tracer = new Tracer.Builder(serviceName, reporter, sampler)
```

## Sending data to Zipkin
Zipkin supports transports including Http and Kafka. You can configure Jaeger to send to a Zipkin server with
`ZipkinSender`.
There are two ways to send spans to a Zipkin server:

### Thrift
If you want to send Zipkin v1 Thrift-encoded spans, you should use the `ZipkinSender` sender, which
wraps a Zipkin sender class to enable the use of various transports such as HTTP and Kafka.

For example:
```java
import io.jaegertracing.senders.zipkin.ZipkinSender;

reporter = new RemoteReporter(ZipkinSender.create("http://localhost:9411/api/v1/spans"));
tracer = new Tracer.Builder(serviceName, reporter, sampler)
...
```

### Zipkin 2 Reporters
You can reuse a Zipkin 2 reporter instance as-is by using `ZipkinV2Reporter`, which adapts a Zipkin
2 reporter to the Jaeger reporter interface and deals with converting Jaeger spans to the Zipkin 2
model.

For example:
```java
import io.jaegertracing.zipkin.reporters.ZipkinV2Reporter;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.urlconnection.URLConnectionSender;

reporter = new ZipkinV2Reporter(
AsyncReporter.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans")));

tracer = new Tracer.Builder(serviceName)
.withReporter(reporter)
...
.build()
```

This will send spans to the Zipkin v2 endpoint using the v2 JSON encoding.
7 changes: 6 additions & 1 deletion jaeger-zipkin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ description = 'Integration library for Zipkin'
dependencies {
compile group: 'io.zipkin.reporter', name: 'zipkin-reporter', version: '1.1.2'
compile group: 'io.zipkin.reporter', name: 'zipkin-sender-urlconnection', version: '1.1.2'

compile group: 'io.zipkin.java', name: 'zipkin', version: '2.8.1'
compile group: 'io.zipkin.reporter2', name: 'zipkin-reporter', version: '2.6.0'

compile project(path: ':jaeger-core', configuration: 'shadow')

testCompile group: 'io.zipkin.reporter2', name: 'zipkin-sender-urlconnection', version: '2.6.0'
testCompile group: 'junit', name: 'junit', version: junitVersion
testCompile group: 'io.zipkin.java', name: 'zipkin-junit', version: '2.3.0'
testCompile group: 'io.zipkin.java', name: 'zipkin-junit', version: '2.7.1'
testCompile group: 'com.tngtech.java', name: 'junit-dataprovider', version: junitDataProviderVersion

signature 'org.codehaus.mojo.signature:java16:1.1@signature'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.jaegertracing.Span;
import io.jaegertracing.SpanContext;
import io.jaegertracing.Tracer;
import io.jaegertracing.zipkin.ConverterUtil;
import io.opentracing.tag.Tags;
import java.nio.charset.Charset;
import java.util.ArrayList;
Expand Down Expand Up @@ -56,10 +57,10 @@ public static com.twitter.zipkin.thriftjava.Span convertSpan(Span span) {
private static List<Annotation> buildAnnotations(Span span, Endpoint host) {
List<Annotation> annotations = new ArrayList<Annotation>();

if (isRpc(span)) {
if (ConverterUtil.isRpc(span)) {
String startLabel = zipkincoreConstants.SERVER_RECV;
String endLabel = zipkincoreConstants.SERVER_SEND;
if (isRpcClient(span)) {
if (ConverterUtil.isRpcClient(span)) {
startLabel = zipkincoreConstants.CLIENT_SEND;
endLabel = zipkincoreConstants.CLIENT_RECV;
}
Expand Down Expand Up @@ -87,9 +88,9 @@ private static List<Annotation> buildAnnotations(Span span, Endpoint host) {
private static List<BinaryAnnotation> buildBinaryAnnotations(Span span, Endpoint host) {
List<BinaryAnnotation> binaryAnnotations = new ArrayList<BinaryAnnotation>();
Map<String, Object> tags = span.getTags();
boolean isRpc = isRpc(span);
boolean isClient = isRpcClient(span);
boolean firstSpanInProcess = span.getReferences().isEmpty() || isRpcServer(span);
boolean isRpc = ConverterUtil.isRpc(span);
boolean isClient = ConverterUtil.isRpcClient(span);
boolean firstSpanInProcess = span.getReferences().isEmpty() || ConverterUtil.isRpcServer(span);

if (firstSpanInProcess) {
Map<String, ?> processTags = span.getTracer().tags();
Expand Down Expand Up @@ -154,20 +155,6 @@ private static BinaryAnnotation buildBinaryAnnotation(String tagKey, Object tagV
return banno;
}

static boolean isRpcServer(Span span) {
return Tags.SPAN_KIND_SERVER.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}

static boolean isRpc(Span span) {
Object spanKindValue = span.getTags().get(Tags.SPAN_KIND.getKey());
return Tags.SPAN_KIND_CLIENT.equals(spanKindValue) || Tags.SPAN_KIND_SERVER.equals(spanKindValue);

}

static boolean isRpcClient(Span span) {
return Tags.SPAN_KIND_CLIENT.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}

/**
* Extract peer Endpoint from tags
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@

/**
* This sends (TBinaryProtocol big-endian) encoded spans to a Zipkin Collector (usually a
* zipkin-server).
* zipkin-server). If you want to send newer Zipkin V2 spans in protocols other than Thrift,
* see {@link io.jaegertracing.zipkin.reporters.ZipkinV2Reporter ZipkinV2Reporter}.
*
* <p>
* Example usage:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2018, The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.jaegertracing.zipkin;

import io.jaegertracing.Span;
import io.opentracing.tag.Tags;

/**
* Logic that is common to both Thrift v1 and JSON v2 senders
*/
public class ConverterUtil {
public static boolean isRpcServer(Span span) {
return Tags.SPAN_KIND_SERVER.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}

public static boolean isRpc(Span span) {
return isRpcServer(span) || isRpcClient(span);
}

public static boolean isRpcClient(Span span) {
return Tags.SPAN_KIND_CLIENT.equals(span.getTags().get(Tags.SPAN_KIND.getKey()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright (c) 2018, The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.jaegertracing.zipkin;

import com.google.gson.Gson;
import io.jaegertracing.Constants;
import io.jaegertracing.LogData;
import io.jaegertracing.Span;
import io.jaegertracing.SpanContext;
import io.jaegertracing.Tracer;
import io.opentracing.tag.Tags;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;

/**
* Converts a Jaeger span to a Zipkin2 span.
*/
@Slf4j
public class V2SpanConverter {

private static final Gson gson = new Gson();

public static zipkin2.Span convertSpan(Span span) {
Tracer tracer = span.getTracer();
zipkin2.Endpoint host = zipkin2.Endpoint.newBuilder()
.ip(convertIp(tracer.getIpv4()))
.serviceName(tracer.getServiceName())
.build();

zipkin2.Endpoint peerEndpoint = extractPeerEndpoint(span.getTags());

SpanContext context = span.context();
zipkin2.Span.Builder builder = zipkin2.Span.newBuilder()
.id(Long.toHexString(context.getSpanId()))
.traceId(Long.toHexString(context.getTraceId()))
.name(span.getOperationName())
.parentId(Long.toHexString(context.getParentId()))
.debug(context.isDebug())
.localEndpoint(host)
.remoteEndpoint(peerEndpoint)
.kind(convertKind(span.getTags().get(Tags.SPAN_KIND.getKey())))
.timestamp(span.getStart())
.duration(span.getDuration());

buildAnnotations(span, builder);
buildTags(span, builder);

return builder.build();
}

private static zipkin2.Span.Kind convertKind(Object kind) {
if (Tags.SPAN_KIND_SERVER.equals(kind)) {
return zipkin2.Span.Kind.SERVER;
} else if (Tags.SPAN_KIND_CLIENT.equals(kind)) {
return zipkin2.Span.Kind.CLIENT;
} else if (Tags.SPAN_KIND_CONSUMER.equals(kind)) {
return zipkin2.Span.Kind.CONSUMER;
} else if (Tags.SPAN_KIND_PRODUCER.equals(kind)) {
return zipkin2.Span.Kind.PRODUCER;
} else {
return null;
}
}

private static void buildAnnotations(Span span, zipkin2.Span.Builder builder) {
List<LogData> logs = span.getLogs();
if (logs != null) {
for (LogData logData : logs) {
String logMessage = logData.getMessage();
Map<String, ?> logFields = logData.getFields();
if (logMessage != null) {
builder.addAnnotation(logData.getTime(), logMessage);
} else if (logFields != null) {
builder.addAnnotation(logData.getTime(), gson.toJson(logFields));
}
}
}
}

private static void buildTags(Span span, zipkin2.Span.Builder builder) {
Map<String, Object> tags = span.getTags();
boolean firstSpanInProcess = span.getReferences().isEmpty() || ConverterUtil.isRpcServer(span);

if (firstSpanInProcess) {
Map<String, ?> processTags = span.getTracer().tags();
// add tracer tags to first zipkin span in a process but remove "ip" tag as it is
// taken care of separately.
for (Map.Entry<String, ?> entry : processTags.entrySet()) {
String tagKey = entry.getKey();
if (!Constants.TRACER_IP_TAG_KEY.equals(tagKey)) {
Object tagValue = entry.getValue();
// add a tracer. prefix to process tags for zipkin
builder.putTag("tracer." + tagKey, tagValue.toString());
}
}
}

if (tags != null) {
for (Map.Entry<String, Object> entry : tags.entrySet()) {
String tagKey = entry.getKey();
// Every value is converted to string because zipkin search doesn't
// work well with ints, and bytes.
Object tagValue = entry.getValue();
builder.putTag(tagKey, tagValue.toString());
}
}
}

private static InetAddress convertIp(int ip) {
byte[] bytes = ByteBuffer.allocate(4).putInt(ip).array();
try {
return InetAddress.getByAddress(bytes);
} catch (UnknownHostException e) {
log.error("Jaeger span IP " + ip + " could not be converted", e);
return null;
}
}

/**
* Extract peer Endpoint from tags
*
* @param tags tags
* @return null or peer endpoint
*/
public static zipkin2.Endpoint extractPeerEndpoint(Map<String, Object> tags) {
Object peerIpv4 = tags.get(Tags.PEER_HOST_IPV4.getKey());
Object peerPort = tags.get(Tags.PEER_PORT.getKey());
Object peerService = tags.get(Tags.PEER_SERVICE.getKey());

if (peerIpv4 == null && peerPort == null && peerService == null) {
return null;
}

zipkin2.Endpoint.Builder builder = zipkin2.Endpoint.newBuilder();

if (peerIpv4 instanceof String) {
builder.ip((String) peerIpv4);
}
if (peerPort instanceof Number) {
builder.port(((Number) peerPort).intValue());
}
if (peerService instanceof String) {
builder.serviceName((String) peerService);
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2018, The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.jaegertracing.zipkin.reporters;

import io.jaegertracing.reporters.Reporter;
import io.jaegertracing.zipkin.V2SpanConverter;

/**
* Wrapper around a zipkin v2 AsyncReporter that reports spans using the newer v2 Span class
*/
public class ZipkinV2Reporter implements Reporter {
public final zipkin2.reporter.AsyncReporter<zipkin2.Span> reporter;

public ZipkinV2Reporter(zipkin2.reporter.AsyncReporter<zipkin2.Span> reporter) {
this.reporter = reporter;
}

@Override
public void report(io.jaegertracing.Span span) {
reporter.report(V2SpanConverter.convertSpan(span));
}

@Override
public void close() {
reporter.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.jaegertracing.Tracer;
import io.jaegertracing.reporters.InMemoryReporter;
import io.jaegertracing.samplers.ConstSampler;
import io.jaegertracing.zipkin.ConverterUtil;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
import io.opentracing.propagation.TextMapExtractAdapter;
Expand Down Expand Up @@ -241,17 +242,17 @@ public void testSpanDetectsIsClient() {
Span span = (Span) tracer.buildSpan("test-service-operation").start();
Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_CLIENT);

assertTrue(ThriftSpanConverter.isRpc(span));
assertTrue(ThriftSpanConverter.isRpcClient(span));
assertTrue(ConverterUtil.isRpc(span));
assertTrue(ConverterUtil.isRpcClient(span));
}

@Test
public void testSpanDetectsIsServer() {
Span span = (Span) tracer.buildSpan("test-service-operation").start();
Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_SERVER);

assertTrue(ThriftSpanConverter.isRpc(span));
assertFalse(ThriftSpanConverter.isRpcClient(span));
assertTrue(ConverterUtil.isRpc(span));
assertFalse(ConverterUtil.isRpcClient(span));
}

@Test
Expand Down
Loading

0 comments on commit 07df0b8

Please sign in to comment.