Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(bigquery/storage/managedwriter): internal refactor (flow controller, ids) #7104

Merged
merged 3 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bigquery/storage/managedwriter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"cloud.google.com/go/bigquery/internal"
storage "cloud.google.com/go/bigquery/storage/apiv1"
"cloud.google.com/go/internal/detect"
"github.com/google/uuid"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
Expand All @@ -38,6 +39,8 @@ import (
// does not have the project ID encoded.
const DetectProjectID = "*detect-project-id*"

// managedstreamIDPrefix is the prefix handed to newUUID when assigning an
// id to a ManagedStream.
const managedstreamIDPrefix = "managedstream"

// Client is a managed BigQuery Storage write client scoped to a single project.
type Client struct {
rawClient *storage.BigQueryWriteClient
Expand Down Expand Up @@ -106,6 +109,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
ctx, cancel := context.WithCancel(ctx)

ms := &ManagedStream{
id: newUUID(managedstreamIDPrefix),
streamSettings: defaultStreamSettings(),
c: c,
ctx: ctx,
Expand Down Expand Up @@ -232,3 +236,9 @@ func TableParentFromStreamName(streamName string) string {
func TableParentFromParts(projectID, datasetID, tableID string) string {
return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID)
}

// newUUID returns a freshly generated UUID string, prepended with the
// supplied prefix and an underscore, for labelling internal resources.
func newUUID(prefix string) string {
	return prefix + "_" + uuid.New().String()
}
11 changes: 11 additions & 0 deletions bigquery/storage/managedwriter/flow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ func newFlowController(maxInserts, maxInsertBytes int) *flowController {
return fc
}

// copyFlowController is for creating a new flow controller based on
// settings from another. It does not copy flow state.
func copyFlowController(in *flowController) *flowController {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this function go in the test file, since it is only being used there, or are there plans to use it later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in preparation for future refactors; it will be used for replicating the flow controller as we spin up new connections.

var maxInserts, maxBytes int
if in != nil {
maxInserts = in.maxInsertCount
maxBytes = in.maxInsertBytes
}
return newFlowController(maxInserts, maxBytes)
}

// acquire blocks until one insert of size bytes can proceed or ctx is done.
// It returns nil in the first case, or ctx.Err() in the second.
//
Expand Down
37 changes: 37 additions & 0 deletions bigquery/storage/managedwriter/flow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,40 @@ func TestFlowControllerUnboundedBytes(t *testing.T) {
t.Error("got true, wanted false")
}
}

// TestCopyFlowController checks that copyFlowController produces a controller
// whose insert-count and byte limits match those of the source controller,
// and that a nil source produces a controller with both limits set to zero.
func TestCopyFlowController(t *testing.T) {
	testcases := []struct {
		description     string
		in              *flowController
		wantMaxRequests int
		wantMaxBytes    int
	}{
		{
			description:     "nil source",
			wantMaxRequests: 0,
			wantMaxBytes:    0,
		},
		{
			description:     "no limit",
			in:              newFlowController(0, 0),
			wantMaxRequests: 0,
			wantMaxBytes:    0,
		},
		{
			description:     "bounded",
			in:              newFlowController(10, 1024),
			wantMaxRequests: 10,
			wantMaxBytes:    1024,
		},
	}

	for _, tc := range testcases {
		fc := copyFlowController(tc.in)
		if fc.maxInsertBytes != tc.wantMaxBytes {
			t.Errorf("%s: max bytes mismatch, got %d want %d ", tc.description, fc.maxInsertBytes, tc.wantMaxBytes)
		}
		if fc.maxInsertCount != tc.wantMaxRequests {
			// Report the request-count values here; the byte values belong
			// to the check above and would make a failure misleading.
			t.Errorf("%s: max requests mismatch, got %d want %d ", tc.description, fc.maxInsertCount, tc.wantMaxRequests)
		}
	}
}
3 changes: 3 additions & 0 deletions bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
if ms.id == "" {
t.Errorf("managed stream is missing ID")
}
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

Expand Down
3 changes: 3 additions & 0 deletions bigquery/storage/managedwriter/managed_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type {

// ManagedStream is the abstraction over a single write stream.
type ManagedStream struct {
// Unique id for the managedstream instance.
id string

streamSettings *streamSettings
schemaDescriptor *descriptorpb.DescriptorProto
destinationTable string
Expand Down