Skip to content

Commit

Permalink
feat: add basic tracing for responses
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Dec 9, 2021
1 parent 06cb155 commit 6a28a13
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 80 deletions.
9 changes: 7 additions & 2 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,12 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
}

// Request initiates a new GraphSync request to the given peer using the given selector spec.
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
func (gs *GraphSync) Request(
ctx context.Context,
p peer.ID, root ipld.Link,
selector ipld.Node,
extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {

var extNames []string
for _, ext := range extensions {
extNames = append(extNames, string(ext.Name))
Expand Down Expand Up @@ -477,7 +482,7 @@ func (gsr *graphSyncReceiver) graphSync() *GraphSync {
return (*GraphSync)(gsr)
}

// ReceiveMessage is part of the networks Receiver interface and receives
// ReceiveMessage is part of the network's Receiver interface and receives
// incoming messages from the network
func (gsr *graphSyncReceiver) ReceiveMessage(
ctx context.Context,
Expand Down
182 changes: 129 additions & 53 deletions impl/graphsync_test.go

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,15 @@ func (gsr GraphSyncRequest) Extension(name graphsync.ExtensionName) ([]byte, boo
return val, true
}

// ExtensionNames returns the names of the extensions included in this request
func (gsr GraphSyncRequest) ExtensionNames() []string {
var extNames []string
for ext := range gsr.extensions {
extNames = append(extNames, ext)
}
return extNames
}

// IsCancel returns true if this particular request is being cancelled
func (gsr GraphSyncRequest) IsCancel() bool { return gsr.isCancel }

Expand All @@ -398,7 +407,15 @@ func (gsr GraphSyncResponse) Extension(name graphsync.ExtensionName) ([]byte, bo
return nil, false
}
return val, true
}

// ExtensionNames returns the names of the extensions included in this request
func (gsr GraphSyncResponse) ExtensionNames() []string {
var extNames []string
for ext := range gsr.extensions {
extNames = append(extNames, ext)
}
return extNames
}

// ReplaceExtensions merges the extensions given extensions into the request to create a new request,
Expand Down
18 changes: 10 additions & 8 deletions requestmanager/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ func (e *Executor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.

log.Debugw("beginning request execution", "id", requestTask.Request.ID(), "peer", pid.String(), "root_cid", requestTask.Request.Root().String())
err := e.traverse(requestTask)
span.RecordError(err)
if err != nil && !ipldutil.IsContextCancelErr(err) {
e.manager.SendRequest(requestTask.P, gsmsg.CancelRequest(requestTask.Request.ID()))
if !isPausedErr(err) {
span.SetStatus(codes.Error, err.Error())
select {
case <-requestTask.Ctx.Done():
case requestTask.InProgressErr <- err:
if err != nil {
span.RecordError(err)
if !ipldutil.IsContextCancelErr(err) {
e.manager.SendRequest(requestTask.P, gsmsg.CancelRequest(requestTask.Request.ID()))
if !isPausedErr(err) {
span.SetStatus(codes.Error, err.Error())
select {
case <-requestTask.Ctx.Done():
case requestTask.InProgressErr <- err:
}
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-peertaskqueue/peertask"
ipld "github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
Expand All @@ -30,6 +31,7 @@ var log = logging.Logger("graphsync")

type inProgressResponseStatus struct {
ctx context.Context
span trace.Span
cancelFn func()
request gsmsg.GraphSyncRequest
loader ipld.BlockReadOpener
Expand Down Expand Up @@ -197,8 +199,8 @@ func (rm *ResponseManager) CancelResponse(p peer.ID, requestID graphsync.Request
}
}

// this is a test utility method to force all messages to get processed
func (rm *ResponseManager) synchronize() {
// Synchronize is a utility method that blocks until all current messages are processed
func (rm *ResponseManager) Synchronize() {
sync := make(chan error)
rm.send(&synchronizeMessage{sync}, nil)
select {
Expand Down
8 changes: 4 additions & 4 deletions responsemanager/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ type processRequestMessage struct {
requests []gsmsg.GraphSyncRequest
}

func (prm *processRequestMessage) handle(rm *ResponseManager) {
rm.processRequests(prm.p, prm.requests)
}

type pauseRequestMessage struct {
p peer.ID
requestID graphsync.RequestID
Expand Down Expand Up @@ -111,10 +115,6 @@ func (str *startTaskRequest) handle(rm *ResponseManager) {
}
}

func (prm *processRequestMessage) handle(rm *ResponseManager) {
rm.processRequests(prm.p, prm.requests)
}

type peerStateMessage struct {
p peer.ID
peerStatsChan chan<- peerstate.PeerState
Expand Down
13 changes: 13 additions & 0 deletions responsemanager/queryexecutor/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
Expand Down Expand Up @@ -38,6 +41,7 @@ type ResponseTask struct {
Empty bool
Subscriber *notifications.TopicDataSubscriber
Ctx context.Context
Span trace.Span
Request gsmsg.GraphSyncRequest
Loader ipld.BlockReadOpener
Traverser ipldutil.Traverser
Expand Down Expand Up @@ -97,8 +101,17 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee
return false
}

_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(qe.ctx, rt.Span), "executeTask")
defer span.End()

log.Debugw("beginning response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
err := qe.executeQuery(pid, rt)
if err != nil {
span.RecordError(err)
if _, isPaused := err.(hooks.ErrPaused); !isPaused {
span.SetStatus(codes.Error, err.Error())
}
}
qe.manager.FinishTask(task, err)
log.Debugw("finishing response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
return false
Expand Down
10 changes: 5 additions & 5 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestCancellationQueryInProgress(t *testing.T) {
gsmsg.CancelRequest(td.requestID),
}
responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
responseManager.synchronize()
responseManager.Synchronize()
close(waitForCancel)

testutil.AssertDoesReceive(td.ctx, t, cancelledListenerCalled, "should call cancelled listener")
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestEarlyCancellation(t *testing.T) {
td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
responseManager.Startup()
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
responseManager.synchronize()
responseManager.Synchronize()
td.connManager.AssertProtectedWithTags(t, td.p, td.requests[0].ID().Tag())

// send a cancellation
Expand All @@ -158,7 +158,7 @@ func TestEarlyCancellation(t *testing.T) {
}
responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)

responseManager.synchronize()
responseManager.Synchronize()

td.assertNoResponses()
td.connManager.RefuteProtected(t, td.p)
Expand Down Expand Up @@ -634,7 +634,7 @@ func TestValidationAndExtensions(t *testing.T) {
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
responseManager.synchronize()
responseManager.Synchronize()
close(wait)
td.assertCompleteRequestWith(graphsync.RequestCompletedFull)
td.assertReceiveExtensionResponse()
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestValidationAndExtensions(t *testing.T) {
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
responseManager.synchronize()
responseManager.Synchronize()
close(wait)
td.assertCompleteRequestWith(graphsync.RequestFailedUnknown)
})
Expand Down
43 changes: 42 additions & 1 deletion responsemanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/peertracker"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
Expand Down Expand Up @@ -50,6 +54,7 @@ func (rm *ResponseManager) terminateRequest(key responseKey) {
rm.connManager.Unprotect(key.p, key.requestID.Tag())
delete(rm.inProgressResponses, key)
ipr.cancelFn()
ipr.span.End()
}

func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSyncRequest) {
Expand All @@ -58,6 +63,17 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync
log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID)
return
}

_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "processUpdate", trace.WithAttributes(
attribute.Int("id", int(update.ID())),
attribute.Int("priority", int(update.Priority())),
attribute.String("root", update.Root().String()),
attribute.Bool("isCancel", update.IsCancel()),
attribute.Bool("isUpdate", update.IsUpdate()),
attribute.StringSlice("extensions", update.ExtensionNames()),
))
defer span.End()

if response.state != graphsync.Paused {
response.updates = append(response.updates, update)
select {
Expand All @@ -79,11 +95,15 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync
})
if result.Err != nil {
response.state = graphsync.CompletingSend
response.span.RecordError(result.Err)
response.span.SetStatus(codes.Error, result.Err.Error())
return
}
if result.Unpause {
err := rm.unpauseRequest(key.p, key.requestID)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, result.Err.Error())
log.Warnf("error unpausing request: %s", err.Error())
}
}
Expand Down Expand Up @@ -119,6 +139,13 @@ func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID
return errors.New("could not find request")
}

_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "abortRequest")
defer span.End()
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
response.span.RecordError(err)
response.span.SetStatus(codes.Error, err.Error())

if response.state != graphsync.Running {
_ = rm.responseAssembler.Transaction(p, requestID, func(rb responseassembler.ResponseBuilder) error {
if ipldutil.IsContextCancelErr(err) {
Expand Down Expand Up @@ -155,9 +182,17 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
rm.processUpdate(key, request)
continue
}
ctx, responseSpan := otel.Tracer("graphsync").Start(rm.ctx, "response", trace.WithAttributes(
attribute.Int("id", int(request.ID())),
attribute.Int("priority", int(request.Priority())),
attribute.String("root", request.Root().String()),
attribute.Bool("isCancel", request.IsCancel()),
attribute.Bool("isUpdate", request.IsUpdate()),
attribute.StringSlice("extensions", request.ExtensionNames()),
))
rm.connManager.Protect(p, request.ID().Tag())
rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request)
ctx, cancelFn := context.WithCancel(rm.ctx)
ctx, cancelFn := context.WithCancel(ctx)
sub := notifications.NewTopicDataSubscriber(&subscriber{
p: key.p,
request: request,
Expand All @@ -176,6 +211,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
rm.inProgressResponses[key] =
&inProgressResponseStatus{
ctx: ctx,
span: responseSpan,
cancelFn: cancelFn,
subscriber: sub,
request: request,
Expand Down Expand Up @@ -204,6 +240,8 @@ func (rm *ResponseManager) taskDataForKey(key responseKey) queryexecutor.Respons
loader, traverser, isPaused, err := (&queryPreparer{rm.requestHooks, rm.responseAssembler, rm.linkSystem, rm.maxLinksPerRequest}).prepareQuery(response.ctx, key.p, response.request, response.signals, response.subscriber)
if err != nil {
response.state = graphsync.CompletingSend
response.span.RecordError(err)
response.span.SetStatus(codes.Error, err.Error())
return queryexecutor.ResponseTask{Empty: true}
}
response.loader = loader
Expand All @@ -216,6 +254,7 @@ func (rm *ResponseManager) taskDataForKey(key responseKey) queryexecutor.Respons
response.state = graphsync.Running
return queryexecutor.ResponseTask{
Ctx: response.ctx,
Span: response.span,
Empty: false,
Subscriber: response.subscriber,
Request: response.request,
Expand Down Expand Up @@ -249,6 +288,8 @@ func (rm *ResponseManager) finishTask(task *peertask.Task, err error) {
log.Infow("graphsync response processing complete (messages stil sending)", "request id", key.requestID, "peer", key.p, "total time", time.Since(response.startTime))

if err != nil {
response.span.RecordError(err)
response.span.SetStatus(codes.Error, err.Error())
log.Infof("response failed: %w", err)
}

Expand Down
10 changes: 5 additions & 5 deletions testutil/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ func (c Collector) tracesToString(trace string, spans tracetest.SpanStubs, match
// identified by its trace string as described in TracesToStrings. Note that
// this string can also be a partial of a complete trace, e.g. just `"foo(0)"`
// without any children to fetch the parent span.
func (c Collector) FindSpanByTraceString(trace string) tracetest.SpanStub {
var found tracetest.SpanStub
func (c Collector) FindSpanByTraceString(trace string) *tracetest.SpanStub {
var found *tracetest.SpanStub
c.tracesToString("", c.FindParentSpans(), trace, func(span tracetest.SpanStub) {
if found.Name != "" {
if found != nil && found.Name != "" {
panic("found more than one span with the same trace string")
}
found = span
found = &span
})
return found
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func (c Collector) SingleExceptionEvent(t *testing.T, trace string, typeRe strin
// has ContextCancelError exception recorded in the right place
et := c.FindSpanByTraceString(trace)
require.Len(t, et.Events, 1, "expected one event in span %v", trace)
ex := EventAsException(t, EventInTraceSpan(t, et, "exception"))
ex := EventAsException(t, EventInTraceSpan(t, *et, "exception"))
require.Regexp(t, typeRe, ex.Type)
require.Regexp(t, messageRe, ex.Message)
if errorCode {
Expand Down

0 comments on commit 6a28a13

Please sign in to comment.