Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add open telemetry trace support #5034

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pubsub/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ require (
github.com/google/go-cmp v0.5.7
github.com/googleapis/gax-go/v2 v2.1.1
go.opencensus.io v0.23.0
go.opentelemetry.io/otel v1.4.1
go.opentelemetry.io/otel/sdk v1.4.1
go.opentelemetry.io/otel/trace v1.4.1
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11
Expand Down
18 changes: 18 additions & 0 deletions pubsub/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand All @@ -87,6 +88,10 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-logr/logr v1.2.2 h1:ahHml/yUpnlb96Rp8HCvtYVPY8ZYpxq3g7UYchIYwbs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -169,9 +174,12 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand All @@ -181,6 +189,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand All @@ -195,6 +204,12 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.4.1 h1:QbINgGDDcoQUoMJa2mMaWno49lja9sHwp6aoa2n3a4g=
go.opentelemetry.io/otel v1.4.1/go.mod h1:StM6F/0fSwpd8dKWDCdRr7uRvEPYdW0hBSlbdTiUde4=
go.opentelemetry.io/otel/sdk v1.4.1 h1:J7EaW71E0v87qflB4cDolaqq3AcujGrtyIPGQoZOB0Y=
go.opentelemetry.io/otel/sdk v1.4.1/go.mod h1:NBwHDgDIBYjwK2WNu1OPgsIc2IJzmBXNnvIJxJc8BpE=
go.opentelemetry.io/otel/trace v1.4.1 h1:O+16qcdTrT7zxv2J6GejTPFinSwA++cYerC5iSiF8EQ=
go.opentelemetry.io/otel/trace v1.4.1/go.mod h1:iYEVbroFCNut9QkwEczV9vMRPHNKSSwYZjulEtsmhFc=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down Expand Up @@ -339,6 +354,7 @@ golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down Expand Up @@ -582,10 +598,12 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
63 changes: 56 additions & 7 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import (
"cloud.google.com/go/internal/optional"
"cloud.google.com/go/pubsub/internal/scheduler"
gax "github.com/googleapis/gax-go/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
fmpb "google.golang.org/genproto/protobuf/field_mask"
Expand All @@ -51,6 +56,10 @@ type Subscription struct {
receiveActive bool

enableOrdering bool

// topicName is for creating spans for OpenTelemetry tracing.
topicName string
tracer trace.Tracer
}

// Subscription creates a reference to a subscription.
Expand All @@ -60,9 +69,15 @@ func (c *Client) Subscription(id string) *Subscription {

// SubscriptionInProject creates a reference to a subscription in a given project.
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription {
return newSubscription(c, fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id))
}

func newSubscription(c *Client, name string) *Subscription {
return &Subscription{
c: c,
name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id),
c: c,
name: name,
ReceiveSettings: DefaultReceiveSettings,
tracer: otel.Tracer(defaultTracerName),
}
}

Expand Down Expand Up @@ -112,7 +127,7 @@ func (subs *SubscriptionIterator) Next() (*Subscription, error) {
if err != nil {
return nil, err
}
return &Subscription{c: subs.c, name: subName}, nil
return newSubscription(subs.c, subName), nil
}

// NextConfig returns the next subscription config. If there are no more subscriptions,
Expand Down Expand Up @@ -840,7 +855,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
s.mu.Unlock()
defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()

s.checkOrdering(ctx)
s.checkSubConfig()

maxCount := s.ReceiveSettings.MaxOutstandingMessages
if maxCount == 0 {
Expand Down Expand Up @@ -926,6 +941,9 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
defer wg.Wait()
defer cancel2()
for {
opts := getSubSpanAttributes(s.topicName, &Message{}, semconv.MessagingOperationReceive)
ctx2, rs := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.topicName), opts...)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this span can block for a long time if there is no message to be processed in the topic.
It can potentially cause the consumer span to be started before the message is published:

Process P:     | topic send |
--
Process C: |---sub receive---|---sub process---|

where both sub receive and sub process are children of topic send.

This is what it looks like in honeycomb (with no parent trace in the message)
image

PS: It also causes the span based metrics to be messed up.


var maxToPull int32 // maximum number of messages to pull
if po.synchronous {
if po.maxPrefetch < 0 {
Expand Down Expand Up @@ -955,13 +973,17 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
return nil
default:
}

msgs, err := iter.receive(maxToPull)
if err == io.EOF {
return nil
}
if err != nil {
rs.RecordError(err)
rs.SetStatus(otelcodes.Error, err.Error())
return err
}
rs.End()
// If context is done and messages have been pulled,
// nack them.
select {
Expand All @@ -972,8 +994,20 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
return nil
default:
}

for i, msg := range msgs {
msg := msg
opts := getSubSpanAttributes(s.topicName, msg, semconv.MessagingOperationProcess)
if msg.Attributes != nil && msg.Attributes["gogclient_traceparent"] != "" {
lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(msg))
link := trace.LinkFromContext(lctx)
opts = append(opts, trace.WithLinks(link))
}
_, ps := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.topicName), opts...)
ps.AddEvent("waiting for subscriber flow control")
var fcTimer time.Time
if ps.IsRecording() {
fcTimer = time.Now()
}
// TODO(jba): call acquire closer to when the message is allocated.
if err := fc.acquire(ctx, len(msg.Data)); err != nil {
// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
Expand All @@ -983,11 +1017,18 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// Return nil if the context is done, not err.
return nil
}
ps.AddEvent("acquired subscriber flow control resources", trace.WithAttributes(attribute.Float64("elapsed_ms", float64(time.Since(fcTimer))/float64(time.Millisecond))))
ackh, _ := msgAckHandler(msg)
old := ackh.doneFunc
msgLen := len(msg.Data)
ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) {
defer fc.release(ctx, msgLen)
defer ps.End()
if ack {
ps.SetAttributes(attribute.String("result", "ack"))
} else {
ps.SetAttributes(attribute.String("result", "nack"))
}
old(ackID, ack, receiveTime)
}
wg.Add(1)
Expand All @@ -1000,9 +1041,13 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// constructor level?
if err := sched.Add(key, msg, func(msg interface{}) {
defer wg.Done()
rs.AddEvent("started handling provided callback")
f(ctx2, msg.(*Message))
rs.AddEvent("finished handling provided callback")
}); err != nil {
wg.Done()
ps.RecordError(err)
ps.SetStatus(otelcodes.Error, err.Error())
return err
}
}
Expand All @@ -1028,17 +1073,21 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
return group.Wait()
}

// checkOrdering calls Config to check theEnableMessageOrdering field.
// checkSubConfig calls Config to check the subscription config fields.
// For ordering, we check the EnableMessageOrdering field.
// For OpenTelemetry, we check the topic name.
// If this call fails (e.g. because the service account doesn't have
// the roles/viewer or roles/pubsub.viewer role) we will assume
// EnableMessageOrdering to be true.
// See: https://github.com/googleapis/google-cloud-go/issues/3884
func (s *Subscription) checkOrdering(ctx context.Context) {
func (s *Subscription) checkSubConfig() {
ctx := context.Background()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did this context switch to background?

cfg, err := s.Config(ctx)
if err != nil {
s.enableOrdering = true
} else {
s.enableOrdering = cfg.EnableMessageOrdering
s.topicName = cfg.Topic.name
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this does not work when the gcp identity don't have roles/pubsub.viewer, it should fallback to the subscription name in the err != nil case, otherwise the span name will be " receive"

}
}

Expand Down
Loading