diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go
index f4fa0858f0ac..67aefdf80cf4 100644
--- a/bigquery/storage/managedwriter/client.go
+++ b/bigquery/storage/managedwriter/client.go
@@ -45,6 +45,11 @@ type Client struct {
 	rawClient *storage.BigQueryWriteClient
 	projectID string
 
+	// retained context. primarily used for connection management and the underlying
+	// client.
+	ctx    context.Context
+	cancel context.CancelFunc
+
 	// cfg retains general settings (custom ClientOptions).
 	cfg *writerClientConfig
 
@@ -66,8 +71,11 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
 	}
 	o = append(o, opts...)
 
-	rawClient, err := storage.NewBigQueryWriteClient(ctx, o...)
+	cCtx, cancel := context.WithCancel(ctx)
+
+	rawClient, err := storage.NewBigQueryWriteClient(cCtx, o...)
 	if err != nil {
+		cancel()
 		return nil, err
 	}
 	rawClient.SetGoogleClientInfo("gccl", internal.Version)
@@ -75,12 +83,15 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
 	// Handle project autodetection.
 	projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
 	if err != nil {
+		cancel()
 		return nil, err
 	}
 
 	return &Client{
 		rawClient: rawClient,
 		projectID: projectID,
+		ctx:       cCtx,
+		cancel:    cancel,
 		cfg:       newWriterClientConfig(opts...),
 		pools:     make(map[string]*connectionPool),
 	}, nil
@@ -103,6 +114,10 @@ func (c *Client) Close() error {
 	if err := c.rawClient.Close(); err != nil && firstErr == nil {
 		firstErr = err
 	}
+	// Cancel the retained client context.
+	if c.cancel != nil {
+		c.cancel()
+	}
 	return firstErr
 }
 
@@ -114,8 +129,11 @@ func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*M
 }
 
 // createOpenF builds the opener function we need to access the AppendRows bidi stream.
-func createOpenF(ctx context.Context, streamFunc streamClientFunc) func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
-	return func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
+func createOpenF(streamFunc streamClientFunc, routingHeader string) func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
+	return func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
+		if routingHeader != "" {
+			ctx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", routingHeader)
+		}
 		arc, err := streamFunc(ctx, opts...)
 		if err != nil {
 			return nil, err
@@ -167,11 +185,11 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
 	if err != nil {
 		return nil, err
 	}
-	// Add the writer to the pool, and derive context from the pool.
+	// Add the writer to the pool.
 	if err := pool.addWriter(writer); err != nil {
 		return nil, err
 	}
-	writer.ctx, writer.cancel = context.WithCancel(pool.ctx)
+	writer.ctx, writer.cancel = context.WithCancel(ctx)
 
 	// Attach any tag keys to the context on the writer, so instrumentation works as expected.
 	writer.ctx = setupWriterStatContext(writer)
@@ -218,7 +236,7 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre
 	}
 
 	// No existing pool available, create one for the location and add to shared pools.
-	pool, err := c.createPool(ctx, loc, streamFunc)
+	pool, err := c.createPool(loc, streamFunc)
 	if err != nil {
 		return nil, err
 	}
@@ -227,24 +245,28 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre
 }
 
 // createPool builds a connectionPool.
-func (c *Client) createPool(ctx context.Context, location string, streamFunc streamClientFunc) (*connectionPool, error) {
-	cCtx, cancel := context.WithCancel(ctx)
+func (c *Client) createPool(location string, streamFunc streamClientFunc) (*connectionPool, error) {
+	cCtx, cancel := context.WithCancel(c.ctx)
 	if c.cfg == nil {
 		cancel()
 		return nil, fmt.Errorf("missing client config")
 	}
-	if location != "" {
-		// add location header to the retained pool context.
-		cCtx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_location=%s", location))
-	}
+
+	var routingHeader string
+	/*
+	 * TODO: set once backend respects the new routing header
+	 * if location != "" && c.projectID != "" {
+	 *	routingHeader = fmt.Sprintf("write_location=projects/%s/locations/%s", c.projectID, location)
+	 * }
+	 */
 
 	pool := &connectionPool{
 		id:                 newUUID(poolIDPrefix),
 		location:           location,
 		ctx:                cCtx,
 		cancel:             cancel,
-		open:               createOpenF(ctx, streamFunc),
+		open:               createOpenF(streamFunc, routingHeader),
 		callOptions:        c.cfg.defaultAppendRowsCallOptions,
 		baseFlowController: newFlowController(c.cfg.defaultInflightRequests, c.cfg.defaultInflightBytes),
 	}
diff --git a/bigquery/storage/managedwriter/client_test.go b/bigquery/storage/managedwriter/client_test.go
index 242a8b70a680..2163ac4919f7 100644
--- a/bigquery/storage/managedwriter/client_test.go
+++ b/bigquery/storage/managedwriter/client_test.go
@@ -55,10 +55,13 @@ func TestTableParentFromStreamName(t *testing.T) {
 }
 
 func TestCreatePool_Location(t *testing.T) {
+	t.Skip("skipping until new write_location is allowed")
 	c := &Client{
-		cfg: &writerClientConfig{},
+		cfg:       &writerClientConfig{},
+		ctx:       context.Background(),
+		projectID: "myproj",
 	}
-	pool, err := c.createPool(context.Background(), "foo", nil)
+	pool, err := c.createPool("foo", nil)
 	if err != nil {
 		t.Fatalf("createPool: %v", err)
 	}
@@ -72,7 +75,7 @@ func TestCreatePool_Location(t *testing.T) {
 	}
 	found := false
 	for _, v := range vals {
-		if v == "write_location=foo" {
+		if v == "write_location=projects/myproj/locations/foo" {
 			found = true
 			break
 		}
@@ -151,8 +154,9 @@ func TestCreatePool(t *testing.T) {
 	for _, tc := range testCases {
 		c := &Client{
 			cfg: tc.cfg,
+			ctx: context.Background(),
 		}
-		pool, err := c.createPool(context.Background(), "", nil)
+		pool, err := c.createPool("", nil)
 		if err != nil {
 			t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)
 			continue
diff --git a/bigquery/storage/managedwriter/connection.go b/bigquery/storage/managedwriter/connection.go
index 3ea97a5825df..316371747dee 100644
--- a/bigquery/storage/managedwriter/connection.go
+++ b/bigquery/storage/managedwriter/connection.go
@@ -54,7 +54,7 @@ type connectionPool struct {
 
 	// We centralize the open function on the pool, rather than having an instance of the open func on every
 	// connection. Opening the connection is a stateless operation.
-	open func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)
+	open func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)
 
 	// We specify default calloptions for the pool.
 	// Explicit connections may have their own calloptions as well.
@@ -137,7 +137,7 @@ func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite
 	r := &unaryRetryer{}
 	for {
 		recordStat(cp.ctx, AppendClientOpenCount, 1)
-		arc, err := cp.open(cp.mergeCallOptions(co)...)
+		arc, err := cp.open(co.ctx, cp.mergeCallOptions(co)...)
 		if err != nil {
 			bo, shouldRetry := r.Retry(err)
 			if shouldRetry {
@@ -151,6 +151,7 @@ func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite
 				return nil, nil, err
 			}
 		}
+
 		// The channel relationship with its ARC is 1:1. If we get a new ARC, create a new pending
 		// write channel and fire up the associated receive processor. The channel ensures that
 		// responses for a connection are processed in the same order that appends were sent.
@@ -159,7 +160,7 @@ func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite
 			depth = d
 		}
 		ch := make(chan *pendingWrite, depth)
-		go connRecvProcessor(co, arc, ch)
+		go connRecvProcessor(co.ctx, co, arc, ch)
 		return arc, ch, nil
 	}
 }
@@ -441,13 +442,17 @@ func (co *connection) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, f
 	if arc != co.arc && !forceReconnect {
 		return co.arc, co.pending, nil
 	}
-	// We need to (re)open a connection. Cleanup previous connection and channel if they are present.
+	// We need to (re)open a connection. Cleanup previous connection, channel, and context if they are present.
 	if co.arc != nil && (*co.arc) != (storagepb.BigQueryWrite_AppendRowsClient)(nil) {
 		(*co.arc).CloseSend()
 	}
 	if co.pending != nil {
 		close(co.pending)
 	}
+	if co.cancel != nil {
+		co.cancel()
+		co.ctx, co.cancel = context.WithCancel(co.pool.ctx)
+	}
 
 	co.arc = new(storagepb.BigQueryWrite_AppendRowsClient)
 	// We're going to (re)open the connection, so clear any optimizer state.
@@ -464,10 +469,10 @@ type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQue
 
 // connRecvProcessor is used to propagate append responses back up with the originating write requests.
 // It runs as a goroutine. A connection object allows for reconnection, and each reconnection establishes a new
 // processing goroutine and backing channel.
-func connRecvProcessor(co *connection, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
+func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
 	for {
 		select {
-		case <-co.ctx.Done():
+		case <-ctx.Done():
 			// Context is done, so we're not going to get further updates. Mark all work left in the channel
 			// with the context error. We don't attempt to re-enqueue in this case.
 			for {
@@ -478,7 +483,7 @@ func connRecvProcessor(co *connection, arc storagepb.BigQueryWrite_AppendRowsCli
 				// It's unlikely this connection will recover here, but for correctness keep the flow controller
 				// state correct by releasing.
 				co.release(pw)
-				pw.markDone(nil, co.ctx.Err())
+				pw.markDone(nil, ctx.Err())
 			}
 		case nextWrite, ok := <-ch:
 			if !ok {
@@ -493,12 +498,12 @@ func connRecvProcessor(co *connection, arc storagepb.BigQueryWrite_AppendRowsCli
 				continue
 			}
 			// Record that we did in fact get a response from the backend.
-			recordStat(co.ctx, AppendResponses, 1)
+			recordStat(ctx, AppendResponses, 1)
 
 			if status := resp.GetError(); status != nil {
 				// The response from the backend embedded a status error. We record that the error
 				// occurred, and tag it based on the response code of the status.
-				if tagCtx, tagErr := tag.New(co.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil {
+				if tagCtx, tagErr := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil {
 					recordStat(tagCtx, AppendResponseErrors, 1)
 				}
 				respErr := grpcstatus.ErrorProto(status)
diff --git a/bigquery/storage/managedwriter/connection_test.go b/bigquery/storage/managedwriter/connection_test.go
index fcc6cbc472f1..8010afc70b33 100644
--- a/bigquery/storage/managedwriter/connection_test.go
+++ b/bigquery/storage/managedwriter/connection_test.go
@@ -61,7 +61,7 @@ func TestConnection_OpenWithRetry(t *testing.T) {
 	for _, tc := range testCases {
 		pool := &connectionPool{
 			ctx: context.Background(),
-			open: func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
+			open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
 				if len(tc.errors) == 0 {
 					panic("out of errors")
 				}
@@ -162,12 +162,12 @@ func TestConnectionPool_OpenCallOptionPropagation(t *testing.T) {
 	pool := &connectionPool{
 		ctx:    ctx,
 		cancel: cancel,
-		open: createOpenF(ctx, func(ctx context.Context, opts ...gax.CallOption) (storage.BigQueryWrite_AppendRowsClient, error) {
+		open: createOpenF(func(ctx context.Context, opts ...gax.CallOption) (storage.BigQueryWrite_AppendRowsClient, error) {
 			if len(opts) == 0 {
 				t.Fatalf("no options were propagated")
 			}
 			return nil, fmt.Errorf("no real client")
-		}),
+		}, ""),
 		callOptions: []gax.CallOption{
 			gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(99)),
 		},
diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go
index 7833f300ff7e..287a94ce188f 100644
--- a/bigquery/storage/managedwriter/integration_test.go
+++ b/bigquery/storage/managedwriter/integration_test.go
@@ -1393,22 +1393,21 @@ func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client,
 }
 
 func TestIntegration_MultiplexWrites(t *testing.T) {
-	mwClient, bqClient := getTestClients(context.Background(), t,
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	mwClient, bqClient := getTestClients(ctx, t,
 		WithMultiplexing(),
 		WithMultiplexPoolLimit(2),
 	)
 	defer mwClient.Close()
 	defer bqClient.Close()
 
-	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1")
+	dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, "us-east1")
 	if err != nil {
 		t.Fatalf("failed to init test dataset: %v", err)
 	}
 	defer cleanup()
 
-	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-
 	wantWrites := 10
 
 	testTables := []struct {
@@ -1538,3 +1537,103 @@ func TestIntegration_MultiplexWrites(t *testing.T) {
 		}
 	}
 }
+
+func TestIntegration_MingledContexts(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	mwClient, bqClient := getTestClients(ctx, t,
+		WithMultiplexing(),
+		WithMultiplexPoolLimit(2),
+	)
+	defer mwClient.Close()
+	defer bqClient.Close()
+
+	wantLocation := "us-east4"
+
+	dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, wantLocation)
+	if err != nil {
+		t.Fatalf("failed to init test dataset: %v", err)
+	}
+	defer cleanup()
+
+	testTable := dataset.Table(tableIDs.New())
+	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
+		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
+	}
+
+	m := &testdata.SimpleMessageProto2{}
+	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
+
+	numWriters := 4
+	contexts := make([]context.Context, numWriters)
+	cancels := make([]context.CancelFunc, numWriters)
+	writers := make([]*ManagedStream, numWriters)
+	for i := 0; i < numWriters; i++ {
+		contexts[i], cancels[i] = context.WithCancel(ctx)
+		ms, err := mwClient.NewManagedStream(contexts[i],
+			WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
+			WithType(DefaultStream),
+			WithSchemaDescriptor(descriptorProto),
+		)
+		if err != nil {
+			t.Fatalf("instantiating writer %d failed: %v", i, err)
+		}
+		writers[i] = ms
+	}
+
+	sampleRow, err := proto.Marshal(&testdata.SimpleMessageProto2{
+		Name:  proto.String("datafield"),
+		Value: proto.Int64(1234),
+	})
+	if err != nil {
+		t.Fatalf("failed to generate sample row")
+	}
+
+	for i := 0; i < numWriters; i++ {
+		res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
+		if err != nil {
+			t.Errorf("initial write on %d failed: %v", i, err)
+		} else {
+			if _, err := res.GetResult(contexts[i]); err != nil {
+				t.Errorf("GetResult initial write %d: %v", i, err)
+			}
+		}
+	}
+
+	// Cancel the first context.
+	cancels[0]()
+	// Repeat writes on the other writers, mingling in the second context for result retrieval.
+	for i := 1; i < numWriters; i++ {
+		res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
+		if err != nil {
+			t.Errorf("second write on %d failed: %v", i, err)
+		} else {
+			if _, err := res.GetResult(contexts[1]); err != nil {
+				t.Errorf("GetResult err on second write %d: %v", i, err)
+			}
+		}
+	}
+
+	// Check that writes to the first writer fail, even with a valid request context.
+	if _, err := writers[0].AppendRows(contexts[1], [][]byte{sampleRow}); err == nil {
+		t.Errorf("write succeeded on first writer when it should have failed")
+	}
+
+	// Cancel the second context as well, to ensure a writer created with a good context fails when given a bad request context.
+	cancels[1]()
+	if _, err := writers[2].AppendRows(contexts[1], [][]byte{sampleRow}); err == nil {
+		t.Errorf("write succeeded on third writer with a bad request context")
+	}
+
+	// Repeat writes on the remaining good writers/contexts.
+	for i := 2; i < numWriters; i++ {
+		res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
+		if err != nil {
+			t.Errorf("second write on %d failed: %v", i, err)
+		} else {
+			if _, err := res.GetResult(contexts[i]); err != nil {
+				t.Errorf("GetResult err on second write %d: %v", i, err)
+			}
+		}
+	}
+}
diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go
index 6c1a69372a5c..50bae032ab7a 100644
--- a/bigquery/storage/managedwriter/managed_stream.go
+++ b/bigquery/storage/managedwriter/managed_stream.go
@@ -84,7 +84,7 @@ type ManagedStream struct {
 
 	// writer state
 	mu     sync.Mutex
-	ctx    context.Context // used solely for stats/instrumentation.
+	ctx    context.Context // used for stats/instrumentation, and to check the writer is live.
 	cancel context.CancelFunc
 	err    error // retains any terminal error (writer was closed)
 }
@@ -196,6 +196,12 @@ func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (
 // attached to the pendingWrite.
 func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error {
 	for {
+		ms.mu.Lock()
+		if ms.err != nil {
+			ms.mu.Unlock()
+			return ms.err
+		}
+		ms.mu.Unlock()
 		conn, err := ms.pool.selectConn(pw)
 		if err != nil {
 			pw.markDone(nil, err)
@@ -284,6 +290,13 @@ func (ms *ManagedStream) buildRequest(data [][]byte) *storagepb.AppendRowsReques
 // The size of a single request must be less than 10 MB in size.
 // Requests larger than this return an error, typically `INVALID_ARGUMENT`.
 func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error) {
+	// Before we do anything, ensure the writer isn't closed.
+	ms.mu.Lock()
+	if ms.err != nil {
+		ms.mu.Unlock()
+		return nil, ms.err
+	}
+	ms.mu.Unlock()
 	// Ensure we build the request and pending write with a consistent schema version.
 	curSchemaVersion := ms.curDescVersion
 	req := ms.buildRequest(data)
@@ -301,6 +314,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
 		select {
 		case errCh <- ms.appendWithRetry(pw):
 		case <-ctx.Done():
+		case <-ms.ctx.Done():
 		}
 		close(errCh)
 	}()
@@ -313,6 +327,17 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
 		// This API expresses request idempotency through offset management, so users who care to use offsets
 		// can deal with the dropped request.
 		return nil, ctx.Err()
+	case <-ms.ctx.Done():
+		// Same as the request context being done, this indicates the writer context expired. For this case,
+		// we also attempt to close the writer.
+		ms.mu.Lock()
+		if ms.err == nil {
+			ms.err = ms.ctx.Err()
+		}
+		ms.mu.Unlock()
+		ms.Close()
+		// Don't relock to fetch the writer terminal error, as we've already ensured that the writer is closed.
+		return nil, ms.err
 	case appendErr = <-errCh:
 		if appendErr != nil {
 			return nil, appendErr
diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go
index 6b44e298634c..d05446b1bac9 100644
--- a/bigquery/storage/managedwriter/managed_stream_test.go
+++ b/bigquery/storage/managedwriter/managed_stream_test.go
@@ -57,7 +57,7 @@ func (tarc *testAppendRowsClient) CloseSend() error {
 }
 
 // openTestArc handles wiring in a test AppendRowsClient into a managedstream by providing the open function.
-func openTestArc(testARC *testAppendRowsClient, sendF func(req *storagepb.AppendRowsRequest) error, recvF func() (*storagepb.AppendRowsResponse, error)) func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
+func openTestArc(testARC *testAppendRowsClient, sendF func(req *storagepb.AppendRowsRequest) error, recvF func() (*storagepb.AppendRowsResponse, error)) func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
 	sF := func(req *storagepb.AppendRowsRequest) error {
 		testARC.requests = append(testARC.requests, req)
 		return nil
@@ -78,8 +78,12 @@ func openTestArc(testARC *testAppendRowsClient, sendF func(req *storagepb.Append
 	testARC.closeF = func() error {
 		return nil
 	}
-	return func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
+	return func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
 		testARC.openCount = testARC.openCount + 1
+		// Simulate the grpc finalizer goroutine.
+		go func() {
+			<-ctx.Done()
+		}()
 		return testARC, nil
 	}
 }
@@ -373,14 +377,14 @@ func TestManagedStream_AppendDeadlocks(t *testing.T) {
 		openF := openTestArc(&testAppendRowsClient{}, nil, nil)
 		pool := &connectionPool{
 			ctx: ctx,
-			open: func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
+			open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
 				if len(tc.openErrors) == 0 {
 					panic("out of open errors")
 				}
 				curErr := tc.openErrors[0]
 				tc.openErrors = tc.openErrors[1:]
 				if curErr == nil {
-					return openF(opts...)
+					return openF(ctx, opts...)
 				}
 				return nil, curErr
 			},
@@ -467,6 +471,70 @@ func TestManagedStream_LeakingGoroutines(t *testing.T) {
 	}
 }
 
+func TestManagedStream_LeakingGoroutinesReconnect(t *testing.T) {
+	ctx := context.Background()
+
+	reqCount := 0
+	testArc := &testAppendRowsClient{}
+	pool := &connectionPool{
+		ctx: ctx,
+		open: openTestArc(testArc,
+			func(req *storagepb.AppendRowsRequest) error {
+				reqCount++
+				if reqCount%2 == 1 {
+					return io.EOF
+				}
+				return nil
+			}, nil),
+		baseFlowController: newFlowController(1000, 0),
+	}
+	if err := pool.activateRouter(newSimpleRouter("")); err != nil {
+		t.Errorf("activateRouter: %v", err)
+	}
+	ms := &ManagedStream{
+		id:             "foo",
+		ctx:            ctx,
+		streamSettings: defaultStreamSettings(),
+		retry:          newStatelessRetryer(),
+	}
+	ms.retry.maxAttempts = 4
+	ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
+	if err := pool.addWriter(ms); err != nil {
+		t.Errorf("addWriter: %v", err)
+	}
+
+	fakeData := [][]byte{
+		[]byte("foo"),
+	}
+
+	threshold := runtime.NumGoroutine() + 5
+
+	// Send a bunch of appends that will trigger reconnects, and monitor that
+	// goroutine growth stays within a bounded threshold.
+	for i := 0; i < 100; i++ {
+		writeCtx := context.Background()
+		r, err := ms.AppendRows(writeCtx, fakeData)
+		if err != nil {
+			t.Fatalf("failed to append row: %v", err)
+		}
+		_, err = r.GetResult(context.Background())
+		if err != nil {
+			t.Fatalf("failed to get result: %v", err)
+		}
+		if r.totalAttempts != 2 {
+			t.Fatalf("should trigger a retry, but found: %d attempts", r.totalAttempts)
+		}
+		if testArc.openCount != i+2 {
+			t.Errorf("should trigger a reconnect, but found openCount %d", testArc.openCount)
+		}
+		if i%10 == 0 {
+			if current := runtime.NumGoroutine(); current > threshold {
+				t.Errorf("potential goroutine leak, append %d: current %d, threshold %d", i, current, threshold)
+			}
+		}
+	}
+}
+
 func TestManagedWriter_CancellationDuringRetry(t *testing.T) {
 	// Issue: double close of pending write.
 	// https://github.com/googleapis/google-cloud-go/issues/7380
diff --git a/bigquery/storage/managedwriter/routers_test.go b/bigquery/storage/managedwriter/routers_test.go
index baf209a09aa6..f403e3671519 100644
--- a/bigquery/storage/managedwriter/routers_test.go
+++ b/bigquery/storage/managedwriter/routers_test.go
@@ -30,7 +30,7 @@ func TestSimpleRouter(t *testing.T) {
 
 	pool := &connectionPool{
 		ctx: ctx,
-		open: func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
+		open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
 			return &testAppendRowsClient{}, nil
 		},
 	}
@@ -74,7 +74,7 @@ func TestSharedRouter_Basic(t *testing.T) {
 	pool := &connectionPool{
 		ctx:    ctx,
 		cancel: cancel,
-		open: func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
+		open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
 			return &testAppendRowsClient{}, nil
 		},
 	}
@@ -123,7 +123,7 @@ func TestSharedRouter_Multiplex(t *testing.T) {
 		id:     newUUID(poolIDPrefix),
 		ctx:    ctx,
 		cancel: cancel,
-		open: func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
+		open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
 			return &testAppendRowsClient{}, nil
 		},
 		baseFlowController: newFlowController(2, 10),
@@ -282,7 +282,7 @@ func BenchmarkRoutingParallel(b *testing.B) {
 		pool := &connectionPool{
 			ctx:    ctx,
 			cancel: cancel,
-			open: func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
+			open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
 				return &testAppendRowsClient{}, nil
 			},
 		}
@@ -429,7 +429,7 @@ func BenchmarkWatchdogPulse(b *testing.B) {
 	pool := &connectionPool{
 		ctx:    ctx,
 		cancel: cancel,
-		open: func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
+		open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
 			return &testAppendRowsClient{}, nil
 		},
 		baseFlowController: newFlowController(maxFlowInserts, maxFlowBytes),
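
The net effect of this change is a stricter context hierarchy: the client retains a cancellable context that parents every connection pool (and thus every connection), while each writer now derives its context from the caller's context at creation time rather than from the shared pool. A minimal usage sketch of the resulting caller-visible behavior follows (not part of the patch; the project ID and table path are placeholders, and a schema descriptor would also be needed for a real append to succeed):

package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery/storage/managedwriter"
)

func main() {
	ctx := context.Background()

	// NewClient now retains a cancellable context internally; Close() cancels
	// it, which tears down the connection pools derived from it.
	client, err := managedwriter.NewClient(ctx, "my-project") // placeholder project ID
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Writers derive their context from the context passed here, not from the
	// shared pool, so cancelling writerCtx invalidates only this writer.
	writerCtx, writerCancel := context.WithCancel(ctx)
	ms, err := client.NewManagedStream(writerCtx,
		managedwriter.WithType(managedwriter.DefaultStream),
		// Placeholder destination; WithSchemaDescriptor would also be supplied.
		managedwriter.WithDestinationTable("projects/my-project/datasets/d/tables/t"),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Cancelling the writer's originating context closes the writer...
	writerCancel()

	// ...so subsequent appends fail even though the request context is live.
	if _, err := ms.AppendRows(ctx, [][]byte{[]byte("row")}); err == nil {
		log.Fatal("expected append to fail after writer context cancellation")
	}
}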