Handle invalid rows in the Storage Api sink #17423
Conversation
Codecov Report
@@ Coverage Diff @@
## master #17423 +/- ##
==========================================
+ Coverage 73.98% 74.00% +0.01%
==========================================
Files 696 696
Lines 91851 91851
==========================================
+ Hits 67958 67975 +17
+ Misses 22644 22627 -17
Partials 1249 1249
@reuvenlax - Is this ready for a review?
@aaltay this is now ready for review. Who would be the best reviewer for this?
friendly ping
Thanks.
StorageApiWritePayload payload = messageConverter.toMessage(element.getValue());
o.get(successfulWritesTag).output(KV.of(element.getKey(), payload));
} catch (TableRowToStorageApiProto.SchemaConversionException e) {
TableRow tableRow = messageConverter.toTableRow(element.getValue());
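The routing pattern under discussion can be sketched outside Beam as a plain try/catch router: only the conversion exception is caught and diverted to a failed-rows output, while any other exception still propagates. This is a minimal illustration, not the actual Beam DoFn API; the class, converter, and list names here are hypothetical stand-ins for the tagged outputs.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal sketch of the dead-letter routing in StorageApiConvertMessages.
 *  Names are illustrative; the real code emits to TupleTag-ged outputs. */
public class DeadLetterRouter {
    /** Stand-in for TableRowToStorageApiProto.SchemaConversionException. */
    static class SchemaConversionException extends Exception {
        SchemaConversionException(String msg) { super(msg); }
    }

    /** Stand-in converter: rejects values that are not integers. */
    static byte[] toMessage(String value) throws SchemaConversionException {
        try {
            Integer.parseInt(value);
            return value.getBytes();
        } catch (NumberFormatException e) {
            throw new SchemaConversionException("not an integer: " + value);
        }
    }

    static final List<String> successes = new ArrayList<>();
    static final List<String> failures = new ArrayList<>();

    /** Mirrors the try/catch above: only the conversion exception is
     *  caught and routed to the failure output; anything else propagates. */
    static void process(String element) {
        try {
            toMessage(element);
            successes.add(element); // o.get(successfulWritesTag).output(...)
        } catch (SchemaConversionException e) {
            failures.add(element);  // o.get(failedWritesTag).output(...)
        }
    }
}
```

The key property, which the review thread turns on, is that the catch is narrow: a failure of any other kind is not silently sent to the dead-letter output.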
I'm a bit worried about just pushing all messages from an exception handler to a DLQ.
(1) This could result in errors from downstream fused steps being sent to the DLQ instead of being retried.
(2) Messages being sent to a DLQ in an unintended way may be perceived as data loss by a user of the I/O connector.
I think we should build a retry policy around this (or use the existing BQ retry policy) so that users explicitly mark messages that should be sent to a DLQ.
WDYT?
We're not pushing all errors, just the SchemaConversionExceptions, which are
thrown when converting the JSON to a proto. If a downstream ParDo threw our
internal SchemaConversionException, that would be a very weird thing to do
(we could make it package private to ensure this can't happen).
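Making the exception package private, as suggested here, is just a matter of declaring it with default visibility, so that code outside the connector's package cannot throw (or even name) it and be mistakenly routed to the DLQ. A minimal sketch, assuming the exception lives in the BigQuery I/O package:

```java
// Illustrative sketch: in org.apache.beam.sdk.io.gcp.bigquery, declaring
// the exception without the `public` modifier gives it package-private
// visibility. User code in other packages cannot reference the type, so a
// downstream ParDo cannot throw it and end up in the dead-letter output.
class SchemaConversionException extends Exception {
    SchemaConversionException(String message) {
        super(message);
    }
}
```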
In sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
      throws Exception {
    dynamicDestinations.setSideInputAccessorFromProcessContext(c);
    MessageConverter<ElementT> messageConverter =
        messageConverters.get(
            element.getKey(), dynamicDestinations, getDatasetService(pipelineOptions));
-   StorageApiWritePayload payload = messageConverter.toMessage(element.getValue());
-   o.output(KV.of(element.getKey(), payload));
+   try {
+     StorageApiWritePayload payload = messageConverter.toMessage(element.getValue());
+     o.get(successfulWritesTag).output(KV.of(element.getKey(), payload));
+   } catch (TableRowToStorageApiProto.SchemaConversionException e) {
+     TableRow tableRow = messageConverter.toTableRow(element.getValue());
Can we build a retry policy that only includes "SchemaConversionExceptions" by default? And can we expose this through the API, similar to the existing failedInsertRetryPolicy?
https://github.com/apache/beam/blob/0c9cf43a7edae2e2a2622a8f4241b64a638121bb/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2340
I'm not sure I understand. Retry _never_ makes sense here. If we failed to
convert once, we will continue to fail to convert. The Storage Write API
does not currently return per-row errors, however that is being added.
However the plan is for it to only return errors for rows that cannot be
inserted at all (e.g. the row is larger than allowed, a field does not
match a schema constraint, etc.). A retry policy won't make sense there
either, since such a failure means that the insert will continue to fail no
matter how many times we retry it.
The failedInsertRetryPolicy made sense for the old InsertAll API, since
that API would return errors that were retryable, and the user sometimes
needed to specify whether to retry or not (though that API was always
incomplete, since users often wanted to specify a maximum number of retries,
which was not supported). Here I don't think it really makes sense.
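For context, the existing failedInsertRetryPolicy style boils down to a predicate over insert errors. A minimal stand-in (not the actual BigQueryIO `InsertRetryPolicy` API; the error-reason strings here are illustrative) shows why such a predicate is useless for schema-conversion failures: they are deterministic, so no policy choice can make a retry succeed.

```java
/** Sketch of a retry-policy predicate in the style of BigQueryIO's
 *  failedInsertRetryPolicy. The real API inspects per-row insert errors
 *  from the InsertAll response; this stand-in classifies error reasons. */
public class RetryPolicySketch {
    interface RetryPolicy {
        boolean shouldRetry(String errorReason);
    }

    /** Retry transient errors, but never permanent ones such as schema
     *  mismatches, which fail identically on every attempt. */
    static final RetryPolicy RETRY_TRANSIENT_ERRORS = reason ->
        !reason.equals("invalid") && !reason.equals("schemaMismatch");

    /** Send every failed row straight to the dead-letter output. */
    static final RetryPolicy NEVER_RETRY = reason -> false;
}
```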
Ok. Thanks for clarifying. LGTM. +1 for making SchemaConversionException package private.
Run Java PreCommit
Run Kotlin_Examples PreCommit
Run Java_Examples_Dataflow PreCommit
Run Java PreCommit
2 similar comments
Run Java PreCommit
Run Java PreCommit
Run Java PreCommit
4 similar comments
Run Java PreCommit
Run Java PreCommit
Run Java PreCommit
Run Java PreCommit
Run Java PreCommit
2 similar comments
Run Java PreCommit
Run Java PreCommit
This seems to be a step toward solving the BEAM-13158 issue (https://issues.apache.org/jira/browse/BEAM-13158). Maybe the issue and PR should be linked? Will there be other PRs regarding row-level error handling via the Storage API?
This handles the case where there are "obvious" schema incompatibilities
(e.g. wrong field names, wrong types, etc.). We want to do better, but
that will require support from BigQuery, which should be coming.