From 82debe846a62cf869b7dc1d2f0a942434c31a9d4 Mon Sep 17 00:00:00 2001 From: Jack McCluskey <34928439+jrmccluskey@users.noreply.github.com> Date: Fri, 3 Jun 2022 11:59:01 -0400 Subject: [PATCH 1/4] [BEAM-11104] Add code snippet for Go SDK Self-Checkpointing Adds small code snippet example to the Beam Programming Guide that demonstrates self-checkpointing behavior in Beam Go. --- .../en/documentation/programming-guide.md | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index ec2f4fda88a94..cac80e0b15b50 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -6422,7 +6422,26 @@ resource utilization. {{< /highlight >}} {{< highlight go >}} -This is not supported yet, see BEAM-11104. +func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation { + position := rt.GetRestriction().(offsetrange.Restriction).Start + for { + records, err := fn.ExternalService.readNextRecords(position) + if err == fn.ExternalService.ThrottlingErr { + return sdf.ResumeProcessingIn(60 * time.Seconds) + } + if len(records) == 0 { + return sdf.ResumeProcessingIn(10 * time.Seconds) + } + for _, record := range records { + if !rt.TryClaim(position) { + return sdf.StopProcessing() + } + position += 1 + + emit(record) + } + } +} {{< /highlight >}} ### 12.4. Runner-initiated split {#runner-initiated-split} From 268ae5b80614e528a0eb7df6ea9d2c105c86ea8b Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 3 Jun 2022 13:39:33 -0400 Subject: [PATCH 2/4] Add error return to help with ordering clarity --- .../en/documentation/programming-guide.md | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index cac80e0b15b50..a438b5e10fc76 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -6422,19 +6422,26 @@ resource utilization. {{< /highlight >}} {{< highlight go >}} -func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation { +func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) (sdf.ProcessContinuation, error) { position := rt.GetRestriction().(offsetrange.Restriction).Start for { records, err := fn.ExternalService.readNextRecords(position) - if err == fn.ExternalService.ThrottlingErr { - return sdf.ResumeProcessingIn(60 * time.Seconds) + + if err != nil { + if err == fn.ExternalService.ThrottlingErr { + // Resume at a later time to avoid throttling. + return sdf.ResumeProcessingIn(60 * time.Seconds), nil + } + return sdf.StopProcessing(), err } + if len(records) == 0 { - return sdf.ResumeProcessingIn(10 * time.Seconds) + // Wait for data to be available. + return sdf.ResumeProcessingIn(10 * time.Seconds), nil } for _, record := range records { if !rt.TryClaim(position) { - return sdf.StopProcessing() + return sdf.StopProcessing(), nil } position += 1 From c469ac720a9fccdbe529081c6fb8a5796a0a820a Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 3 Jun 2022 13:40:31 -0400 Subject: [PATCH 3/4] Add a finish processing comment --- website/www/site/content/en/documentation/programming-guide.md | 1 + 1 file changed, 1 insertion(+) diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index a438b5e10fc76..73b833a79b445 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -6441,6 +6441,7 @@ func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record) } for _, record := range records { if !rt.TryClaim(position) { + // Records have been claimed, finish processing. return sdf.StopProcessing(), nil } position += 1 From 55e67ac609ad2e324ef0bbfcd3ee9a034ef4a414 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 3 Jun 2022 14:40:40 -0400 Subject: [PATCH 4/4] Move code to snippet correctly --- .../go/examples/snippets/12splittabledofns.go | 69 +++++++++++++++++++ .../en/documentation/programming-guide.md | 29 +------- 2 files changed, 70 insertions(+), 28 deletions(-) create mode 100644 sdks/go/examples/snippets/12splittabledofns.go diff --git a/sdks/go/examples/snippets/12splittabledofns.go b/sdks/go/examples/snippets/12splittabledofns.go new file mode 100644 index 0000000000000..a903881c7491e --- /dev/null +++ b/sdks/go/examples/snippets/12splittabledofns.go @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package snippets + +import ( + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" +) + +type Record struct{} + +type SomeService struct { + ThrottlingErr error +} + +func (s *SomeService) readNextRecords(position interface{}) ([]Record, error) { + return []Record{}, nil +} + +type checkpointingSplittableDoFn struct { + ExternalService SomeService +} + +// [START self_checkpoint] +func (fn *checkpointingSplittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) (sdf.ProcessContinuation, error) { + position := rt.GetRestriction().(offsetrange.Restriction).Start + for { + records, err := fn.ExternalService.readNextRecords(position) + + if err != nil { + if err == fn.ExternalService.ThrottlingErr { + // Resume at a later time to avoid throttling. + return sdf.ResumeProcessingIn(60 * time.Second), nil + } + return sdf.StopProcessing(), err + } + + if len(records) == 0 { + // Wait for data to be available. + return sdf.ResumeProcessingIn(10 * time.Second), nil + } + for _, record := range records { + if !rt.TryClaim(position) { + // Records have been claimed, finish processing. + return sdf.StopProcessing(), nil + } + position += 1 + + emit(record) + } + } +} + +// [END self_checkpoint] diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 73b833a79b445..002974c24b1ff 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -6422,34 +6422,7 @@ resource utilization. {{< /highlight >}} {{< highlight go >}} -func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) (sdf.ProcessContinuation, error) { - position := rt.GetRestriction().(offsetrange.Restriction).Start - for { - records, err := fn.ExternalService.readNextRecords(position) - - if err != nil { - if err == fn.ExternalService.ThrottlingErr { - // Resume at a later time to avoid throttling. - return sdf.ResumeProcessingIn(60 * time.Seconds), nil - } - return sdf.StopProcessing(), err - } - - if len(records) == 0 { - // Wait for data to be available. - return sdf.ResumeProcessingIn(10 * time.Seconds), nil - } - for _, record := range records { - if !rt.TryClaim(position) { - // Records have been claimed, finish processing. - return sdf.StopProcessing(), nil - } - position += 1 - - emit(record) - } - } -} +{{< code_sample "sdks/go/examples/snippets/12splittabledofns.go" self_checkpoint>}} {{< /highlight >}} ### 12.4. Runner-initiated split {#runner-initiated-split}