-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-11104] Add code snippet for Go SDK Self-Checkpointing #17956
Conversation
Adds small code snippet example to the Beam Programming Guide that demonstrates self-checkpointing behavior in Beam Go.
Can one of the admins verify this patch? |
1 similar comment
Can one of the admins verify this patch? |
@@ -6422,7 +6422,26 @@ resource utilization. | |||
{{< /highlight >}} | |||
|
|||
{{< highlight go >}} | |||
This is not supported yet, see BEAM-11104. | |||
func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we put this in the snippets folder (example below in Watermark estimation section)? I know we haven't been clean on that before, but it:
(a) makes sure that the code actually compiles
(b) makes it easier to reuse (e.g. I know Dataflow has docs that use snippets from Beam)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a completely fictional IO since we don't actually have a robust native streaming IO, so there's nothing to compile. It's just modeled after the Python and Java versions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah - I don't care if we make up empty functions for that, we already do that for a number of the existing snippets. The process continuation stuff should compile though, and that's the important bit anyways.
@@ -6422,7 +6422,26 @@ resource utilization. | |||
{{< /highlight >}} | |||
|
|||
{{< highlight go >}} | |||
This is not supported yet, see BEAM-11104. | |||
func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you return an err
parameter as well (it can just return nil)? Something I realized w/ Bundle Finalization is that its much more helpful if we provide the parameters that surround the one we are demonstrating because it allows users to see the ordering we require.
Side note unrelated to this PR: We probably need better ordering error messages, they are pretty confusing right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm actually also curious about what process continuation we should return when we return an err response actually - is it nil? Might be worth including that as an option if for example, records, err := fn.ExternalService.readNextRecords(position)
returns a non-nil, non-throttling error respone
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC if an SDF signature has a ProcessContinuation return we always expect either a Resume() or Stop() continuation and never a nil.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding an error parameter is reasonable, so that's added. It did lead to some extra error checking overhead since we don't have the try-catch mechanism the other SDKs leverage but that's not a huge problem.
return sdf.ResumeProcessingIn(60 * time.Seconds) | ||
} | ||
if len(records) == 0 { | ||
return sdf.ResumeProcessingIn(10 * time.Seconds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a comment along the lines of // Wait for data to be available
? Might be nice to have a similar comment for the throttling case and the finish execution case as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a fair note. Adding clarifying comments is always good for a documentation snippet
Assigning reviewers. If you would like to opt out of this review, comment R: @riteshghorse for label go. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - thanks!
R: @lostluck |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
Adds small code snippet example to the Beam Programming Guide that demonstrates self-checkpointing behavior in Beam Go.
Rendering
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.