From 8c0ff9818cde8bacd6cd323fec0266da00aa07f7 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 28 Dec 2022 21:56:09 +0000 Subject: [PATCH] refactor: augment private getWriteStream with view support This PR augments the base client with fuller support for view-based resolution of GetWriteStream metadata. This PR also adds an integration test that compares behaviors between different stream types (default vs explicitly created). Towards: https://github.com/googleapis/google-cloud-go/issues/7103 --- bigquery/storage/managedwriter/client.go | 7 +- .../storage/managedwriter/integration_test.go | 92 ++++++++++++++++++- 2 files changed, 96 insertions(+), 3 deletions(-) diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index e26c4e93688e..50160aa7ee12 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -169,7 +169,7 @@ func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error { } if ms.streamSettings.streamID != "" { // User supplied a stream, we need to verify it exists. - info, err := c.getWriteStream(ctx, ms.streamSettings.streamID) + info, err := c.getWriteStream(ctx, ms.streamSettings.streamID, false) if err != nil { return fmt.Errorf("a streamname was specified, but lookup of stream failed: %v", err) } @@ -210,10 +210,13 @@ func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWri // getWriteStream returns information about a given write stream. // // It's primarily used for setup validation, and not exposed directly to end users. -func (c *Client) getWriteStream(ctx context.Context, streamName string) (*storagepb.WriteStream, error) { +func (c *Client) getWriteStream(ctx context.Context, streamName string, fullView bool) (*storagepb.WriteStream, error) { req := &storagepb.GetWriteStreamRequest{ Name: streamName, } + if fullView { + req.View = storagepb.WriteStreamView_FULL + } return c.rawClient.GetWriteStream(ctx, req) } diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 871f35120e9b..97bd74f4cd80 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -28,6 +28,7 @@ import ( "cloud.google.com/go/bigquery/storage/managedwriter/testdata" "cloud.google.com/go/internal/testutil" "cloud.google.com/go/internal/uid" + "github.com/google/go-cmp/cmp" "github.com/googleapis/gax-go/v2/apierror" "go.opencensus.io/stats/view" "google.golang.org/api/option" @@ -36,6 +37,7 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protodesc" "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/descriptorpb" "google.golang.org/protobuf/types/dynamicpb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -112,6 +114,94 @@ func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor) } +func TestIntegration_ClientGetWriteStream(t *testing.T) { + ctx := context.Background() + mwClient, bqClient := getTestClients(ctx, t) + defer mwClient.Close() + defer bqClient.Close() + + wantLocation := "us-east1" + dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, wantLocation) + if err != nil { + t.Fatalf("failed to init test dataset: %v", err) + } + defer cleanup() + + testTable := dataset.Table(tableIDs.New()) + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { + t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) + } + + apiSchema, _ := adapt.BQSchemaToStorageTableSchema(testdata.SimpleMessageSchema) + parent := TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID) + explicitStream, err := mwClient.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{ + Parent: parent, + WriteStream: &storagepb.WriteStream{ + Type: storagepb.WriteStream_PENDING, + }, + }) + if err != nil { + t.Fatalf("CreateWriteStream: %v", err) + } + + testCases := []struct { + description string + isDefault bool + streamID string + wantType storagepb.WriteStream_Type + }{ + { + description: "default", + isDefault: true, + streamID: fmt.Sprintf("%s/streams/_default", parent), + wantType: storagepb.WriteStream_COMMITTED, + }, + { + description: "explicit pending", + streamID: explicitStream.Name, + wantType: storagepb.WriteStream_PENDING, + }, + } + + for _, tc := range testCases { + for _, fullView := range []bool{false, true} { + info, err := mwClient.getWriteStream(ctx, tc.streamID, fullView) + if err != nil { + t.Errorf("%s (%T): getWriteStream failed: %v", tc.description, fullView, err) + } + if info.GetType() != tc.wantType { + t.Errorf("%s (%T): got type %d, want type %d", tc.description, fullView, info.GetType(), tc.wantType) + } + if info.GetLocation() != wantLocation { + t.Errorf("%s (%T) view: got location %s, want location %s", tc.description, fullView, info.GetLocation(), wantLocation) + } + if info.GetCommitTime() != nil { + t.Errorf("%s (%T)expected empty commit time, got %v", tc.description, fullView, info.GetCommitTime()) + } + + if !tc.isDefault { + if info.GetCreateTime() == nil { + t.Errorf("%s (%T): expected create time, was empty", tc.description, fullView) + } + } else { + if info.GetCreateTime() != nil { + t.Errorf("%s (%T): expected empty time, got %v", tc.description, fullView, info.GetCreateTime()) + } + } + + if !fullView { + if info.GetTableSchema() != nil { + t.Errorf("%s (%T) basic view: expected no schema, was populated", tc.description, fullView) + } + } else { + if diff := cmp.Diff(info.GetTableSchema(), apiSchema, protocmp.Transform()); diff != "" { + t.Errorf("%s (%T) schema mismatch: -got, +want:\n%s", tc.description, fullView, diff) + } + } + } + } +} + func TestIntegration_ManagedWriter(t *testing.T) { mwClient, bqClient := getTestClients(context.Background(), t) defer mwClient.Close() @@ -326,7 +416,7 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC t.Fatalf("NewManagedStream: %v", err) } - info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID) + info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID, false) if err != nil { t.Errorf("couldn't get stream info: %v", err) }