-
Notifications
You must be signed in to change notification settings - Fork 513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Async DoFn to emit element with restored metadata #5441
Conversation
OutputReceiver<KV<Input, TryWrapper>> out, | ||
BoundedWindow window) { | ||
inputCount++; | ||
flush(r -> out.output(KV.of(r.input, r.output))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks we were outputing using the current element's metadata
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm after rebasing the bigtable changes :)
9a818b1
to
fadba86
Compare
fadba86
to
f6b694f
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5441 +/- ##
==========================================
+ Coverage 61.29% 61.31% +0.02%
==========================================
Files 311 312 +1
Lines 11062 11068 +6
Branches 774 792 +18
==========================================
+ Hits 6780 6786 +6
Misses 4282 4282 ☔ View full report in Codecov by Sentry. |
@clairemcginty updated the PR. All async DoFn are affected |
@@ -155,40 +159,58 @@ public void startBundle(StartBundleContext context) { | |||
semaphore.release(maxPendingRequests); | |||
} | |||
|
|||
// kept for binary compatibility. Must not be used | |||
// TODO: remove in 0.15.0 | |||
@Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this deprecation warning get logged on the user's side? I guess only if they're pulling in an older Beam version that uses the old API, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only if extending the class and overriding this method which should never happen.
However a library compiled with the old API will call this method, so I've left it for now.
@@ -25,33 +25,33 @@ import scala.util.{Failure, Success, Try} | |||
/** | |||
* A [[org.apache.beam.sdk.transforms.DoFn DoFn]] that performs asynchronous lookup using the | |||
* provided client for Scala [[Future]]. | |||
* @tparam A | |||
* @tparam Input |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice refactor here 👍 much clearer
scio-core/src/test/scala/com/spotify/scio/transforms/AsyncBatchLookupDoFnTest.scala
Outdated
Show resolved
Hide resolved
3872ae7
to
2825c6d
Compare
flush
call from theprocessElement
was not propagating the original element's window and timestamp.