Merge pull request apache#17517 from [BEAM-14383] Improve "FailedRows" errors returned by beam.io.WriteToBigQuery

* [BEAM-14383] add row_errors to returned rows when inserting to BigQuery

* [BEAM-14383] add test checking proper insert of _error_records

* [BEAM-14383] run yapf on python test and add function doc

* [BEAM-14383] reorder imports

* [BEAM-14383] add a breaking change note to 2.39 CHANGES.md
Firlej authored May 6, 2022
1 parent ad6e7b7 commit 3587820
Showing 3 changed files with 97 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -111,6 +111,7 @@
.withValueMapper(new TextMessageMapper());
```
* Coders in Python are expected to inherit from Coder. ([BEAM-14351](https://issues.apache.org/jira/browse/BEAM-14351)).
* The `FailedRows` key of the errors dictionary returned by the `beam.io.WriteToBigQuery` transform now contains an array of 3-element tuples `(destination_table, row, reason)` instead of 2-element `(destination_table, row)` tuples. ([BEAM-14383](https://issues.apache.org/jira/browse/BEAM-14383)).

## Deprecations

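For readers migrating across this breaking change, a minimal sketch of consuming the new 3-element `FailedRows` tuples; the table spec, schema, and logging below are illustrative assumptions, not part of this commit:

```python
import logging

import apache_beam as beam

with beam.Pipeline() as p:
  result = (
      p
      | beam.Create([{'number': 'not-an-int'}])
      | beam.io.WriteToBigQuery(
          'project:dataset.table',  # assumed destination
          schema='number:INTEGER',
          method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS))

  # Each failed row was previously a 2-tuple (destination, row); it now
  # carries the BigQuery insert errors as a third element.
  (
      result['FailedRows']
      | beam.Map(
          lambda e: logging.warning(
              'Insert into %s failed for row %s: %s', e[0], e[1], e[2])))
```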
8 changes: 5 additions & 3 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1748,7 +1748,8 @@ def _flush_batch(self, destination):
ignore_unknown_values=self.ignore_unknown_columns)
self.batch_latency_metric.update((time.time() - start) * 1000)

-failed_rows = [rows[entry['index']] for entry in errors]
+failed_rows = [(rows[entry['index']], entry["errors"])
+               for entry in errors]
should_retry = any(
RetryStrategy.should_retry(
self._retry_strategy, entry['errors'][0]['reason'])
@@ -1780,8 +1781,9 @@ def _flush_batch(self, destination):
return [
pvalue.TaggedOutput(
BigQueryWriteFn.FAILED_ROWS,
-          GlobalWindows.windowed_value((destination, row)))
-      for row in failed_rows
+          GlobalWindows.windowed_value((destination, row, row_errors)))
+      for row, row_errors in failed_rows
]


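The `entry['index']` / `entry['errors']` pairing above follows the `insertErrors` shape of the BigQuery `tabledata.insertAll` response; a sketch with assumed values showing what `_flush_batch` receives:

```python
# Rows submitted in one batch (illustrative values).
rows = [{'number': 1, 'str': 'a'}, {'number': 2}]

# One insertErrors entry per failed row, as returned by insertAll.
errors = [{
    'index': 1,  # position of the failed row within `rows`
    'errors': [{
        'reason': 'invalid',
        'location': '',
        'debugInfo': '',
        'message': 'Missing required field: str.',
    }],
}]

# The change pairs each failed row with its error details:
failed_rows = [(rows[entry['index']], entry['errors']) for entry in errors]
# -> [({'number': 2}, [{'reason': 'invalid', ...}])]
```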
91 changes: 91 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -22,6 +22,7 @@

import base64
import datetime
import json
import logging
import random
import time
@@ -373,6 +374,96 @@ def test_big_query_write_without_schema(self):
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
temp_file_format=FileFormat.JSON))

@pytest.mark.it_postcommit
def test_big_query_write_insert_errors_reporting(self):
"""
Test that errors returned by beam.io.WriteToBigQuery
contain both the failed rows and the reason they failed.
"""
table_name = 'python_write_table'
table_id = '{}.{}'.format(self.dataset_id, table_name)

errors_table_name = table_name + '_error_records'
errors_table_id = '{}.{}'.format(self.dataset_id, errors_table_name)

input_data = [{
'number': 1,
'str': 'some_string',
}, {
'number': 2
},
{
'number': 3,
'str': 'some_string',
'additional_field_str': 'some_string',
}]

table_schema = {
"fields": [{
"name": "number", "type": "INTEGER", 'mode': 'REQUIRED'
}, {
"name": "str", "type": "STRING", 'mode': 'REQUIRED'
}]
}

errors_table_schema = {
"fields": [{
'name': 'table', 'type': 'STRING', 'mode': 'REQUIRED'
}, {
'name': 'reason', 'type': 'STRING', 'mode': 'NULLABLE'
}, {
'name': 'row_json', 'type': 'STRING', 'mode': 'REQUIRED'
}]
}

pipeline_verifiers = [
BigqueryFullResultMatcher(
project=self.project,
query="SELECT number, str FROM %s" % table_id,
data=[(1, 'some_string')]),
BigqueryFullResultMatcher(
project=self.project,
query="SELECT table, reason, row_json FROM %s" % errors_table_id,
data=
[(
table_id,
'[{"reason": "invalid", "location": "", "debugInfo": "", \
"message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str."}]',
'{"number": 2}'),
(
table_id,
'[{"reason": "invalid", "location": "additional_field_str", \
"debugInfo": "", "message": "no such field: additional_field_str."}]',
'{"number": 3, "str": "some_string", "additional_field_str": \
"some_string"}')])
]

args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=hc.all_of(*pipeline_verifiers))

with beam.Pipeline(argv=args) as p:
# pylint: disable=expression-not-assigned
errors = (
p | 'create' >> beam.Create(input_data)
| 'write' >> beam.io.WriteToBigQuery(
table_id,
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
(
errors["FailedRows"]
| 'ParseErrors' >> beam.Map(
lambda err: {
"table": err[0],
"reason": json.dumps(err[2]),
"row_json": json.dumps(err[1])
})
| 'WriteErrors' >> beam.io.WriteToBigQuery(
errors_table_id,
schema=errors_table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))

@pytest.mark.it_postcommit
@parameterized.expand([
param(file_format=FileFormat.AVRO),
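As a usage note for the dead-letter pattern exercised by the new test: which rows reach `FailedRows` also depends on the configured `insert_retry_strategy`. A sketch of the same routing outside the test harness, where the table names and schemas are assumptions:

```python
import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import RetryStrategy


def to_error_record(err):
  # `err` is the new 3-tuple: (destination_table, row, row_errors).
  table, row, row_errors = err
  return {
      'table': table,
      'row_json': json.dumps(row),
      'reason': json.dumps(row_errors),
  }


with beam.Pipeline() as p:
  result = (
      p
      | beam.Create([{'number': 1}])
      | beam.io.WriteToBigQuery(
          'project:dataset.main_table',
          schema='number:INTEGER',
          # Retry only transient failures so invalid rows surface quickly.
          insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
  (
      result['FailedRows']
      | beam.Map(to_error_record)
      | beam.io.WriteToBigQuery(
          'project:dataset.error_records',
          schema='table:STRING,row_json:STRING,reason:STRING'))
```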
