Skip to content

Commit

Permalink
fix: improve test synchronization/drainage
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Dec 10, 2021
1 parent 8f8bc47 commit 218092a
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 32 deletions.
65 changes: 39 additions & 26 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
drain(requestor)
drain(responder)
wasCancelled := assertCancelOrComplete(ctx, t)

tracing := collectTracing(t)
traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
Expand Down Expand Up @@ -277,6 +278,7 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2(MaxLinksPerIncomingRequests(linksToTraverse))
assertComplete := assertCompletionFunction(responder, 1)

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

Expand All @@ -287,6 +289,7 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {

drain(requestor)
drain(responder)
assertComplete(ctx, t)

tracing := collectTracing(t)
require.ElementsMatch(t, []string{
Expand Down Expand Up @@ -318,6 +321,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
assertComplete := assertCompletionFunction(responder, 1)

var receivedResponseData []byte
var receivedRequestData []byte
Expand Down Expand Up @@ -364,6 +368,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {

drain(requestor)
drain(responder)
assertComplete(ctx, t)

tracing := collectTracing(t)
require.ElementsMatch(t, []string{
Expand Down Expand Up @@ -400,6 +405,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
assertComplete := assertCompletionFunction(responder, 1)

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
Expand Down Expand Up @@ -431,6 +437,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {

drain(requestor)
drain(responder)
assertComplete(ctx, t)

tracing := collectTracing(t)
require.ElementsMatch(t, []string{
Expand Down Expand Up @@ -472,6 +479,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
assertComplete := assertCompletionFunction(responder, 1)

totalSent := 0
totalSentOnWire := 0
Expand All @@ -493,6 +501,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {

drain(requestor)
drain(responder)
assertComplete(ctx, t)

tracing := collectTracing(t)
require.ElementsMatch(t, []string{
Expand Down Expand Up @@ -558,6 +567,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
drain(requestor)
drain(responder)
assertComplete(ctx, t)

tracing := collectTracing(t)
require.ElementsMatch(t, []string{
"response(0)->executeTask(0)",
Expand Down Expand Up @@ -641,6 +651,7 @@ func TestPauseResume(t *testing.T) {
drain(requestor)
drain(responder)
assertOneRequestCompletes(ctx, t)

tracing := collectTracing(t)
traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
Expand Down Expand Up @@ -713,6 +724,7 @@ func TestPauseResumeRequest(t *testing.T) {
// should get max 1 cancel
require.False(t, assertCancelOrComplete(ctx, t))
}

tracing := collectTracing(t)
traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
Expand Down Expand Up @@ -1049,7 +1061,7 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
assertComplete := assertCompletionFunction(responder, 2)
assertCancelOrComplete := assertCancelOrCompleteFunction(responder, 1)

// alternate storing location for responder
altStore1 := make(map[ipld.Link][]byte)
Expand Down Expand Up @@ -1104,21 +1116,21 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {

drain(requestor)
drain(responder)
assertComplete(ctx, t)
assertComplete(ctx, t)
wasCancelled := assertCancelOrComplete(ctx, t)

tracing := collectTracing(t)
// two complete request traces expected
require.ElementsMatch(t, []string{
"response(0)->executeTask(0)",
"response(1)->executeTask(0)",
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
"request(1)->newRequest(0)",
"request(1)->executeTask(0)",
"request(1)->terminateRequest(0)",
}, tracing.TracesToStrings())
traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
// may or may not contain a second response trace: "response(1)->executeTask(0)""
if wasCancelled {
require.Contains(t, traceStrings, "response(0)->abortRequest(0)")
}
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "request(1)->newRequest(0)")
require.Contains(t, traceStrings, "request(1)->executeTask(0)")
require.Contains(t, traceStrings, "request(1)->terminateRequest(0)")
// TODO(rvagg): this is randomly either a SkipMe or a ipldutil.ContextCancelError; confirm this is sane
// tracing.SingleExceptionEvent(t, "request(0)->newRequest(0)","request(0)->executeTask(0)", "SkipMe", traversal.SkipMe{}.Error(), true)
}
Expand Down Expand Up @@ -1192,20 +1204,18 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) {
drain(requestor)
drain(responder)
assertComplete(ctx, t)
assertComplete(ctx, t)

tracing := collectTracing(t)
// two complete request traces expected
require.ElementsMatch(t, []string{
"response(0)->executeTask(0)",
"response(1)->executeTask(0)",
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
"request(1)->newRequest(0)",
"request(1)->executeTask(0)",
"request(1)->terminateRequest(0)",
}, tracing.TracesToStrings())
traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
// may or may not contain a second response "response(1)->executeTask(0)"
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "request(1)->newRequest(0)")
require.Contains(t, traceStrings, "request(1)->executeTask(0)")
require.Contains(t, traceStrings, "request(1)->terminateRequest(0)")
}

// TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work
Expand Down Expand Up @@ -1330,6 +1340,7 @@ func TestUnixFSFetch(t *testing.T) {
td := newGsTestData(ctx, t)
requestor := New(ctx, td.gsnet1, persistence1)
responder := New(ctx, td.gsnet2, persistence2)
assertComplete := assertCompletionFunction(responder, 1)
extensionName := graphsync.ExtensionName("Free for all")
responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
Expand Down Expand Up @@ -1381,6 +1392,7 @@ func TestUnixFSFetch(t *testing.T) {

drain(requestor)
drain(responder)
assertComplete(ctx, t)

tracing := collectTracing(t)
require.ElementsMatch(t, []string{
Expand Down Expand Up @@ -1409,6 +1421,7 @@ func TestGraphsyncBlockListeners(t *testing.T) {

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
assertComplete := assertCompletionFunction(responder, 1)

// register hooks to count blocks in various stages
blocksSent := 0
Expand Down Expand Up @@ -1474,6 +1487,7 @@ func TestGraphsyncBlockListeners(t *testing.T) {

drain(requestor)
drain(responder)
assertComplete(ctx, t)

tracing := collectTracing(t)
require.ElementsMatch(t, []string{
Expand Down Expand Up @@ -1505,7 +1519,6 @@ type gsTestData struct {
func drain(gs graphsync.GraphExchange) {
gs.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()
gs.(*GraphSync).responseQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()
gs.(*GraphSync).responseManager.Synchronize()
}

func assertCompletionFunction(gs graphsync.GraphExchange, completedRequestCount int) func(context.Context, *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (rm *ResponseManager) CancelResponse(p peer.ID, requestID graphsync.Request
}

// Synchronize is a utility method that blocks until all current messages are processed
func (rm *ResponseManager) Synchronize() {
func (rm *ResponseManager) synchronize() {
sync := make(chan error)
rm.send(&synchronizeMessage{sync}, nil)
select {
Expand Down
10 changes: 5 additions & 5 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestCancellationQueryInProgress(t *testing.T) {
gsmsg.CancelRequest(td.requestID),
}
responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
responseManager.Synchronize()
responseManager.synchronize()
close(waitForCancel)

testutil.AssertDoesReceive(td.ctx, t, cancelledListenerCalled, "should call cancelled listener")
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestEarlyCancellation(t *testing.T) {
td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
responseManager.Startup()
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
responseManager.Synchronize()
responseManager.synchronize()
td.connManager.AssertProtectedWithTags(t, td.p, td.requests[0].ID().Tag())

// send a cancellation
Expand All @@ -158,7 +158,7 @@ func TestEarlyCancellation(t *testing.T) {
}
responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)

responseManager.Synchronize()
responseManager.synchronize()

td.assertNoResponses()
td.connManager.RefuteProtected(t, td.p)
Expand Down Expand Up @@ -634,7 +634,7 @@ func TestValidationAndExtensions(t *testing.T) {
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
responseManager.Synchronize()
responseManager.synchronize()
close(wait)
td.assertCompleteRequestWith(graphsync.RequestCompletedFull)
td.assertReceiveExtensionResponse()
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestValidationAndExtensions(t *testing.T) {
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
responseManager.Synchronize()
responseManager.synchronize()
close(wait)
td.assertCompleteRequestWith(graphsync.RequestFailedUnknown)
})
Expand Down

0 comments on commit 218092a

Please sign in to comment.