Skip to content

Commit

Permalink
Merge pull request #782 from tensorchen/remove_dup_key
Browse files Browse the repository at this point in the history
using api/standard keys instead of custom keys
  • Loading branch information
MrAlias committed Jun 1, 2020
2 parents 86a1026 + 1f9a66f commit 823703a
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 82 deletions.
42 changes: 17 additions & 25 deletions instrumentation/grpctrace/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net"
"regexp"

"go.opentelemetry.io/otel/api/standard"

"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
Expand All @@ -33,39 +35,29 @@ import (
"go.opentelemetry.io/otel/api/trace"
)

var (
rpcServiceKey = kv.Key("rpc.service")
netPeerIPKey = kv.Key("net.peer.ip")
netPeerPortKey = kv.Key("net.peer.port")

messageTypeKey = kv.Key("message.type")
messageIDKey = kv.Key("message.id")
messageUncompressedSizeKey = kv.Key("message.uncompressed_size")
)

type messageType string
type messageType kv.KeyValue

// Event adds an event of the messageType to the span associated with the
// passed context with id and size (if message is a proto message).
func (m messageType) Event(ctx context.Context, id int, message interface{}) {
span := trace.SpanFromContext(ctx)
if p, ok := message.(proto.Message); ok {
span.AddEvent(ctx, "message",
messageTypeKey.String(string(m)),
messageIDKey.Int(id),
messageUncompressedSizeKey.Int(proto.Size(p)),
kv.KeyValue(m),
standard.RPCMessageIDKey.Int(id),
standard.RPCMessageUncompressedSizeKey.Int(proto.Size(p)),
)
} else {
span.AddEvent(ctx, "message",
messageTypeKey.String(string(m)),
messageIDKey.Int(id),
kv.KeyValue(m),
standard.RPCMessageIDKey.Int(id),
)
}
}

const (
messageSent messageType = "SENT"
messageReceived messageType = "RECEIVED"
var (
messageSent = messageType(standard.RPCMessageTypeSent)
messageReceived = messageType(standard.RPCMessageTypeReceived)
)

// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
Expand Down Expand Up @@ -93,7 +85,7 @@ func UnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor {
ctx, method,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(peerInfoFromTarget(cc.Target())...),
trace.WithAttributes(rpcServiceKey.String(serviceFromFullMethod(method))),
trace.WithAttributes(standard.RPCServiceKey.String(serviceFromFullMethod(method))),
)
defer span.End()

Expand Down Expand Up @@ -271,7 +263,7 @@ func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor {
ctx, method,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(peerInfoFromTarget(cc.Target())...),
trace.WithAttributes(rpcServiceKey.String(serviceFromFullMethod(method))),
trace.WithAttributes(standard.RPCServiceKey.String(serviceFromFullMethod(method))),
)

Inject(ctx, &metadataCopy)
Expand Down Expand Up @@ -325,7 +317,7 @@ func UnaryServerInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
info.FullMethod,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(peerInfoFromContext(ctx)...),
trace.WithAttributes(rpcServiceKey.String(serviceFromFullMethod(info.FullMethod))),
trace.WithAttributes(standard.RPCServiceKey.String(serviceFromFullMethod(info.FullMethod))),
)
defer span.End()

Expand Down Expand Up @@ -415,7 +407,7 @@ func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor {
info.FullMethod,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(peerInfoFromContext(ctx)...),
trace.WithAttributes(rpcServiceKey.String(serviceFromFullMethod(info.FullMethod))),
trace.WithAttributes(standard.RPCServiceKey.String(serviceFromFullMethod(info.FullMethod))),
)
defer span.End()

Expand All @@ -442,8 +434,8 @@ func peerInfoFromTarget(target string) []kv.KeyValue {
}

return []kv.KeyValue{
netPeerIPKey.String(host),
netPeerPortKey.String(port),
standard.NetPeerIPKey.String(host),
standard.NetPeerPortKey.String(port),
}
}

Expand Down
102 changes: 52 additions & 50 deletions instrumentation/grpctrace/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"testing"
"time"

