Skip to content

Commit

Permalink
contrib/cloud.google.com/go/pubsub.v1: Add WithServiceName option
Browse files Browse the repository at this point in the history
  • Loading branch information
Mickey Reiss authored and Mickey Reiss committed Nov 24, 2020
1 parent b9e8e8c commit d6223c8
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 5 deletions.
31 changes: 26 additions & 5 deletions contrib/cloud.google.com/go/pubsub.v1/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
Expand Down Expand Up @@ -63,15 +64,31 @@ func (r *PublishResult) Get(ctx context.Context) (string, error) {
return serverID, err
}

type config struct {
serviceName string
}

// A ReceiveOption is used to customize spans started by WrapReceiveHandler.
type ReceiveOption func(cfg *config)

// WithServiceName sets the service name tag for traces started by WrapReceiveHandler.
func WithServiceName(serviceName string) ReceiveOption {
return func(cfg *config) {
cfg.serviceName = serviceName
}
}

// WrapReceiveHandler returns a receive handler that wraps the supplied handler,
// extracts any tracing metadata attached to the received message, and starts a
// receive span.
func WrapReceiveHandler(s *pubsub.Subscription, f func(context.Context, *pubsub.Message)) func(context.Context, *pubsub.Message) {
func WrapReceiveHandler(s *pubsub.Subscription, f func(context.Context, *pubsub.Message), opts ...ReceiveOption) func(context.Context, *pubsub.Message) {
var cfg config
for _, opt := range opts {
opt(&cfg)
}
return func(ctx context.Context, msg *pubsub.Message) {
parentSpanCtx, _ := tracer.Extract(tracer.TextMapCarrier(msg.Attributes))
span, ctx := tracer.StartSpanFromContext(
ctx,
"pubsub.receive",
opts := []ddtrace.StartSpanOption{
tracer.ResourceName(s.String()),
tracer.SpanType(ext.SpanTypeMessageConsumer),
tracer.Tag("message_size", len(msg.Data)),
Expand All @@ -80,7 +97,11 @@ func WrapReceiveHandler(s *pubsub.Subscription, f func(context.Context, *pubsub.
tracer.Tag("message_id", msg.ID),
tracer.Tag("publish_time", msg.PublishTime.String()),
tracer.ChildOf(parentSpanCtx),
)
}
if cfg.serviceName != "" {
opts = append(opts, tracer.ServiceName(cfg.serviceName))
}
span, ctx := tracer.StartSpanFromContext(ctx, "pubsub.receive", opts...)
if msg.DeliveryAttempt != nil {
span.SetTag("delivery_attempt", *msg.DeliveryAttempt)
}
Expand Down
22 changes: 22 additions & 0 deletions contrib/cloud.google.com/go/pubsub.v1/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,28 @@ func TestPropagation(t *testing.T) {
}, spans[2].Tags())
}

func TestPropagationWithServiceName(t *testing.T) {
assert := assert.New(t)
ctx, topic, sub, mt, cleanup := setup(t)
defer cleanup()

// Publisher
span, pctx := tracer.StartSpanFromContext(ctx, "service-name-test")
_, err := Publish(pctx, topic, &pubsub.Message{Data: []byte("hello")}).Get(pctx)
assert.NoError(err)
span.Finish()

// Subscriber
err = sub.Receive(ctx, WrapReceiveHandler(sub, func(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
}, WithServiceName("example.service")))
assert.NoError(err)

spans := mt.FinishedSpans()
assert.Len(spans, 3, "wrong number of spans")
assert.Equal("example.service", spans[2].Tag(ext.ServiceName))
}

func TestPropagationNoParentSpan(t *testing.T) {
assert := assert.New(t)
ctx, topic, sub, mt, cleanup := setup(t)
Expand Down

0 comments on commit d6223c8

Please sign in to comment.