diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index a30967b4..3b5d1341 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -242,6 +242,7 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) { drain(requestor) drain(responder) wasCancelled := assertCancelOrComplete(ctx, t) + tracing := collectTracing(t) traceStrings := tracing.TracesToStrings() require.Contains(t, traceStrings, "response(0)->executeTask(0)") @@ -277,6 +278,7 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) { // initialize graphsync on second node to response to requests responder := td.GraphSyncHost2(MaxLinksPerIncomingRequests(linksToTraverse)) + assertComplete := assertCompletionFunction(responder, 1) progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension) @@ -287,6 +289,7 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) { drain(requestor) drain(responder) + assertComplete(ctx, t) tracing := collectTracing(t) require.ElementsMatch(t, []string{ @@ -318,6 +321,7 @@ func TestGraphsyncRoundTrip(t *testing.T) { // initialize graphsync on second node to response to requests responder := td.GraphSyncHost2() + assertComplete := assertCompletionFunction(responder, 1) var receivedResponseData []byte var receivedRequestData []byte @@ -364,6 +368,7 @@ func TestGraphsyncRoundTrip(t *testing.T) { drain(requestor) drain(responder) + assertComplete(ctx, t) tracing := collectTracing(t) require.ElementsMatch(t, []string{ @@ -400,6 +405,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) { // initialize graphsync on second node to response to requests responder := td.GraphSyncHost2() + assertComplete := assertCompletionFunction(responder, 1) finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1) responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) { @@ -431,6 +437,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) { drain(requestor) drain(responder) + assertComplete(ctx, t) tracing := collectTracing(t) require.ElementsMatch(t, []string{ @@ -472,6 +479,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { // initialize graphsync on second node to response to requests responder := td.GraphSyncHost2() + assertComplete := assertCompletionFunction(responder, 1) totalSent := 0 totalSentOnWire := 0 @@ -493,6 +501,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { drain(requestor) drain(responder) + assertComplete(ctx, t) tracing := collectTracing(t) require.ElementsMatch(t, []string{ @@ -558,6 +567,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { drain(requestor) drain(responder) assertComplete(ctx, t) + tracing := collectTracing(t) require.ElementsMatch(t, []string{ "response(0)->executeTask(0)", @@ -641,6 +651,7 @@ func TestPauseResume(t *testing.T) { drain(requestor) drain(responder) assertOneRequestCompletes(ctx, t) + tracing := collectTracing(t) traceStrings := tracing.TracesToStrings() require.Contains(t, traceStrings, "response(0)->executeTask(0)") @@ -713,6 +724,7 @@ func TestPauseResumeRequest(t *testing.T) { // should get max 1 cancel require.False(t, assertCancelOrComplete(ctx, t)) } + tracing := collectTracing(t) traceStrings := tracing.TracesToStrings() require.Contains(t, traceStrings, "response(0)->executeTask(0)") @@ -1049,7 +1061,7 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { // initialize graphsync on second node to response to requests responder := td.GraphSyncHost2() - assertComplete := assertCompletionFunction(responder, 2) + assertCancelOrComplete := assertCancelOrCompleteFunction(responder, 1) // alternate storing location for responder altStore1 := make(map[ipld.Link][]byte) @@ -1104,21 +1116,21 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { drain(requestor) drain(responder) - assertComplete(ctx, t) - assertComplete(ctx, t) + wasCancelled := assertCancelOrComplete(ctx, t) tracing := collectTracing(t) - // two complete request traces expected - require.ElementsMatch(t, []string{ - "response(0)->executeTask(0)", - "response(1)->executeTask(0)", - "request(0)->newRequest(0)", - "request(0)->executeTask(0)", - "request(0)->terminateRequest(0)", - "request(1)->newRequest(0)", - "request(1)->executeTask(0)", - "request(1)->terminateRequest(0)", - }, tracing.TracesToStrings()) + traceStrings := tracing.TracesToStrings() + require.Contains(t, traceStrings, "response(0)->executeTask(0)") + // may or may not contain a second response trace: "response(1)->executeTask(0)"" + if wasCancelled { + require.Contains(t, traceStrings, "response(0)->abortRequest(0)") + } + require.Contains(t, traceStrings, "request(0)->newRequest(0)") + require.Contains(t, traceStrings, "request(0)->executeTask(0)") + require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") + require.Contains(t, traceStrings, "request(1)->newRequest(0)") + require.Contains(t, traceStrings, "request(1)->executeTask(0)") + require.Contains(t, traceStrings, "request(1)->terminateRequest(0)") // TODO(rvagg): this is randomly either a SkipMe or a ipldutil.ContextCancelError; confirm this is sane // tracing.SingleExceptionEvent(t, "request(0)->newRequest(0)","request(0)->executeTask(0)", "SkipMe", traversal.SkipMe{}.Error(), true) } @@ -1192,20 +1204,18 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { drain(requestor) drain(responder) assertComplete(ctx, t) - assertComplete(ctx, t) tracing := collectTracing(t) // two complete request traces expected - require.ElementsMatch(t, []string{ - "response(0)->executeTask(0)", - "response(1)->executeTask(0)", - "request(0)->newRequest(0)", - "request(0)->executeTask(0)", - "request(0)->terminateRequest(0)", - "request(1)->newRequest(0)", - "request(1)->executeTask(0)", - "request(1)->terminateRequest(0)", - }, tracing.TracesToStrings()) + traceStrings := tracing.TracesToStrings() + require.Contains(t, traceStrings, "response(0)->executeTask(0)") + // may or may not contain a second response "response(1)->executeTask(0)" + require.Contains(t, traceStrings, "request(0)->newRequest(0)") + require.Contains(t, traceStrings, "request(0)->executeTask(0)") + require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") + require.Contains(t, traceStrings, "request(1)->newRequest(0)") + require.Contains(t, traceStrings, "request(1)->executeTask(0)") + require.Contains(t, traceStrings, "request(1)->terminateRequest(0)") } // TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work @@ -1330,6 +1340,7 @@ func TestUnixFSFetch(t *testing.T) { td := newGsTestData(ctx, t) requestor := New(ctx, td.gsnet1, persistence1) responder := New(ctx, td.gsnet2, persistence2) + assertComplete := assertCompletionFunction(responder, 1) extensionName := graphsync.ExtensionName("Free for all") responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -1381,6 +1392,7 @@ func TestUnixFSFetch(t *testing.T) { drain(requestor) drain(responder) + assertComplete(ctx, t) tracing := collectTracing(t) require.ElementsMatch(t, []string{ @@ -1409,6 +1421,7 @@ func TestGraphsyncBlockListeners(t *testing.T) { // initialize graphsync on second node to response to requests responder := td.GraphSyncHost2() + assertComplete := assertCompletionFunction(responder, 1) // register hooks to count blocks in various stages blocksSent := 0 @@ -1474,6 +1487,7 @@ func TestGraphsyncBlockListeners(t *testing.T) { drain(requestor) drain(responder) + assertComplete(ctx, t) tracing := collectTracing(t) require.ElementsMatch(t, []string{ @@ -1505,7 +1519,6 @@ type gsTestData struct { func drain(gs graphsync.GraphExchange) { gs.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() gs.(*GraphSync).responseQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() - gs.(*GraphSync).responseManager.Synchronize() } func assertCompletionFunction(gs graphsync.GraphExchange, completedRequestCount int) func(context.Context, *testing.T) { diff --git a/responsemanager/client.go b/responsemanager/client.go index 12cc072e..c5c2f863 100644 --- a/responsemanager/client.go +++ b/responsemanager/client.go @@ -200,7 +200,7 @@ func (rm *ResponseManager) CancelResponse(p peer.ID, requestID graphsync.Request } // Synchronize is a utility method that blocks until all current messages are processed -func (rm *ResponseManager) Synchronize() { +func (rm *ResponseManager) synchronize() { sync := make(chan error) rm.send(&synchronizeMessage{sync}, nil) select { diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 21561e2d..47f843f8 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -100,7 +100,7 @@ func TestCancellationQueryInProgress(t *testing.T) { gsmsg.CancelRequest(td.requestID), } responseManager.ProcessRequests(td.ctx, td.p, cancelRequests) - responseManager.Synchronize() + responseManager.synchronize() close(waitForCancel) testutil.AssertDoesReceive(td.ctx, t, cancelledListenerCalled, "should call cancelled listener") @@ -149,7 +149,7 @@ func TestEarlyCancellation(t *testing.T) { td.requestHooks.Register(selectorvalidator.SelectorValidator(100)) responseManager.Startup() responseManager.ProcessRequests(td.ctx, td.p, td.requests) - responseManager.Synchronize() + responseManager.synchronize() td.connManager.AssertProtectedWithTags(t, td.p, td.requests[0].ID().Tag()) // send a cancellation @@ -158,7 +158,7 @@ func TestEarlyCancellation(t *testing.T) { } responseManager.ProcessRequests(td.ctx, td.p, cancelRequests) - responseManager.Synchronize() + responseManager.synchronize() td.assertNoResponses() td.connManager.RefuteProtected(t, td.p) @@ -634,7 +634,7 @@ func TestValidationAndExtensions(t *testing.T) { responseManager.ProcessRequests(td.ctx, td.p, td.requests) testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks") responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests) - responseManager.Synchronize() + responseManager.synchronize() close(wait) td.assertCompleteRequestWith(graphsync.RequestCompletedFull) td.assertReceiveExtensionResponse() @@ -704,7 +704,7 @@ func TestValidationAndExtensions(t *testing.T) { responseManager.ProcessRequests(td.ctx, td.p, td.requests) testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks") responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests) - responseManager.Synchronize() + responseManager.synchronize() close(wait) td.assertCompleteRequestWith(graphsync.RequestFailedUnknown) })