"go.opentelemetry.io/otel/api/standard"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -93,100 +95,100 @@ func TestUnaryClientInterceptor(t *testing.T) {
{
name: "/github.51.al.serviceName/bar",
expectedAttr: map[kv.Key]value.Value{
rpcServiceKey: value.String("serviceName"),
netPeerIPKey: value.String("fake"),
netPeerPortKey: value.String("connection"),
standard.RPCServiceKey: value.String("serviceName"),
standard.NetPeerIPKey: value.String("fake"),
standard.NetPeerPortKey: value.String("connection"),
},
eventsAttr: []map[kv.Key]value.Value{
{
messageTypeKey: value.String("SENT"),
messageIDKey: value.Int(1),
messageUncompressedSizeKey: value.Int(proto.Size(proto.Message(req))),
standard.RPCMessageTypeKey: value.String("SENT"),
standard.RPCMessageIDKey: value.Int(1),
standard.RPCMessageUncompressedSizeKey: value.Int(proto.Size(proto.Message(req))),
},
{
messageTypeKey: value.String("RECEIVED"),
messageIDKey: value.Int(1),
messageUncompressedSizeKey: value.Int(proto.Size(proto.Message(reply))),
standard.RPCMessageTypeKey: value.String("RECEIVED"),
standard.RPCMessageIDKey: value.Int(1),
standard.RPCMessageUncompressedSizeKey: value.Int(proto.Size(proto.Message(reply))),
},
},
},
{
name: "/serviceName/bar",
expectedAttr: map[kv.Key]value.Value{
rpcServiceKey: value.String("serviceName"),
netPeerIPKey: value.String("fake"),
netPeerPortKey: value.String("connection"),
standard.RPCServiceKey: value.String("serviceName"),
standard.NetPeerIPKey: value.String("fake"),
standard.NetPeerPortKey: value.String("connection"),
},
eventsAttr: []map[kv.Key]value.Value{
{
messageTypeKey: value.String("SENT"),
messageIDKey: value.Int(1),
messageUncompressedSizeKey: value.Int(proto.Size(proto.Message(req))),
standard.RPCMessageTypeKey: value.String("SENT"),
standard.RPCMessageIDKey: value.Int(1),
standard.RPCMessageUncompressedSizeKey: value.Int(proto.Size(proto.Message(req))),
},
{
messageTypeKey: value.String("RECEIVED"),
messageIDKey: value.Int(1),
messageUncompressedSizeKey: value.Int(proto.Size(proto.Message(reply))),
standard.RPCMessageTypeKey: value.String("RECEIVED"),
standard.RPCMessageIDKey: value.Int(1),
standard.RPCMessageUncompressedSizeKey: value.Int(proto.Size(proto.Message(reply))),
},
},
},
{
name: "serviceName/bar",
expectedAttr: map[kv.Key]value.Value{
rpcServiceKey: value.String("serviceName"),
netPeerIPKey: value.String("fake"),
netPeerPortKey: value.String("connection"),
standard.RPCServiceKey: value.String("serviceName"),
standard.NetPeerIPKey: value.String("fake"),
standard.NetPeerPortKey: value.String("connection"),
},
eventsAttr: []map[kv.Key]value.Value{
{
messageTypeKey: value.String("SENT"),
messageIDKey: value.Int(1),
messageUncompressedSizeKey: value.Int(proto.Size(proto.Message(req))),
standard.RPCMessageTypeKey: value.String("SENT"),
standard.RPCMessageIDKey: value.Int(1),
standard.RPCMessageUncompressedSizeKey: value.Int(proto.Size(proto.Message(req))),
},
{
messageTypeKey: value.String("RECEIVED"),
messageIDKey: value.Int(1),
messageUncompressedSizeKey: value.Int(proto.Size(proto.Message(reply))),
standard.RPCMessageTypeKey: value.String("RECEIVED"),
standard.RPCMessageIDKey: value.Int(1),
standard.RPCMessageUncompressedSizeKey: value.Int(proto.Size(proto.Message(reply))),
},
},
},
{
name: "invalidName",
expectedAttr: map[kv.Key]value.Value{
rpcServiceKey: value.String(""),
netPeerIPKey: value.String("fake"),
netPeerPortKey: value.String("connection"),
standard.RPCServiceKey: value.String(""),
standard.NetPeerIPKey: value.String("fake"),
standard.NetPeerPortKey: value.String("connection"),
},
eventsAttr: []map[kv.Key]value.Value{
{
messageTypeKey: value.String("SENT"),
messageIDKey: value.Int(1),
messageUncompressedSizeKey: value.Int(proto.Size(proto.Message(req))),
standard.RPCMessageTypeKey: value.String("SENT"),
standard.RPCMessageIDKey: value.Int(1),
standard.RPCMessageUncompressedSizeKey: value.Int(proto.Size(proto.Message(req))),
},
{
messageTypeKey: value.String("RECEIVED"),
messageIDKey: value.Int(1),
messageUncompressedSizeKey: value.Int(proto.Size(proto.Message(reply))),
standard.RPCMessageTypeKey: value.String("RECEIVED"),
standard.RPCMessageIDKey: value.Int(1),
standard.RPCMessageUncompressedSizeKey: value.Int(proto.Size(proto.Message(reply))),
},
},
},
{
name: "/github.51.al.foo.serviceName_123/method",
expectedAttr: map[kv.Key]value.Value{
rpcServiceKey: value.String("serviceName_123"),
netPeerIPKey: value.String("fake"),
netPeerPortKey: value.String("connection"),
standard.RPCServiceKey: value.String("serviceName_123"),
standard.NetPeerIPKey: value.String("fake"),
standard.NetPeerPortKey: value.String("connection"),
},
eventsAttr: []map[kv.Key]value.Value{
{
messageTypeKey: value.String("SENT"),
messageIDKey: value.Int(1),
messageUncompressedSizeKey: value.Int(proto.Size(proto.Message(req))),
standard.RPCMessageTypeKey: value.String("SENT"),
standard.RPCMessageIDKey: value.Int(1),
standard.RPCMessageUncompressedSizeKey: value.Int(proto.Size(proto.Message(req))),
},
{
messageTypeKey: value.String("RECEIVED"),
messageIDKey: value.Int(1),
messageUncompressedSizeKey: value.Int(proto.Size(proto.Message(reply))),
standard.RPCMessageTypeKey: value.String("RECEIVED"),
standard.RPCMessageIDKey: value.Int(1),
standard.RPCMessageUncompressedSizeKey: value.Int(proto.Size(proto.Message(reply))),
},
},
},
Expand Down Expand Up @@ -342,9 +344,9 @@ func TestStreamClientInterceptor(t *testing.T) {

attrs := spanData.Attributes
expectedAttr := map[kv.Key]string{
rpcServiceKey: "serviceName",
netPeerIPKey: "fake",
netPeerPortKey: "connection",
standard.RPCServiceKey: "serviceName",
standard.NetPeerIPKey: "fake",
standard.NetPeerPortKey: "connection",
}

for _, attr := range attrs {
Expand All @@ -365,10 +367,10 @@ func TestStreamClientInterceptor(t *testing.T) {
msgID := i/2 + 1
validate := func(eventName string, attrs []kv.KeyValue) {
for _, attr := range attrs {
if attr.Key == messageTypeKey && attr.Value.AsString() != eventName {
if attr.Key == standard.RPCMessageTypeKey && attr.Value.AsString() != eventName {
t.Errorf("invalid event on index: %d expecting %s event, receive %s event", i, eventName, attr.Value.AsString())
}
if attr.Key == messageIDKey && attr.Value != value.Int(msgID) {
if attr.Key == standard.RPCMessageIDKey && attr.Value != value.Int(msgID) {
t.Errorf("invalid id for message event expected %d received %d", msgID, attr.Value.AsInt32())
}
}
Expand Down
6 changes: 4 additions & 2 deletions instrumentation/httptrace/clienttrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"sync"

"go.opentelemetry.io/otel/api/standard"

"google.golang.org/grpc/codes"

"go.opentelemetry.io/otel/api/global"
Expand Down Expand Up @@ -150,7 +152,7 @@ func (ct *clientTracer) span(hook string) trace.Span {
}

func (ct *clientTracer) getConn(host string) {
ct.start("http.getconn", "http.getconn", HostKey.String(host))
ct.start("http.getconn", "http.getconn", standard.HTTPHostKey.String(host))
}

func (ct *clientTracer) gotConn(info httptrace.GotConnInfo) {
Expand All @@ -170,7 +172,7 @@ func (ct *clientTracer) gotFirstResponseByte() {
}

func (ct *clientTracer) dnsStart(info httptrace.DNSStartInfo) {
ct.start("http.dns", "http.dns", HostKey.String(info.Host))
ct.start("http.dns", "http.dns", standard.HTTPHostKey.String(info.Host))
}

func (ct *clientTracer) dnsDone(info httptrace.DNSDoneInfo) {
Expand Down
5 changes: 0 additions & 5 deletions instrumentation/httptrace/httptrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ import (
"go.opentelemetry.io/otel/api/trace"
)

var (
HostKey = kv.Key("http.host")
URLKey = kv.Key("http.url")
)

// Returns the Attributes, Context Entries, and SpanContext that were encoded by Inject.
func Extract(ctx context.Context, req *http.Request) ([]kv.KeyValue, []kv.KeyValue, trace.SpanContext) {
ctx = propagation.ExtractHTTP(ctx, global.Propagators(), req.Header)
Expand Down

0 comments on commit 823703a

Please sign in to comment.