Skip to content

Commit

Permalink
[BEAM-14505] Add Dataflow streaming pipeline update support to the Go…
Browse files Browse the repository at this point in the history
… SDK (#17747)
  • Loading branch information
jrmccluskey authored May 26, 2022
1 parent 075757d commit 92b8dc7
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 4 deletions.
17 changes: 17 additions & 0 deletions sdks/go/pkg/beam/runners/dataflow/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ var (
// TODO(BEAM-14512) Turn this on once TO_STRING is implemented
// enableHotKeyLogging = flag.Bool("enable_hot_key_logging", false, "Specifies that when a hot key is detected in the pipeline, the literal, human-readable key is printed in the user's Cloud Logging project (optional).")

// Streaming update flags
update = flag.Bool("update", false, "Submit this job as an update to an existing Dataflow job (optional); the job name must match the existing job to update")
transformMapping = flag.String("transform_name_mapping", "", "JSON-formatted mapping of old transform names to new transform names for pipeline updates (optional)")

dryRun = flag.Bool("dry_run", false, "Dry run. Just print the job, but don't submit it.")
teardownPolicy = flag.String("teardown_policy", "", "Job teardown policy (internal only).")

Expand Down Expand Up @@ -119,6 +123,8 @@ var flagFilter = map[string]bool{
"teardown_policy": true,
"cpu_profiling": true,
"session_recording": true,
"update": true,
"transform_name_mapping": true,

// Job Options flags
"endpoint": true,
Expand Down Expand Up @@ -256,6 +262,15 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal)
}
}
if !*update && *transformMapping != "" {
return nil, errors.New("provided transform_name_mapping without setting the --update flag, so the pipeline would not be updated")
}
var updateTransformMapping map[string]string
if *transformMapping != "" {
if err := json.Unmarshal([]byte(*transformMapping), &updateTransformMapping); err != nil {
return nil, errors.Wrapf(err, "error reading --transform_name_mapping flag as JSON")
}
}

hooks.SerializeHooksToOptions()

Expand Down Expand Up @@ -317,6 +332,8 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
WorkerZone: *workerZone,
TeardownPolicy: *teardownPolicy,
ContainerImage: getContainerImage(ctx),
Update: *update,
TransformNameMapping: updateTransformMapping,
}
if opts.TempLocation == "" {
opts.TempLocation = gcsx.Join(*stagingLocation, "tmp")
Expand Down
78 changes: 78 additions & 0 deletions sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,81 @@ func TestGetJobOptions_DockerNoImage(t *testing.T) {
t.Fatalf("getContainerImage() = %q, want %q", got, want)
}
}

func TestGetJobOptions_TransformMapping(t *testing.T) {
*labels = `{"label1": "val1", "label2": "val2"}`
*stagingLocation = "gs://testStagingLocation"
*autoscalingAlgorithm = "NONE"
*minCPUPlatform = "testPlatform"
*flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"

*gcpopts.Project = "testProject"
*gcpopts.Region = "testRegion"

*jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
*jobopts.JobName = "testJob"

*update = true
*transformMapping = `{"transformOne": "transformTwo"}`
opts, err := getJobOptions(context.Background())
if err != nil {
t.Errorf("getJobOptions() returned error, got %v", err)
}
if opts == nil {
t.Fatal("getJobOptions() got nil, want struct")
}
if got, ok := opts.TransformNameMapping["transformOne"]; !ok || got != "transformTwo" {
t.Errorf("mismatch in transform mapping got %v, want %v", got, "transformTwo")
}

}

func TestGetJobOptions_TransformMappingNoUpdate(t *testing.T) {
*labels = `{"label1": "val1", "label2": "val2"}`
*stagingLocation = "gs://testStagingLocation"
*autoscalingAlgorithm = "NONE"
*minCPUPlatform = "testPlatform"
*flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"

*gcpopts.Project = "testProject"
*gcpopts.Region = "testRegion"

*jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
*jobopts.JobName = "testJob"

*update = false
*transformMapping = `{"transformOne": "transformTwo"}`

opts, err := getJobOptions(context.Background())
if err == nil {
t.Error("getJobOptions() returned error nil, want an error")
}
if opts != nil {
t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts)
}
}

func TestGetJobOptions_InvalidMapping(t *testing.T) {
*labels = `{"label1": "val1", "label2": "val2"}`
*stagingLocation = "gs://testStagingLocation"
*autoscalingAlgorithm = "NONE"
*minCPUPlatform = "testPlatform"
*flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"

*gcpopts.Project = "testProject"
*gcpopts.Region = "testRegion"

*jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
*jobopts.JobName = "testJob"

*update = true
*transformMapping = "not a JSON-encoded string"

opts, err := getJobOptions(context.Background())
if err == nil {
t.Error("getJobOptions() returned error nil, want an error")
}
if opts != nil {
t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts)
}
}
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker
if err != nil {
return presult, err
}
upd, err := Submit(ctx, client, opts.Project, opts.Region, job)
upd, err := Submit(ctx, client, opts.Project, opts.Region, job, opts.Update)
// When in async mode, if we get a 409 because we've already submitted an actively running job with the same name
// just return the existing job as a convenience
if gErr, ok := err.(*googleapi.Error); async && ok && gErr.Code == 409 {
Expand Down
18 changes: 15 additions & 3 deletions sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type JobOptions struct {
FlexRSGoal string
EnableHotKeyLogging bool

// Streaming update settings
Update bool
TransformNameMapping map[string]string

// Autoscaling settings
Algorithm string
MaxNumWorkers int64
Expand Down Expand Up @@ -208,8 +212,9 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
TempStoragePrefix: opts.TempLocation,
Experiments: experiments,
},
Labels: opts.Labels,
Steps: steps,
Labels: opts.Labels,
TransformNameMapping: opts.TransformNameMapping,
Steps: steps,
}

workerPool := job.Environment.WorkerPools[0]
Expand Down Expand Up @@ -238,7 +243,14 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
}

// Submit submits a prepared job to Cloud Dataflow.
func Submit(ctx context.Context, client *df.Service, project, region string, job *df.Job) (*df.Job, error) {
func Submit(ctx context.Context, client *df.Service, project, region string, job *df.Job, updateJob bool) (*df.Job, error) {
if updateJob {
runningJob, err := GetRunningJobByName(client, project, region, job.Name)
if err != nil {
return nil, err
}
job.ReplaceJobId = runningJob.Id
}
upd, err := client.Projects.Locations.Jobs.Create(project, region, job).Do()
if err == nil {
log.Infof(ctx, "Submitted job: %v", upd.Id)
Expand Down

0 comments on commit 92b8dc7

Please sign in to comment.