From 3e3eb4ce160b0088b102c0cfc217ac6457ef0e50 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Mon, 10 May 2021 13:25:01 -0700 Subject: [PATCH 01/17] add basic publish span --- pubsub/go.mod | 6 ++++++ pubsub/go.sum | 17 +++++++++++++++++ pubsub/topic.go | 14 +++++++++++++- 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/pubsub/go.mod b/pubsub/go.mod index b4d54ef6a4dd..59371a0b8072 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -8,6 +8,12 @@ require ( github.com/google/go-cmp v0.5.5 github.com/googleapis/gax-go/v2 v2.0.5 go.opencensus.io v0.23.0 + go.opentelemetry.io/otel v0.19.0 // indirect + go.opentelemetry.io/otel/exporters/stdout v0.19.0 // indirect + go.opentelemetry.io/otel/metric v0.19.0 // indirect + go.opentelemetry.io/otel/sdk v0.19.0 // indirect + go.opentelemetry.io/otel/sdk/metric v0.19.0 // indirect + go.opentelemetry.io/otel/trace v0.19.0 // indirect golang.org/x/oauth2 v0.0.0-20210413134643-5e61552d6c78 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba diff --git a/pubsub/go.sum b/pubsub/go.sum index 71d667c5bb76..a96643a64b7e 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -39,6 +39,7 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -140,6 +141,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -152,6 +154,21 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/otel v0.19.0 h1:Lenfy7QHRXPZVsw/12CWpxX6d/JkrX8wrx2vO8G80Ng= +go.opentelemetry.io/otel v0.19.0/go.mod h1:j9bF567N9EfomkSidSfmMwIwIBuP37AMAIzVW85OxSg= +go.opentelemetry.io/otel/exporters/stdout v0.19.0 h1:6+QJvepCJ/YS3rOlsnjhVo527ohlPowOBgsZThR9Hoc= +go.opentelemetry.io/otel/exporters/stdout v0.19.0/go.mod h1:UI2JnNRaSt9ChIHkk4+uqieH27qKt9isV9e2qRorCtg= +go.opentelemetry.io/otel/metric v0.19.0 h1:dtZ1Ju44gkJkYvo+3qGqVXmf88tc+a42edOywypengg= +go.opentelemetry.io/otel/metric v0.19.0/go.mod h1:8f9fglJPRnXuskQmKpnad31lcLJ2VmNNqIsx/uIwBSc= +go.opentelemetry.io/otel/oteltest v0.19.0/go.mod h1:tI4yxwh8U21v7JD6R3BcA/2+RBoTKFexE/PJ/nSO7IA= +go.opentelemetry.io/otel/sdk v0.19.0 h1:13pQquZyGbIvGxBWcVzUqe8kg5VGbTBiKKKXpYCylRM= +go.opentelemetry.io/otel/sdk v0.19.0/go.mod h1:ouO7auJYMivDjywCHA6bqTI7jJMVQV1HdKR5CmH8DGo= +go.opentelemetry.io/otel/sdk/export/metric v0.19.0 h1:9A1PC2graOx3epRLRWbq4DPCdpMUYK8XeCrdAg6ycbI= +go.opentelemetry.io/otel/sdk/export/metric v0.19.0/go.mod h1:exXalzlU6quLTXiv29J+Qpj/toOzL3H5WvpbbjouTBo= +go.opentelemetry.io/otel/sdk/metric v0.19.0 h1:fka1Zc/lpRMS+KlTP/TRXZuaFtSjUg/maHV3U8rt1Mc= +go.opentelemetry.io/otel/sdk/metric v0.19.0/go.mod h1:t12+Mqmj64q1vMpxHlCGXGggo0sadYxEG6U+Us/9OA4= +go.opentelemetry.io/otel/trace v0.19.0 h1:1ucYlenXIDA1OlHVLDZKX0ObXV5RLaq06DtUKz5e5zc= +go.opentelemetry.io/otel/trace v0.19.0/go.mod h1:4IXiNextNOpPnRlI4ryK69mn5iC84bjBWZQA5DXz/qg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/pubsub/topic.go b/pubsub/topic.go index 806a23509696..08f8026ec3da 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -30,6 +30,8 @@ import ( gax "github.com/googleapis/gax-go/v2" "go.opencensus.io/stats" "go.opencensus.io/tag" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/support/bundler" pb "google.golang.org/genproto/googleapis/pubsub/v1" fmpb "google.golang.org/genproto/protobuf/field_mask" @@ -70,6 +72,8 @@ type Topic struct { // EnableMessageOrdering enables delivery of ordered keys. EnableMessageOrdering bool + + tracer trace.Tracer } // PublishSettings control the bundling of published messages. @@ -180,6 +184,7 @@ func newTopic(c *Client, name string) *Topic { c: c, name: name, PublishSettings: DefaultPublishSettings, + tracer: otel.Tracer("instrumentation/package/name"), } } @@ -421,6 +426,9 @@ type PublishResult = ipubsub.PublishResult // need to be stopped by calling t.Stop(). Once stopped, future calls to Publish // will immediately return a PublishResult with an error. func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { + var span trace.Span + _, span = t.tracer.Start(ctx, "publish") + defer span.End() r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { ipubsub.SetPublishResult(r, "", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")) @@ -452,7 +460,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { // TODO(jba) [from bcmills] consider using a shared channel per bundle // (requires Bundler API changes; would reduce allocations) - err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r}, msgSize) + err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{ctx, msg, r}, msgSize) if err != nil { t.scheduler.Pause(msg.OrderingKey) ipubsub.SetPublishResult(r, "", err) @@ -475,6 +483,7 @@ func (t *Topic) Stop() { } type bundledMessage struct { + ctx context.Context msg *Message res *PublishResult } @@ -544,6 +553,9 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) OrderingKey: bm.msg.OrderingKey, } bm.msg = nil // release bm.msg for GC + var span trace.Span + ctx, span = t.tracer.Start(bm.ctx, "publish message bundle") + defer span.End() } var res *pb.PublishResponse start := time.Now() From 4465dcee52e9660852a444529a3ad7351e64b394 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 5 Oct 2021 18:13:23 -0700 Subject: [PATCH 02/17] update publish span --- pubsub/go.mod | 2 ++ pubsub/go.sum | 12 ++++++++++++ pubsub/topic.go | 19 ++++++++++++++++++- 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/pubsub/go.mod b/pubsub/go.mod index c1ef0d195d60..c107b35b49b6 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -9,6 +9,8 @@ require ( github.com/google/go-cmp v0.5.6 github.com/googleapis/gax-go/v2 v2.1.0 go.opencensus.io v0.23.0 + go.opentelemetry.io/otel v1.0.0 + go.opentelemetry.io/otel/trace v1.0.0 golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac diff --git a/pubsub/go.sum b/pubsub/go.sum index 88c9414bc95a..1529f25c5497 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -60,6 +60,7 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -153,9 +154,12 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1: github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -165,6 +169,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -178,6 +184,10 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/otel v1.0.0 h1:qTTn6x71GVBvoafHK/yaRUmFzI4LcONZD0/kXxl5PHI= +go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg= +go.opentelemetry.io/otel/trace v1.0.0 h1:TSBr8GTEtKevYMG/2d21M989r5WJYVimhTHBKVEZuh4= +go.opentelemetry.io/otel/trace v1.0.0/go.mod h1:PXTWqayeFUlJV1YDNhsJYB184+IvAH814St6o6ajzIs= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -535,10 +545,12 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pubsub/topic.go b/pubsub/topic.go index aa4bacfc8ed1..5c895067b985 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -32,6 +32,9 @@ import ( gax "github.com/googleapis/gax-go/v2" "go.opencensus.io/stats" "go.opencensus.io/tag" + "go.opentelemetry.io/otel" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/support/bundler" pb "google.golang.org/genproto/googleapis/pubsub/v1" fmpb "google.golang.org/genproto/protobuf/field_mask" @@ -75,6 +78,8 @@ type Topic struct { // EnableMessageOrdering enables delivery of ordered keys. EnableMessageOrdering bool + + tracer trace.Tracer } // PublishSettings control the bundling of published messages. @@ -192,6 +197,7 @@ func newTopic(c *Client, name string) *Topic { c: c, name: name, PublishSettings: DefaultPublishSettings, + tracer: otel.Tracer("instreumentation/package/name"), } } @@ -498,6 +504,11 @@ type PublishResult = ipubsub.PublishResult // need to be stopped by calling t.Stop(). Once stopped, future calls to Publish // will immediately return a PublishResult with an error. func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { + var span trace.Span + ctx, span = t.tracer.Start(ctx, t.String()+" send") + span.SetAttributes(semconv.MessagingSystemKey.String("pubsub"), semconv.MessagingDestinationKey.String(t.String()), + semconv.MessagingDestinationKindTopic) + defer span.End() r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { ipubsub.SetPublishResult(r, "", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")) @@ -511,6 +522,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { Attributes: msg.Attributes, OrderingKey: msg.OrderingKey, }) + span.SetAttributes(semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize)) t.initBundler() t.mu.RLock() @@ -526,7 +538,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { ipubsub.SetPublishResult(r, "", err) return r } - err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize) + err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{ctx, msg, r, msgSize}, msgSize) if err != nil { t.scheduler.Pause(msg.OrderingKey) ipubsub.SetPublishResult(r, "", err) @@ -557,6 +569,7 @@ func (t *Topic) Flush() { } type bundledMessage struct { + ctx context.Context msg *Message res *PublishResult size int @@ -645,6 +658,10 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) OrderingKey: bm.msg.OrderingKey, } bm.msg = nil // release bm.msg for GC + _, span := t.tracer.Start(bm.ctx, "msg") + span.SetAttributes(semconv.MessagingSystemKey.String("pubsub"), semconv.MessagingDestinationKey.String(t.String()), + semconv.MessagingDestinationKindTopic) + defer span.End() } var res *pb.PublishResponse start := time.Now() From f82fea87d4c37cc0df1dcab853e9d2a81649bac6 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Wed, 20 Oct 2021 17:27:31 -0700 Subject: [PATCH 03/17] add publisher/subscriber span attribute helper --- pubsub/subscription.go | 22 ++++++++++-- pubsub/topic.go | 23 +++++++++---- pubsub/trace.go | 78 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 9 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 67413d384988..aa3e7fdce9cb 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -27,6 +27,8 @@ import ( "cloud.google.com/go/internal/optional" "cloud.google.com/go/pubsub/internal/scheduler" gax "github.com/googleapis/gax-go/v2" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" pb "google.golang.org/genproto/googleapis/pubsub/v1" fmpb "google.golang.org/genproto/protobuf/field_mask" @@ -51,6 +53,8 @@ type Subscription struct { receiveActive bool enableOrdering bool + + tracer trace.Tracer } // Subscription creates a reference to a subscription. @@ -60,9 +64,15 @@ func (c *Client) Subscription(id string) *Subscription { // SubscriptionInProject creates a reference to a subscription in a given project. func (c *Client) SubscriptionInProject(id, projectID string) *Subscription { + return newSubscription(c, fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id)) +} + +func newSubscription(c *Client, name string) *Subscription { return &Subscription{ - c: c, - name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id), + c: c, + name: name, + ReceiveSettings: DefaultReceiveSettings, + tracer: otel.Tracer("instrumentation/pubsub/sub"), } } @@ -112,7 +122,7 @@ func (subs *SubscriptionIterator) Next() (*Subscription, error) { if err != nil { return nil, err } - return &Subscription{c: subs.c, name: subName}, nil + return newSubscription(subs.c, subName), nil } // NextConfig returns the next subscription config. If there are no more subscriptions, @@ -949,6 +959,10 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes default: } for i, msg := range msgs { + ctx2 = otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(msg)) + ctx2, receiveSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.String())) + defer receiveSpan.End() + msg := msg // TODO(jba): call acquire closer to when the message is allocated. if err := fc.acquire(ctx, len(msg.Data)); err != nil { @@ -975,8 +989,10 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // TODO(deklerk): Can we have a generic handler at the // constructor level? if err := sched.Add(key, msg, func(msg interface{}) { + ctx2, consumeSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.String())) defer wg.Done() f(ctx2, msg.(*Message)) + consumeSpan.End() }); err != nil { wg.Done() return err diff --git a/pubsub/topic.go b/pubsub/topic.go index 5c895067b985..c720f4111a5d 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -505,9 +505,8 @@ type PublishResult = ipubsub.PublishResult // will immediately return a PublishResult with an error. func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { var span trace.Span - ctx, span = t.tracer.Start(ctx, t.String()+" send") - span.SetAttributes(semconv.MessagingSystemKey.String("pubsub"), semconv.MessagingDestinationKey.String(t.String()), - semconv.MessagingDestinationKindTopic) + opts := getPublisherAttributes(t.String(), msg.OrderingKey) + ctx, span = t.tracer.Start(ctx, t.String()+" send", opts...) defer span.End() r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { @@ -515,6 +514,13 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { return r } + if span.SpanContext().IsValid() { + if msg.Attributes == nil { + msg.Attributes = make(map[string]string) + } + otel.GetTextMapPropagator().Inject(ctx, NewPubsubMessageCarrier(msg)) + } + // Calculate the size of the encoded proto message by accounting // for the length of an individual PubSubMessage and Data/Attributes field. msgSize := proto.Size(&pb.PubsubMessage{ @@ -538,8 +544,13 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { ipubsub.SetPublishResult(r, "", err) return r } - err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{ctx, msg, r, msgSize}, msgSize) - if err != nil { + bmsg := &bundledMessage{ + ctx: ctx, + msg: msg, + res: r, + size: msgSize, + } + if err := t.scheduler.Add(msg.OrderingKey, bmsg, msgSize); err != nil { t.scheduler.Pause(msg.OrderingKey) ipubsub.SetPublishResult(r, "", err) } @@ -658,7 +669,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) OrderingKey: bm.msg.OrderingKey, } bm.msg = nil // release bm.msg for GC - _, span := t.tracer.Start(bm.ctx, "msg") + _, span := t.tracer.Start(bm.ctx, "publish message bundle") span.SetAttributes(semconv.MessagingSystemKey.String("pubsub"), semconv.MessagingDestinationKey.String(t.String()), semconv.MessagingDestinationKindTopic) defer span.End() diff --git a/pubsub/trace.go b/pubsub/trace.go index 84cab3cd3cf1..b608dc6790af 100644 --- a/pubsub/trace.go +++ b/pubsub/trace.go @@ -22,6 +22,10 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" ) // The following keys are used to tag requests with a specific topic/subscription ID. @@ -236,3 +240,77 @@ func withSubscriptionKey(ctx context.Context, subName string) context.Context { func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) { stats.Record(ctx, m.M(n)) } + +// func parseSpanContext(sc string) *trace.SpanContextConfig { +// ctx := context.Background() +// // TODO: figure out how to unmarshal json spancontext -> scc +// if err := json.Unmarshal([]byte(sc), spanContext); err != nil { +// panic(err) +// } +// fmt.Printf("got span context: %+v\n", spanContext) + +// return &trace.SpanContextConfig{ +// TraceID: trace.TraceID{}, +// SpanID: spanContext.SpanID(), +// TraceFlags: spanContext.TraceFlags(), +// TraceState: spanContext.TraceState(), +// Remote: spanContext.IsRemote(), +// } +// } + +var _ propagation.TextMapCarrier = (*PubsubMessageCarrier)(nil) + +// PubsubMessageCarrier injects and extracts traces from a pubsub.Message. +type PubsubMessageCarrier struct { + msg *Message +} + +// NewPubsubMessageCarrier creates a new PubsubMessageCarrier.PubsubMessageCarrier. +func NewPubsubMessageCarrier(msg *Message) PubsubMessageCarrier { + return PubsubMessageCarrier{msg: msg} +} + +// Get retrieves a single value for a given key. +func (c PubsubMessageCarrier) Get(key string) string { + return c.msg.Attributes[key] +} + +// Set sets a header. +func (c PubsubMessageCarrier) Set(key, val string) { + c.msg.Attributes[key] = val +} + +// Keys returns a slice of all keys in the carrier. +func (c PubsubMessageCarrier) Keys() []string { + i := 0 + out := make([]string, len(c.msg.Attributes)) + for k := range c.msg.Attributes { + out[i] = k + i++ + } + return out +} + +func getPublisherAttributes(topic, key string) []trace.SpanStartOption { + opts := []trace.SpanStartOption{ + trace.WithAttributes( + semconv.MessagingSystemKey.String("pubsub"), + semconv.MessagingDestinationKey.String(topic), + semconv.MessagingDestinationKindTopic, + attribute.String("pubsub.ordering_key", key), + ), + trace.WithSpanKind(trace.SpanKindProducer), + } + return opts +} + +func getSubscriberAttributes(sub string) []trace.SpanStartOption { + opts := []trace.SpanStartOption{ + trace.WithAttributes( + semconv.MessagingSystemKey.String("pubsub"), + semconv.MessagingDestinationKey.String(sub), + ), + trace.WithSpanKind(trace.SpanKindConsumer), + } + return opts +} From 44fc62abbb49e886d7353f222932b065fa87ea88 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 26 Oct 2021 16:44:25 -0700 Subject: [PATCH 04/17] add links and proper handling for batch receive --- pubsub/subscription.go | 12 ++++++++---- pubsub/topic.go | 2 -- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index aa3e7fdce9cb..7ef10351782c 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -959,9 +959,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes default: } for i, msg := range msgs { - ctx2 = otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(msg)) ctx2, receiveSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.String())) - defer receiveSpan.End() msg := msg // TODO(jba): call acquire closer to when the message is allocated. @@ -989,14 +987,20 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // TODO(deklerk): Can we have a generic handler at the // constructor level? if err := sched.Add(key, msg, func(msg interface{}) { - ctx2, consumeSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.String())) + lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(msg.(*Message))) + link := trace.LinkFromContext(lctx) + opts := []trace.SpanStartOption{ + trace.WithLinks(link), + } + ctx2, processSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.String()), opts...) defer wg.Done() f(ctx2, msg.(*Message)) - consumeSpan.End() + processSpan.End() }); err != nil { wg.Done() return err } + receiveSpan.End() } } }) diff --git a/pubsub/topic.go b/pubsub/topic.go index c720f4111a5d..29ec086bcc71 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -670,8 +670,6 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) } bm.msg = nil // release bm.msg for GC _, span := t.tracer.Start(bm.ctx, "publish message bundle") - span.SetAttributes(semconv.MessagingSystemKey.String("pubsub"), semconv.MessagingDestinationKey.String(t.String()), - semconv.MessagingDestinationKindTopic) defer span.End() } var res *pb.PublishResponse From d6e8727ac4ee2ad12ba453c1cd9c1ec73ef4639e Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 26 Oct 2021 16:50:35 -0700 Subject: [PATCH 05/17] resolve go.mod merge and run go mod tidy --- pubsub/go.mod | 12 ++++++------ pubsub/go.sum | 27 ++++++++++++++++++--------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/pubsub/go.mod b/pubsub/go.mod index c107b35b49b6..f42e7ce062f3 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -3,19 +3,19 @@ module cloud.google.com/go/pubsub go 1.11 require ( - cloud.google.com/go v0.94.1 - cloud.google.com/go/kms v0.1.0 + cloud.google.com/go v0.97.0 + cloud.google.com/go/kms v1.0.0 github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.6 - github.com/googleapis/gax-go/v2 v2.1.0 + github.com/googleapis/gax-go/v2 v2.1.1 go.opencensus.io v0.23.0 go.opentelemetry.io/otel v1.0.0 go.opentelemetry.io/otel/trace v1.0.0 - golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f + golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac - google.golang.org/api v0.57.0 - google.golang.org/genproto v0.0.0-20210921142501-181ce0d877f6 + google.golang.org/api v0.59.0 + google.golang.org/genproto v0.0.0-20211026145609-4688e4c4e024 google.golang.org/grpc v1.40.0 google.golang.org/protobuf v1.27.1 ) diff --git a/pubsub/go.sum b/pubsub/go.sum index 1529f25c5497..28448d1cc1cb 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -22,10 +22,10 @@ cloud.google.com/go v0.83.0/go.mod h1:Z7MJUsANfY0pYPdw0lbnivPx4/vhy/e2FEkSkF7vAV cloud.google.com/go v0.84.0/go.mod h1:RazrYuxIK6Kb7YrzzhPoLmCVzl7Sup4NrbKPg8KHSUM= cloud.google.com/go v0.87.0/go.mod h1:TpDYlFy7vuLzZMMZ+B6iRiELaY7z/gJPaqbMx6mlWcY= cloud.google.com/go v0.90.0/go.mod h1:kRX0mNRHe0e2rC6oNakvwQqzyDmg57xJ+SZU1eT2aDQ= -cloud.google.com/go v0.92.2/go.mod h1:8utlLll2EF5XMAV15woO4lSbWQlk8rer9aLOfLh7+YI= cloud.google.com/go v0.93.3/go.mod h1:8utlLll2EF5XMAV15woO4lSbWQlk8rer9aLOfLh7+YI= -cloud.google.com/go v0.94.1 h1:DwuSvDZ1pTYGbXo8yOJevCTr3BoBlE+OVkHAKiYQUXc= cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW4= +cloud.google.com/go v0.97.0 h1:3DXvAyifywvq64LfkKaMOmkWPS1CikIQdMe2lY9vxU8= +cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= @@ -34,8 +34,8 @@ cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4g cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= -cloud.google.com/go/kms v0.1.0 h1:VXAb5OzejDcyhFzIDeZ5n5AUdlsFnCyexuascIwWMj0= -cloud.google.com/go/kms v0.1.0/go.mod h1:8Qp8PCAypHg4FdmlyW1QRAv09BGQ9Uzh7JnmIZxPk+c= +cloud.google.com/go/kms v1.0.0 h1:YkIeqPXqTAlwXk3Z2/WG0d6h1tqJQjU354WftjEoP9E= +cloud.google.com/go/kms v1.0.0/go.mod h1:nhUehi+w7zht2XrUfvTRNpxrfayBHqP4lu2NSywui/0= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= @@ -144,8 +144,9 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= -github.com/googleapis/gax-go/v2 v2.1.0 h1:6DWmvNpomjL1+3liNSZbVns3zsYzzCjm6pRBO1tLeso= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= +github.com/googleapis/gax-go/v2 v2.1.1 h1:dp3bWCh+PPO1zjRRiCSczJav13sBvG4UhNyVTa1KqdU= +github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -279,8 +280,9 @@ golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f h1:Qmd2pbz05z7z6lm0DrgQVVPuBm92jqujBKMHMOlOQEw= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 h1:B333XXssMuKQeBwiNODx4TupZy7bf4sxFZnN2ZOcvUE= +golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -337,8 +339,9 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365 h1:6wSTsvPddg9gc/mVEEyk9oOAoxn+bT4Z9q1zx+4RwA4= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -437,8 +440,10 @@ google.golang.org/api v0.50.0/go.mod h1:4bNT5pAuq5ji4SRZm+5QIkjny9JAyVD/3gaSihNe google.golang.org/api v0.51.0/go.mod h1:t4HdrdoNgyN5cbEfm7Lum0lcLDLiise1F8qDKX00sOU= google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6z3k= google.golang.org/api v0.55.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= -google.golang.org/api v0.57.0 h1:4t9zuDlHLcIx0ZEhmXEeFVCRsiOgpgn2QOH9N0MNjPI= +google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI= +google.golang.org/api v0.59.0 h1:fPfFO7gttlXYo2ALuD3HxJzh8vaF++4youI0BkFL6GE= +google.golang.org/api v0.59.0/go.mod h1:sT2boj7M9YJxZzgeZqXogmhfmRWDtPzT31xkieUbuZU= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -501,8 +506,12 @@ google.golang.org/genproto v0.0.0-20210821163610-241b8fcbd6c8/go.mod h1:eFjDcFEc google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= -google.golang.org/genproto v0.0.0-20210921142501-181ce0d877f6 h1:2ncG/LajxmrclaZH+ppVi02rQxz4eXYJzGHdFN4Y9UA= +google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210921142501-181ce0d877f6/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211008145708-270636b82663/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211026145609-4688e4c4e024 h1:aePO4E0x+Urj9V5NQHjqOpaNG4oMeHQq0l2ob05z5tI= +google.golang.org/genproto v0.0.0-20211026145609-4688e4c4e024/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= From 9577495de0ad5be4ef7d32d82ebedee9da308ed8 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 26 Oct 2021 17:23:12 -0700 Subject: [PATCH 06/17] update span attributes --- pubsub/subscription.go | 12 ++++++----- pubsub/topic.go | 7 +++--- pubsub/trace.go | 48 ++++++++++++++---------------------------- 3 files changed, 27 insertions(+), 40 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 7ef10351782c..29e2a71288b8 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -28,6 +28,7 @@ import ( "cloud.google.com/go/pubsub/internal/scheduler" gax "github.com/googleapis/gax-go/v2" "go.opentelemetry.io/otel" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" pb "google.golang.org/genproto/googleapis/pubsub/v1" @@ -959,7 +960,8 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes default: } for i, msg := range msgs { - ctx2, receiveSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.String())) + opts := getSpanAttributes("", msg, semconv.MessagingOperationReceive) + ctx2, receiveSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.String()), opts...) msg := msg // TODO(jba): call acquire closer to when the message is allocated. @@ -987,11 +989,11 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // TODO(deklerk): Can we have a generic handler at the // constructor level? if err := sched.Add(key, msg, func(msg interface{}) { - lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(msg.(*Message))) + m := msg.(*Message) + lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(m)) link := trace.LinkFromContext(lctx) - opts := []trace.SpanStartOption{ - trace.WithLinks(link), - } + opts := getSpanAttributes("", m, semconv.MessagingOperationProcess) + opts = append(opts, trace.WithLinks(link)) ctx2, processSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.String()), opts...) defer wg.Done() f(ctx2, msg.(*Message)) diff --git a/pubsub/topic.go b/pubsub/topic.go index 29ec086bcc71..aa24579028b9 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -505,7 +505,7 @@ type PublishResult = ipubsub.PublishResult // will immediately return a PublishResult with an error. func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { var span trace.Span - opts := getPublisherAttributes(t.String(), msg.OrderingKey) + opts := getSpanAttributes(t.String(), msg) ctx, span = t.tracer.Start(ctx, t.String()+" send", opts...) defer span.End() r := ipubsub.NewPublishResult() @@ -668,9 +668,10 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) Attributes: bm.msg.Attributes, OrderingKey: bm.msg.OrderingKey, } - bm.msg = nil // release bm.msg for GC - _, span := t.tracer.Start(bm.ctx, "publish message bundle") + opts := getSpanAttributes(t.String(), bm.msg) + _, span := t.tracer.Start(bm.ctx, "publish message", opts...) defer span.End() + bm.msg = nil // release bm.msg for GC } var res *pb.PublishResponse start := time.Now() diff --git a/pubsub/trace.go b/pubsub/trace.go index b608dc6790af..13293e561f6e 100644 --- a/pubsub/trace.go +++ b/pubsub/trace.go @@ -26,6 +26,8 @@ import ( "go.opentelemetry.io/otel/propagation" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.opentelemetry.io/otel/trace" + pb "google.golang.org/genproto/googleapis/pubsub/v1" + "google.golang.org/protobuf/proto" ) // The following keys are used to tag requests with a specific topic/subscription ID. @@ -241,23 +243,6 @@ func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) { stats.Record(ctx, m.M(n)) } -// func parseSpanContext(sc string) *trace.SpanContextConfig { -// ctx := context.Background() -// // TODO: figure out how to unmarshal json spancontext -> scc -// if err := json.Unmarshal([]byte(sc), spanContext); err != nil { -// panic(err) -// } -// fmt.Printf("got span context: %+v\n", spanContext) - -// return &trace.SpanContextConfig{ -// TraceID: trace.TraceID{}, -// SpanID: spanContext.SpanID(), -// TraceFlags: spanContext.TraceFlags(), -// TraceState: spanContext.TraceState(), -// Remote: spanContext.IsRemote(), -// } -// } - var _ propagation.TextMapCarrier = (*PubsubMessageCarrier)(nil) // PubsubMessageCarrier injects and extracts traces from a pubsub.Message. @@ -291,26 +276,25 @@ func (c PubsubMessageCarrier) Keys() []string { return out } -func getPublisherAttributes(topic, key string) []trace.SpanStartOption { - opts := []trace.SpanStartOption{ +func getSpanAttributes(topic string, msg *Message, opts ...attribute.KeyValue) []trace.SpanStartOption { + // TODO(hongalex): benchmark this to make sure no significant performance degradation + // when calculating proto.Size in receive paths. + msgSize := proto.Size(&pb.PubsubMessage{ + Data: msg.Data, + Attributes: msg.Attributes, + OrderingKey: msg.OrderingKey, + }) + ss := []trace.SpanStartOption{ trace.WithAttributes( semconv.MessagingSystemKey.String("pubsub"), semconv.MessagingDestinationKey.String(topic), semconv.MessagingDestinationKindTopic, - attribute.String("pubsub.ordering_key", key), + semconv.MessagingMessageIDKey.String(msg.ID), + semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize), + attribute.String("pubsub.ordering_key", msg.OrderingKey), ), + trace.WithAttributes(opts...), trace.WithSpanKind(trace.SpanKindProducer), } - return opts -} - -func getSubscriberAttributes(sub string) []trace.SpanStartOption { - opts := []trace.SpanStartOption{ - trace.WithAttributes( - semconv.MessagingSystemKey.String("pubsub"), - semconv.MessagingDestinationKey.String(sub), - ), - trace.WithSpanKind(trace.SpanKindConsumer), - } - return opts + return ss } From 994739cf7b91a19610308fe7e830b6b2fd5cd090 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 9 Nov 2021 15:42:16 -0800 Subject: [PATCH 07/17] use proper trace naming --- pubsub/subscription.go | 2 +- pubsub/topic.go | 2 +- pubsub/trace.go | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 25e21e243dc1..14b32f0a4f35 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -73,7 +73,7 @@ func newSubscription(c *Client, name string) *Subscription { c: c, name: name, ReceiveSettings: DefaultReceiveSettings, - tracer: otel.Tracer("instrumentation/pubsub/sub"), + tracer: otel.Tracer(defaultTracerName), } } diff --git a/pubsub/topic.go b/pubsub/topic.go index b9412d6a16d2..64785a0a52b7 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -197,7 +197,7 @@ func newTopic(c *Client, name string) *Topic { c: c, name: name, PublishSettings: DefaultPublishSettings, - tracer: otel.Tracer("instreumentation/package/name"), + tracer: otel.Tracer(defaultTracerName), } } diff --git a/pubsub/trace.go b/pubsub/trace.go index 13293e561f6e..97bbb3892557 100644 --- a/pubsub/trace.go +++ b/pubsub/trace.go @@ -243,6 +243,8 @@ func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) { stats.Record(ctx, m.M(n)) } +const defaultTracerName = "cloud.google.com/go/pubsub" + var _ propagation.TextMapCarrier = (*PubsubMessageCarrier)(nil) // PubsubMessageCarrier injects and extracts traces from a pubsub.Message. From b323e77da57229304e8859b16eed2fe84f028dc6 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 7 Dec 2021 16:36:04 -0800 Subject: [PATCH 08/17] use topic name for span names --- pubsub/subscription.go | 36 ++++++++++++++++++++++++------------ pubsub/topic.go | 9 +++++++-- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 14b32f0a4f35..585870eba202 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -55,7 +55,9 @@ type Subscription struct { enableOrdering bool - tracer trace.Tracer + // topicName is for creating spans for OpenTelemetry tracing. + topicName string + tracer trace.Tracer } // Subscription creates a reference to a subscription. @@ -851,7 +853,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes s.mu.Unlock() defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() - s.checkOrdering() + s.checkSubConfig() maxCount := s.ReceiveSettings.MaxOutstandingMessages if maxCount == 0 { @@ -985,7 +987,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes } for i, msg := range msgs { opts := getSpanAttributes("", msg, semconv.MessagingOperationReceive) - ctx2, receiveSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.String()), opts...) + ctx2, receiveSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.topicName), opts...) msg := msg // TODO(jba): call acquire closer to when the message is allocated. @@ -1014,19 +1016,26 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // constructor level? if err := sched.Add(key, msg, func(msg interface{}) { m := msg.(*Message) - lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(m)) - link := trace.LinkFromContext(lctx) - opts := getSpanAttributes("", m, semconv.MessagingOperationProcess) - opts = append(opts, trace.WithLinks(link)) - ctx2, processSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.String()), opts...) + if m.Attributes != nil && m.Attributes["googclient_traceparent"] != "" { + lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(m)) + link := trace.LinkFromContext(lctx) + opts := getSpanAttributes("", m, semconv.MessagingOperationProcess) + opts = append(opts, trace.WithLinks(link)) + _, processSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.topicName), opts...) + // End spans to ack handler doneFunc, which also handles flow control release. + old := ackh.doneFunc + ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { + defer processSpan.End() + defer receiveSpan.End() + old(ackID, ack, receiveTime) + } + } defer wg.Done() f(ctx2, msg.(*Message)) - processSpan.End() }); err != nil { wg.Done() return err } - receiveSpan.End() } } }) @@ -1050,18 +1059,21 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes return group.Wait() } -// checkOrdering calls Config to check theEnableMessageOrdering field. +// checkSubConfig calls Config to check the subscription config fields. +// For ordering, we check the EnableMessageOrdering field. +// For OpenTelemetry, we check the topic name. // If this call fails (e.g. because the service account doesn't have // the roles/viewer or roles/pubsub.viewer role) we will assume // EnableMessageOrdering to be true. // See: https://github.com/googleapis/google-cloud-go/issues/3884 -func (s *Subscription) checkOrdering() { +func (s *Subscription) checkSubConfig() { ctx := context.Background() cfg, err := s.Config(ctx) if err != nil { s.enableOrdering = true } else { s.enableOrdering = cfg.EnableMessageOrdering + s.topicName = cfg.Topic.name } } diff --git a/pubsub/topic.go b/pubsub/topic.go index 64785a0a52b7..3fff6d4eda81 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -531,7 +531,8 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { var span trace.Span opts := getSpanAttributes(t.String(), msg) ctx, span = t.tracer.Start(ctx, t.String()+" send", opts...) - defer span.End() + _, span2 := t.tracer.Start(ctx, t.String()+" add to batch", opts...) + defer span2.End() r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { ipubsub.SetPublishResult(r, "", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")) @@ -573,6 +574,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { msg: msg, res: r, size: msgSize, + span: span, } if err := t.scheduler.Add(msg.OrderingKey, bmsg, msgSize); err != nil { t.scheduler.Pause(msg.OrderingKey) @@ -608,6 +610,7 @@ type bundledMessage struct { msg *Message res *PublishResult size int + span trace.Span } func (t *Topic) initBundler() { @@ -693,8 +696,9 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) OrderingKey: bm.msg.OrderingKey, } opts := getSpanAttributes(t.String(), bm.msg) - _, span := t.tracer.Start(bm.ctx, "publish message", opts...) + _, span := t.tracer.Start(bm.ctx, t.String()+" publish RPC", opts...) defer span.End() + defer bm.span.End() bm.msg = nil // release bm.msg for GC } var res *pb.PublishResponse @@ -724,6 +728,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) ipubsub.SetPublishResult(bm.res, "", err) } else { ipubsub.SetPublishResult(bm.res, res.MessageIds[i], nil) + bm.span.SetAttributes(semconv.MessagingMessageIDKey.String(res.MessageIds[i])) } } } From 6d4b7bee6091d63a82f0e372e2b1b6bbb82c4d25 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 7 Dec 2021 16:36:33 -0800 Subject: [PATCH 09/17] prepend googclient to attributes in carrier --- pubsub/trace.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pubsub/trace.go b/pubsub/trace.go index 97bbb3892557..0dc8c4f4cb99 100644 --- a/pubsub/trace.go +++ b/pubsub/trace.go @@ -252,19 +252,19 @@ type PubsubMessageCarrier struct { msg *Message } -// NewPubsubMessageCarrier creates a new PubsubMessageCarrier.PubsubMessageCarrier. +// NewPubsubMessageCarrier creates a new PubsubMessageCarrier. func NewPubsubMessageCarrier(msg *Message) PubsubMessageCarrier { return PubsubMessageCarrier{msg: msg} } // Get retrieves a single value for a given key. func (c PubsubMessageCarrier) Get(key string) string { - return c.msg.Attributes[key] + return c.msg.Attributes["googclient_"+key] } -// Set sets a header. +// Set sets an attribute. func (c PubsubMessageCarrier) Set(key, val string) { - c.msg.Attributes[key] = val + c.msg.Attributes["googclient_"+key] = val } // Keys returns a slice of all keys in the carrier. From e99b1a7d0defa25539d1af8e4df9c60932e816fa Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 7 Jan 2022 17:13:40 -0800 Subject: [PATCH 10/17] fix span attributes --- pubsub/subscription.go | 4 ++-- pubsub/topic.go | 43 ++++++++++++++++++++++++------------------ pubsub/trace.go | 27 +++++++++++++++++++++++--- 3 files changed, 51 insertions(+), 23 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 585870eba202..111a48cdd15d 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -986,7 +986,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes default: } for i, msg := range msgs { - opts := getSpanAttributes("", msg, semconv.MessagingOperationReceive) + opts := getSubSpanAttributes("", msg, semconv.MessagingOperationReceive) ctx2, receiveSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.topicName), opts...) msg := msg @@ -1019,7 +1019,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes if m.Attributes != nil && m.Attributes["googclient_traceparent"] != "" { lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(m)) link := trace.LinkFromContext(lctx) - opts := getSpanAttributes("", m, semconv.MessagingOperationProcess) + opts := getSubSpanAttributes("", m, semconv.MessagingOperationProcess) opts = append(opts, trace.WithLinks(link)) _, processSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.topicName), opts...) // End spans to ack handler doneFunc, which also handles flow control release. diff --git a/pubsub/topic.go b/pubsub/topic.go index 3fff6d4eda81..41d5a004a81c 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -528,10 +528,9 @@ type PublishResult = ipubsub.PublishResult // need to be stopped by calling t.Stop(). Once stopped, future calls to Publish // will immediately return a PublishResult with an error. func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { - var span trace.Span - opts := getSpanAttributes(t.String(), msg) - ctx, span = t.tracer.Start(ctx, t.String()+" send", opts...) - _, span2 := t.tracer.Start(ctx, t.String()+" add to batch", opts...) + opts := getPublishSpanAttributes(t.String(), msg) + ctx, span := t.tracer.Start(ctx, t.String()+" send", opts...) + ctx, span2 := t.tracer.Start(ctx, t.String()+" add to batch", opts...) defer span2.End() r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { @@ -569,12 +568,15 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { ipubsub.SetPublishResult(r, "", err) return r } + _, span3 := t.tracer.Start(ctx, t.String()+" schedule batch") + bmsg := &bundledMessage{ - ctx: ctx, - msg: msg, - res: r, - size: msgSize, - span: span, + ctx: ctx, + msg: msg, + res: r, + size: msgSize, + pubSpan: span, + batchSpan: span3, } if err := t.scheduler.Add(msg.OrderingKey, bmsg, msgSize); err != nil { t.scheduler.Pause(msg.OrderingKey) @@ -606,11 +608,12 @@ func (t *Topic) Flush() { } type bundledMessage struct { - ctx context.Context - msg *Message - res *PublishResult - size int - span trace.Span + ctx context.Context + msg *Message + res *PublishResult + size int + pubSpan trace.Span + batchSpan trace.Span } func (t *Topic) initBundler() { @@ -645,7 +648,11 @@ func (t *Topic) initBundler() { ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } - t.publishMessageBundle(ctx, bundle.([]*bundledMessage)) + bmsgs := bundle.([]*bundledMessage) + for _, m := range bmsgs { + m.batchSpan.End() + } + t.publishMessageBundle(ctx, bmsgs) }) t.scheduler.DelayThreshold = t.PublishSettings.DelayThreshold t.scheduler.BundleCountThreshold = t.PublishSettings.CountThreshold @@ -695,10 +702,10 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) Attributes: bm.msg.Attributes, OrderingKey: bm.msg.OrderingKey, } - opts := getSpanAttributes(t.String(), bm.msg) + opts := getPublishSpanAttributes(t.String(), bm.msg) _, span := t.tracer.Start(bm.ctx, t.String()+" publish RPC", opts...) defer span.End() - defer bm.span.End() + defer bm.pubSpan.End() bm.msg = nil // release bm.msg for GC } var res *pb.PublishResponse @@ -728,7 +735,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) ipubsub.SetPublishResult(bm.res, "", err) } else { ipubsub.SetPublishResult(bm.res, res.MessageIds[i], nil) - bm.span.SetAttributes(semconv.MessagingMessageIDKey.String(res.MessageIds[i])) + bm.pubSpan.SetAttributes(semconv.MessagingMessageIDKey.String(res.MessageIds[i])) } } } diff --git a/pubsub/trace.go b/pubsub/trace.go index 0dc8c4f4cb99..74222d070c5e 100644 --- a/pubsub/trace.go +++ b/pubsub/trace.go @@ -259,12 +259,12 @@ func NewPubsubMessageCarrier(msg *Message) PubsubMessageCarrier { // Get retrieves a single value for a given key. func (c PubsubMessageCarrier) Get(key string) string { - return c.msg.Attributes["googclient_"+key] + return c.msg.Attributes["gogclient_"+key] } // Set sets an attribute. func (c PubsubMessageCarrier) Set(key, val string) { - c.msg.Attributes["googclient_"+key] = val + c.msg.Attributes["gogclient_"+key] = val } // Keys returns a slice of all keys in the carrier. @@ -278,7 +278,7 @@ func (c PubsubMessageCarrier) Keys() []string { return out } -func getSpanAttributes(topic string, msg *Message, opts ...attribute.KeyValue) []trace.SpanStartOption { +func getPublishSpanAttributes(topic string, msg *Message, opts ...attribute.KeyValue) []trace.SpanStartOption { // TODO(hongalex): benchmark this to make sure no significant performance degradation // when calculating proto.Size in receive paths. msgSize := proto.Size(&pb.PubsubMessage{ @@ -300,3 +300,24 @@ func getSpanAttributes(topic string, msg *Message, opts ...attribute.KeyValue) [ } return ss } + +func getSubSpanAttributes(topic string, msg *Message, opts ...attribute.KeyValue) []trace.SpanStartOption { + msgSize := proto.Size(&pb.PubsubMessage{ + Data: msg.Data, + Attributes: msg.Attributes, + OrderingKey: msg.OrderingKey, + }) + ss := []trace.SpanStartOption{ + trace.WithAttributes( + semconv.MessagingSystemKey.String("pubsub"), + semconv.MessagingDestinationKey.String(topic), + semconv.MessagingDestinationKindTopic, + semconv.MessagingMessageIDKey.String(msg.ID), + semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize), + attribute.String("pubsub.ordering_key", msg.OrderingKey), + ), + trace.WithAttributes(opts...), + trace.WithSpanKind(trace.SpanKindConsumer), + } + return ss +} From f3e7a0a84f804f1ad9b2156f8a23960818b9b15f Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 1 Mar 2022 17:14:51 -0800 Subject: [PATCH 11/17] resolve merge --- pubsub/subscription.go | 8 -------- pubsub/topic.go | 12 +++++++----- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 2213467a0804..111a48cdd15d 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -853,11 +853,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes s.mu.Unlock() defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() -<<<<<<< HEAD s.checkSubConfig() -======= - s.checkOrdering(ctx) ->>>>>>> 102e459ce6a6305171087f725d5a46e1106cbdbc maxCount := s.ReceiveSettings.MaxOutstandingMessages if maxCount == 0 { @@ -1070,12 +1066,8 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // the roles/viewer or roles/pubsub.viewer role) we will assume // EnableMessageOrdering to be true. // See: https://github.com/googleapis/google-cloud-go/issues/3884 -<<<<<<< HEAD func (s *Subscription) checkSubConfig() { ctx := context.Background() -======= -func (s *Subscription) checkOrdering(ctx context.Context) { ->>>>>>> 102e459ce6a6305171087f725d5a46e1106cbdbc cfg, err := s.Config(ctx) if err != nil { s.enableOrdering = true diff --git a/pubsub/topic.go b/pubsub/topic.go index 41d5a004a81c..7f7b3eb6eaf0 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -608,11 +608,13 @@ func (t *Topic) Flush() { } type bundledMessage struct { - ctx context.Context - msg *Message - res *PublishResult - size int - pubSpan trace.Span + ctx context.Context + msg *Message + res *PublishResult + size int + // pubSpan is the entire publish span (from user calling Publish to the publish RPC resolving). + pubSpan trace.Span + // batchSpan tracks how long a message is waiting to be published. batchSpan trace.Span } From 516663e2670f05da71ebb2c47575fc9b91357863 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 3 Mar 2022 01:05:39 -0800 Subject: [PATCH 12/17] fix ordering key message attribute --- pubsub/trace.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pubsub/trace.go b/pubsub/trace.go index 74222d070c5e..15d2d8258bdb 100644 --- a/pubsub/trace.go +++ b/pubsub/trace.go @@ -278,6 +278,8 @@ func (c PubsubMessageCarrier) Keys() []string { return out } +const orderingAttribute = "messaging.pubsub.ordering_key" + func getPublishSpanAttributes(topic string, msg *Message, opts ...attribute.KeyValue) []trace.SpanStartOption { // TODO(hongalex): benchmark this to make sure no significant performance degradation // when calculating proto.Size in receive paths. @@ -293,7 +295,7 @@ func getPublishSpanAttributes(topic string, msg *Message, opts ...attribute.KeyV semconv.MessagingDestinationKindTopic, semconv.MessagingMessageIDKey.String(msg.ID), semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize), - attribute.String("pubsub.ordering_key", msg.OrderingKey), + attribute.String(orderingAttribute, msg.OrderingKey), ), trace.WithAttributes(opts...), trace.WithSpanKind(trace.SpanKindProducer), @@ -314,7 +316,7 @@ func getSubSpanAttributes(topic string, msg *Message, opts ...attribute.KeyValue semconv.MessagingDestinationKindTopic, semconv.MessagingMessageIDKey.String(msg.ID), semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize), - attribute.String("pubsub.ordering_key", msg.OrderingKey), + attribute.String(orderingAttribute, msg.OrderingKey), ), trace.WithAttributes(opts...), trace.WithSpanKind(trace.SpanKindConsumer), From d0d24e2f6024640ffd261edc33d6d974efc176fe Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 24 Mar 2022 13:35:57 -0700 Subject: [PATCH 13/17] add initial tests --- pubsub/go.mod | 5 +-- pubsub/go.sum | 15 ++++++--- pubsub/topic.go | 2 +- pubsub/trace_test.go | 80 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 7 deletions(-) create mode 100644 pubsub/trace_test.go diff --git a/pubsub/go.mod b/pubsub/go.mod index 7acad26b84d0..eb887bbcda30 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -10,8 +10,9 @@ require ( github.com/google/go-cmp v0.5.7 github.com/googleapis/gax-go/v2 v2.1.1 go.opencensus.io v0.23.0 - go.opentelemetry.io/otel v1.0.0 - go.opentelemetry.io/otel/trace v1.0.0 + go.opentelemetry.io/otel v1.4.1 + go.opentelemetry.io/otel/sdk v1.4.1 + go.opentelemetry.io/otel/trace v1.4.1 golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 diff --git a/pubsub/go.sum b/pubsub/go.sum index df42fdfbfcb4..3d4a3a0bb779 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -88,6 +88,10 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-logr/logr v1.2.2 h1:ahHml/yUpnlb96Rp8HCvtYVPY8ZYpxq3g7UYchIYwbs= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -200,10 +204,12 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= -go.opentelemetry.io/otel v1.0.0 h1:qTTn6x71GVBvoafHK/yaRUmFzI4LcONZD0/kXxl5PHI= -go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg= -go.opentelemetry.io/otel/trace v1.0.0 h1:TSBr8GTEtKevYMG/2d21M989r5WJYVimhTHBKVEZuh4= -go.opentelemetry.io/otel/trace v1.0.0/go.mod h1:PXTWqayeFUlJV1YDNhsJYB184+IvAH814St6o6ajzIs= +go.opentelemetry.io/otel v1.4.1 h1:QbINgGDDcoQUoMJa2mMaWno49lja9sHwp6aoa2n3a4g= +go.opentelemetry.io/otel v1.4.1/go.mod h1:StM6F/0fSwpd8dKWDCdRr7uRvEPYdW0hBSlbdTiUde4= +go.opentelemetry.io/otel/sdk v1.4.1 h1:J7EaW71E0v87qflB4cDolaqq3AcujGrtyIPGQoZOB0Y= +go.opentelemetry.io/otel/sdk v1.4.1/go.mod h1:NBwHDgDIBYjwK2WNu1OPgsIc2IJzmBXNnvIJxJc8BpE= +go.opentelemetry.io/otel/trace v1.4.1 h1:O+16qcdTrT7zxv2J6GejTPFinSwA++cYerC5iSiF8EQ= +go.opentelemetry.io/otel/trace v1.4.1/go.mod h1:iYEVbroFCNut9QkwEczV9vMRPHNKSSwYZjulEtsmhFc= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -348,6 +354,7 @@ golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/pubsub/topic.go b/pubsub/topic.go index 6bd2b247e557..b7e80c84d43f 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -568,7 +568,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { ipubsub.SetPublishResult(r, "", err) return r } - _, span3 := t.tracer.Start(ctx, t.String()+" schedule batch") + _, span3 := t.tracer.Start(ctx, t.String()+" waiting in batch") bmsg := &bundledMessage{ ctx: ctx, diff --git a/pubsub/trace_test.go b/pubsub/trace_test.go new file mode 100644 index 000000000000..2e6513640165 --- /dev/null +++ b/pubsub/trace_test.go @@ -0,0 +1,80 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +func TestPublishSpan(t *testing.T) { + ctx := context.Background() + c, srv := newFake(t) + defer c.Close() + defer srv.Close() + + spanRecorder := tracetest.NewSpanRecorder() + provider := trace.NewTracerProvider(trace.WithSpanProcessor(spanRecorder)) + otel.SetTracerProvider(provider) + + topic := c.Topic("t") + r := topic.Publish(ctx, &Message{ + Data: []byte("test"), + }) + r.Get(ctx) + defer topic.Stop() + + spans := spanRecorder.Ended() + for i, span := range spans { + + // Check span + assert.True(t, span.SpanContext().IsValid()) + // assert.Equal(t, "pubsub.topic", span.Name()) + fmt.Printf("got span %d: %+v\n", i, span) + } +} + +func TestSubscribeSpan(t *testing.T) { + ctx := context.Background() + c, srv := newFake(t) + defer c.Close() + defer srv.Close() + + topic := c.Topic("t") + r := topic.Publish(ctx, &Message{ + Data: []byte("test"), + }) + r.Get(ctx) + defer topic.Stop() + + spanRecorder := tracetest.NewSpanRecorder() + provider := trace.NewTracerProvider(trace.WithSpanProcessor(spanRecorder)) + otel.SetTracerProvider(provider) + + spans := spanRecorder.Ended() + for i, span := range spans { + + // Check span + assert.True(t, span.SpanContext().IsValid()) + // assert.Equal(t, "pubsub.topic", span.Name()) + fmt.Printf("got span %d: %+v\n", i, span) + } +} From db3bcaddb8fe4c7a0281abb06d2356e06516cd94 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Wed, 6 Apr 2022 10:20:39 -0700 Subject: [PATCH 14/17] add additional spans for publish/receive --- pubsub/subscription.go | 32 +++++++++++++++++++------------- pubsub/topic.go | 2 +- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 111a48cdd15d..6f3b281d49da 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -939,6 +939,9 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes defer wg.Wait() defer cancel2() for { + opts := getSubSpanAttributes(s.topicName, &Message{}, semconv.MessagingOperationReceive) + ctx2, receiveSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.topicName), opts...) + var maxToPull int32 // maximum number of messages to pull if po.synchronous { if po.maxPrefetch < 0 { @@ -968,6 +971,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes return nil default: } + msgs, err := iter.receive(maxToPull) if err == io.EOF { return nil @@ -975,6 +979,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes if err != nil { return err } + receiveSpan.End() // If context is done and messages have been pulled, // nack them. select { @@ -985,11 +990,12 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes return nil default: } - for i, msg := range msgs { - opts := getSubSpanAttributes("", msg, semconv.MessagingOperationReceive) - ctx2, receiveSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.topicName), opts...) + for i, msg := range msgs { msg := msg + fcSpanName := fmt.Sprintf("%s waiting for subscriber flow control", s.topicName) + ctx2, fcSpan := s.tracer.Start(ctx2, fcSpanName, opts...) + defer fcSpan.End() // TODO(jba): call acquire closer to when the message is allocated. if err := fc.acquire(ctx, len(msg.Data)); err != nil { // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done. @@ -999,6 +1005,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // Return nil if the context is done, not err. return nil } + fcSpan.End() ackh, _ := msgAckHandler(msg) old := ackh.doneFunc msgLen := len(msg.Data) @@ -1016,19 +1023,18 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // constructor level? if err := sched.Add(key, msg, func(msg interface{}) { m := msg.(*Message) - if m.Attributes != nil && m.Attributes["googclient_traceparent"] != "" { + opts := getSubSpanAttributes("", m, semconv.MessagingOperationProcess) + if m.Attributes != nil && m.Attributes["gogclient_traceparent"] != "" { lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(m)) link := trace.LinkFromContext(lctx) - opts := getSubSpanAttributes("", m, semconv.MessagingOperationProcess) opts = append(opts, trace.WithLinks(link)) - _, processSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.topicName), opts...) - // End spans to ack handler doneFunc, which also handles flow control release. - old := ackh.doneFunc - ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { - defer processSpan.End() - defer receiveSpan.End() - old(ackID, ack, receiveTime) - } + } + _, processSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.topicName), opts...) + // End spans to ack handler doneFunc, which also handles flow control release. + old := ackh.doneFunc + ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { + defer processSpan.End() + old(ackID, ack, receiveTime) } defer wg.Done() f(ctx2, msg.(*Message)) diff --git a/pubsub/topic.go b/pubsub/topic.go index b7e80c84d43f..3301bbc4cf7a 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -530,7 +530,7 @@ type PublishResult = ipubsub.PublishResult func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { opts := getPublishSpanAttributes(t.String(), msg) ctx, span := t.tracer.Start(ctx, t.String()+" send", opts...) - ctx, span2 := t.tracer.Start(ctx, t.String()+" add to batch", opts...) + ctx, span2 := t.tracer.Start(ctx, t.String()+" waiting for publisher flow control", opts...) defer span2.End() r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { From e669b5cfd0b03bc7d54585cd9fa96da547786d76 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 12 Apr 2022 17:50:56 -0700 Subject: [PATCH 15/17] capture errors in spans and add additional attr --- pubsub/subscription.go | 8 +++++++- pubsub/topic.go | 10 ++++++++++ pubsub/trace.go | 4 ++-- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 6f3b281d49da..dfa859c0ed01 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -28,6 +28,7 @@ import ( "cloud.google.com/go/pubsub/internal/scheduler" gax "github.com/googleapis/gax-go/v2" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" @@ -1024,7 +1025,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes if err := sched.Add(key, msg, func(msg interface{}) { m := msg.(*Message) opts := getSubSpanAttributes("", m, semconv.MessagingOperationProcess) - if m.Attributes != nil && m.Attributes["gogclient_traceparent"] != "" { + if m.Attributes != nil && m.Attributes["googclient_traceparent"] != "" { lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(m)) link := trace.LinkFromContext(lctx) opts = append(opts, trace.WithLinks(link)) @@ -1033,6 +1034,11 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // End spans to ack handler doneFunc, which also handles flow control release. old := ackh.doneFunc ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { + if ack { + processSpan.SetAttributes(attribute.String("result", "ack")) + } else { + processSpan.SetAttributes(attribute.String("result", "nack")) + } defer processSpan.End() old(ackID, ack, receiveTime) } diff --git a/pubsub/topic.go b/pubsub/topic.go index 3301bbc4cf7a..8909c22b3425 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -43,6 +43,8 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" + + otelcodes "go.opentelemetry.io/otel/codes" ) const ( @@ -560,12 +562,16 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { // TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here if t.stopped { ipubsub.SetPublishResult(r, "", errTopicStopped) + span.RecordError(errTopicStopped) + span.SetStatus(otelcodes.Error, errTopicStopped.Error()) return r } if err := t.flowController.acquire(ctx, msgSize); err != nil { t.scheduler.Pause(msg.OrderingKey) ipubsub.SetPublishResult(r, "", err) + span.RecordError(errTopicStopped) + span.SetStatus(otelcodes.Error, errTopicStopped.Error()) return r } _, span3 := t.tracer.Start(ctx, t.String()+" waiting in batch") @@ -581,6 +587,8 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { if err := t.scheduler.Add(msg.OrderingKey, bmsg, msgSize); err != nil { t.scheduler.Pause(msg.OrderingKey) ipubsub.SetPublishResult(r, "", err) + span.RecordError(errTopicStopped) + span.SetStatus(otelcodes.Error, errTopicStopped.Error()) } return r } @@ -744,6 +752,8 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) t.flowController.release(ctx, bm.size) if err != nil { ipubsub.SetPublishResult(bm.res, "", err) + bm.pubSpan.RecordError(err) + bm.pubSpan.SetStatus(otelcodes.Error, err.Error()) } else { ipubsub.SetPublishResult(bm.res, res.MessageIds[i], nil) bm.pubSpan.SetAttributes(semconv.MessagingMessageIDKey.String(res.MessageIds[i])) diff --git a/pubsub/trace.go b/pubsub/trace.go index 15d2d8258bdb..b822d548a3d7 100644 --- a/pubsub/trace.go +++ b/pubsub/trace.go @@ -259,12 +259,12 @@ func NewPubsubMessageCarrier(msg *Message) PubsubMessageCarrier { // Get retrieves a single value for a given key. func (c PubsubMessageCarrier) Get(key string) string { - return c.msg.Attributes["gogclient_"+key] + return c.msg.Attributes["googclient_"+key] } // Set sets an attribute. func (c PubsubMessageCarrier) Set(key, val string) { - c.msg.Attributes["gogclient_"+key] = val + c.msg.Attributes["googclient_"+key] = val } // Keys returns a slice of all keys in the carrier. From 17767dee6392d0f7b3ce3590440c3890ea94b97e Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Wed, 13 Apr 2022 16:21:41 -0700 Subject: [PATCH 16/17] make publish RPC batched rather than individual --- pubsub/subscription.go | 2 +- pubsub/topic.go | 33 ++++++++++++++++++++++----------- pubsub/trace.go | 4 ++-- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index dfa859c0ed01..f8f4c1138797 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -1025,7 +1025,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes if err := sched.Add(key, msg, func(msg interface{}) { m := msg.(*Message) opts := getSubSpanAttributes("", m, semconv.MessagingOperationProcess) - if m.Attributes != nil && m.Attributes["googclient_traceparent"] != "" { + if m.Attributes != nil && m.Attributes["gogclient_traceparent"] != "" { lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(m)) link := trace.LinkFromContext(lctx) opts = append(opts, trace.WithLinks(link)) diff --git a/pubsub/topic.go b/pubsub/topic.go index 8909c22b3425..cb8bcc3d37ea 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -530,9 +530,11 @@ type PublishResult = ipubsub.PublishResult // need to be stopped by calling t.Stop(). Once stopped, future calls to Publish // will immediately return a PublishResult with an error. func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { + otelctx := ctx opts := getPublishSpanAttributes(t.String(), msg) ctx, span := t.tracer.Start(ctx, t.String()+" send", opts...) - ctx, span2 := t.tracer.Start(ctx, t.String()+" waiting for publisher flow control", opts...) + // Discard this span's context since it is a child span. We only care about the main "send" span. + _, span2 := t.tracer.Start(ctx, t.String()+" waiting for publisher flow control", opts...) defer span2.End() r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { @@ -556,7 +558,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { }) span.SetAttributes(semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize)) - t.initBundler() + t.initBundler(otelctx) t.mu.RLock() defer t.mu.RUnlock() // TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here @@ -622,11 +624,13 @@ type bundledMessage struct { size int // pubSpan is the entire publish span (from user calling Publish to the publish RPC resolving). pubSpan trace.Span - // batchSpan tracks how long a message is waiting to be published. + // batchSpan traces the message batching operation in publish scheduler. batchSpan trace.Span } -func (t *Topic) initBundler() { +// The context passed into this method is only for keeping track of opentelemetry traces. +// It is not used for cancellation. +func (t *Topic) initBundler(otelctx context.Context) { t.mu.RLock() noop := t.stopped || t.scheduler != nil t.mu.RUnlock() @@ -652,17 +656,17 @@ func (t *Topic) initBundler() { t.scheduler = scheduler.NewPublishScheduler(workers, func(bundle interface{}) { // TODO(jba): use a context detached from the one passed to NewClient. - ctx := context.TODO() + ctx2 := context.TODO() if timeout != 0 { var cancel func() - ctx, cancel = context.WithTimeout(ctx, timeout) + ctx2, cancel = context.WithTimeout(ctx2, timeout) defer cancel() } bmsgs := bundle.([]*bundledMessage) for _, m := range bmsgs { m.batchSpan.End() } - t.publishMessageBundle(ctx, bmsgs) + t.publishMessageBundle(otelctx, ctx2, bmsgs) }) t.scheduler.DelayThreshold = t.PublishSettings.DelayThreshold t.scheduler.BundleCountThreshold = t.PublishSettings.CountThreshold @@ -698,13 +702,15 @@ func (t *Topic) initBundler() { t.scheduler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) - 5 } -func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) { +func (t *Topic) publishMessageBundle(otelctx, ctx context.Context, bms []*bundledMessage) { ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name)) if err != nil { log.Printf("pubsub: cannot create context with tag in publishMessageBundle: %v", err) } pbMsgs := make([]*pb.PubsubMessage, len(bms)) var orderingKey string + + var ll []trace.Link for i, bm := range bms { orderingKey = bm.msg.OrderingKey pbMsgs[i] = &pb.PubsubMessage{ @@ -712,12 +718,17 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) Attributes: bm.msg.Attributes, OrderingKey: bm.msg.OrderingKey, } - opts := getPublishSpanAttributes(t.String(), bm.msg) - _, span := t.tracer.Start(bm.ctx, t.String()+" publish RPC", opts...) - defer span.End() + l := trace.LinkFromContext(bm.ctx) + ll = append(ll, l) defer bm.pubSpan.End() bm.msg = nil // release bm.msg for GC } + + opts := getPublishSpanAttributes(t.String(), &Message{}) + opts = append(opts, trace.WithLinks(ll...)) + _, span := t.tracer.Start(otelctx, t.String()+" publish RPC", opts...) + defer span.End() + var res *pb.PublishResponse start := time.Now() if orderingKey != "" && t.scheduler.IsPaused(orderingKey) { diff --git a/pubsub/trace.go b/pubsub/trace.go index b822d548a3d7..15d2d8258bdb 100644 --- a/pubsub/trace.go +++ b/pubsub/trace.go @@ -259,12 +259,12 @@ func NewPubsubMessageCarrier(msg *Message) PubsubMessageCarrier { // Get retrieves a single value for a given key. func (c PubsubMessageCarrier) Get(key string) string { - return c.msg.Attributes["googclient_"+key] + return c.msg.Attributes["gogclient_"+key] } // Set sets an attribute. func (c PubsubMessageCarrier) Set(key, val string) { - c.msg.Attributes["googclient_"+key] = val + c.msg.Attributes["gogclient_"+key] = val } // Keys returns a slice of all keys in the carrier. From 0ffbdad9b40ae5e7f5d056b9c9a88dc518af0557 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 18 Apr 2022 11:42:55 -0700 Subject: [PATCH 17/17] switch subspans to events --- pubsub/subscription.go | 54 ++++++++++++++++++++-------------------- pubsub/topic.go | 56 +++++++++++++++++++++--------------------- pubsub/trace_test.go | 5 ++-- 3 files changed, 58 insertions(+), 57 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index f8f4c1138797..25b00141833a 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -29,6 +29,7 @@ import ( gax "github.com/googleapis/gax-go/v2" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + otelcodes "go.opentelemetry.io/otel/codes" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" @@ -941,7 +942,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes defer cancel2() for { opts := getSubSpanAttributes(s.topicName, &Message{}, semconv.MessagingOperationReceive) - ctx2, receiveSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.topicName), opts...) + ctx2, rs := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.topicName), opts...) var maxToPull int32 // maximum number of messages to pull if po.synchronous { @@ -978,9 +979,11 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes return nil } if err != nil { + rs.RecordError(err) + rs.SetStatus(otelcodes.Error, err.Error()) return err } - receiveSpan.End() + rs.End() // If context is done and messages have been pulled, // nack them. select { @@ -993,10 +996,18 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes } for i, msg := range msgs { - msg := msg - fcSpanName := fmt.Sprintf("%s waiting for subscriber flow control", s.topicName) - ctx2, fcSpan := s.tracer.Start(ctx2, fcSpanName, opts...) - defer fcSpan.End() + opts := getSubSpanAttributes(s.topicName, msg, semconv.MessagingOperationProcess) + if msg.Attributes != nil && msg.Attributes["gogclient_traceparent"] != "" { + lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(msg)) + link := trace.LinkFromContext(lctx) + opts = append(opts, trace.WithLinks(link)) + } + _, ps := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.topicName), opts...) + ps.AddEvent("waiting for subscriber flow control") + var fcTimer time.Time + if ps.IsRecording() { + fcTimer = time.Now() + } // TODO(jba): call acquire closer to when the message is allocated. if err := fc.acquire(ctx, len(msg.Data)); err != nil { // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done. @@ -1006,12 +1017,18 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // Return nil if the context is done, not err. return nil } - fcSpan.End() + ps.AddEvent("acquired subscriber flow control resources", trace.WithAttributes(attribute.Float64("elapsed_ms", float64(time.Since(fcTimer))/float64(time.Millisecond)))) ackh, _ := msgAckHandler(msg) old := ackh.doneFunc msgLen := len(msg.Data) ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { defer fc.release(ctx, msgLen) + defer ps.End() + if ack { + ps.SetAttributes(attribute.String("result", "ack")) + } else { + ps.SetAttributes(attribute.String("result", "nack")) + } old(ackID, ack, receiveTime) } wg.Add(1) @@ -1023,29 +1040,14 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // TODO(deklerk): Can we have a generic handler at the // constructor level? if err := sched.Add(key, msg, func(msg interface{}) { - m := msg.(*Message) - opts := getSubSpanAttributes("", m, semconv.MessagingOperationProcess) - if m.Attributes != nil && m.Attributes["gogclient_traceparent"] != "" { - lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(m)) - link := trace.LinkFromContext(lctx) - opts = append(opts, trace.WithLinks(link)) - } - _, processSpan := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.topicName), opts...) - // End spans to ack handler doneFunc, which also handles flow control release. - old := ackh.doneFunc - ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { - if ack { - processSpan.SetAttributes(attribute.String("result", "ack")) - } else { - processSpan.SetAttributes(attribute.String("result", "nack")) - } - defer processSpan.End() - old(ackID, ack, receiveTime) - } defer wg.Done() + rs.AddEvent("started handling provided callback") f(ctx2, msg.(*Message)) + rs.AddEvent("finished handling provided callback") }); err != nil { wg.Done() + ps.RecordError(err) + ps.SetStatus(otelcodes.Error, err.Error()) return err } } diff --git a/pubsub/topic.go b/pubsub/topic.go index cb8bcc3d37ea..627dffd3ef79 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -33,6 +33,7 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/tag" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.opentelemetry.io/otel/trace" "google.golang.org/api/support/bundler" @@ -533,9 +534,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { otelctx := ctx opts := getPublishSpanAttributes(t.String(), msg) ctx, span := t.tracer.Start(ctx, t.String()+" send", opts...) - // Discard this span's context since it is a child span. We only care about the main "send" span. - _, span2 := t.tracer.Start(ctx, t.String()+" waiting for publisher flow control", opts...) - defer span2.End() + span.AddEvent("waiting for publisher flow control") r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { ipubsub.SetPublishResult(r, "", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")) @@ -576,22 +575,24 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { span.SetStatus(otelcodes.Error, errTopicStopped.Error()) return r } - _, span3 := t.tracer.Start(ctx, t.String()+" waiting in batch") + span.AddEvent("acquired publisher flow control resources") bmsg := &bundledMessage{ - ctx: ctx, - msg: msg, - res: r, - size: msgSize, - pubSpan: span, - batchSpan: span3, + ctx: ctx, + msg: msg, + res: r, + size: msgSize, + span: span, } + if err := t.scheduler.Add(msg.OrderingKey, bmsg, msgSize); err != nil { t.scheduler.Pause(msg.OrderingKey) ipubsub.SetPublishResult(r, "", err) span.RecordError(errTopicStopped) span.SetStatus(otelcodes.Error, errTopicStopped.Error()) } + span.AddEvent("added to batch") + return r } @@ -622,8 +623,8 @@ type bundledMessage struct { msg *Message res *PublishResult size int - // pubSpan is the entire publish span (from user calling Publish to the publish RPC resolving). - pubSpan trace.Span + // span is the entire publish span (from user calling Publish to the publish RPC resolving). + span trace.Span // batchSpan traces the message batching operation in publish scheduler. batchSpan trace.Span } @@ -664,7 +665,7 @@ func (t *Topic) initBundler(otelctx context.Context) { } bmsgs := bundle.([]*bundledMessage) for _, m := range bmsgs { - m.batchSpan.End() + m.span.AddEvent("removed from batch") } t.publishMessageBundle(otelctx, ctx2, bmsgs) }) @@ -707,28 +708,26 @@ func (t *Topic) publishMessageBundle(otelctx, ctx context.Context, bms []*bundle if err != nil { log.Printf("pubsub: cannot create context with tag in publishMessageBundle: %v", err) } - pbMsgs := make([]*pb.PubsubMessage, len(bms)) + numMsgs := len(bms) + pbMsgs := make([]*pb.PubsubMessage, numMsgs) var orderingKey string - - var ll []trace.Link + if numMsgs != 0 { + // extract the ordering key for this batch. since + // messages in the same batch share the same ordering + // key, it doesn't matter which we read from. + orderingKey = bms[0].msg.OrderingKey + } for i, bm := range bms { - orderingKey = bm.msg.OrderingKey pbMsgs[i] = &pb.PubsubMessage{ Data: bm.msg.Data, Attributes: bm.msg.Attributes, OrderingKey: bm.msg.OrderingKey, } - l := trace.LinkFromContext(bm.ctx) - ll = append(ll, l) - defer bm.pubSpan.End() + bm.span.AddEvent("starting publish RPC", trace.WithAttributes(attribute.Int("num_messages_in_batch", numMsgs))) + defer bm.span.End() bm.msg = nil // release bm.msg for GC } - opts := getPublishSpanAttributes(t.String(), &Message{}) - opts = append(opts, trace.WithLinks(ll...)) - _, span := t.tracer.Start(otelctx, t.String()+" publish RPC", opts...) - defer span.End() - var res *pb.PublishResponse start := time.Now() if orderingKey != "" && t.scheduler.IsPaused(orderingKey) { @@ -763,11 +762,12 @@ func (t *Topic) publishMessageBundle(otelctx, ctx context.Context, bms []*bundle t.flowController.release(ctx, bm.size) if err != nil { ipubsub.SetPublishResult(bm.res, "", err) - bm.pubSpan.RecordError(err) - bm.pubSpan.SetStatus(otelcodes.Error, err.Error()) + bm.span.RecordError(err) + bm.span.SetStatus(otelcodes.Error, err.Error()) } else { ipubsub.SetPublishResult(bm.res, res.MessageIds[i], nil) - bm.pubSpan.SetAttributes(semconv.MessagingMessageIDKey.String(res.MessageIds[i])) + bm.span.SetAttributes(semconv.MessagingMessageIDKey.String(res.MessageIds[i])) + bm.span.AddEvent("received publish rpc results") } } } diff --git a/pubsub/trace_test.go b/pubsub/trace_test.go index 2e6513640165..da004f585912 100644 --- a/pubsub/trace_test.go +++ b/pubsub/trace_test.go @@ -19,7 +19,6 @@ import ( "fmt" "testing" - "github.com/stretchr/testify/assert" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" @@ -46,7 +45,7 @@ func TestPublishSpan(t *testing.T) { for i, span := range spans { // Check span - assert.True(t, span.SpanContext().IsValid()) + // assert.True(t, span.SpanContext().IsValid()) // assert.Equal(t, "pubsub.topic", span.Name()) fmt.Printf("got span %d: %+v\n", i, span) } @@ -73,7 +72,7 @@ func TestSubscribeSpan(t *testing.T) { for i, span := range spans { // Check span - assert.True(t, span.SpanContext().IsValid()) + // assert.True(t, span.SpanContext().IsValid()) // assert.Equal(t, "pubsub.topic", span.Name()) fmt.Printf("got span %d: %+v\n", i, span) }