From 358782006e1db86437b3bf61f910db12d654b1e0 Mon Sep 17 00:00:00 2001
From: Oskar Firlej <25161992+Firlej@users.noreply.github.com>
Date: Sat, 7 May 2022 00:13:57 +0200
Subject: [PATCH] Merge pull request #17517 from [BEAM-14383] Improve
 "FailedRows" errors returned by beam.io.WriteToBigQuery

* [BEAM-14383] - add row_errors to returned rows when inserting to BigQuery

* [BEAM-14383] add test checking proper insert of _error_records

* [BEAM-14383] run yapf on python test and add function doc

* [BEAM-14383] reorder imports

* [BEAM-14383] add a breaking change note to 2.39 CHANGES.md
---
 CHANGES.md                                 |  1 +
 sdks/python/apache_beam/io/gcp/bigquery.py |  8 +-
 .../io/gcp/bigquery_write_it_test.py       | 91 +++++++++++++++++++
 3 files changed, 97 insertions(+), 3 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 5064e86fc57d4..ad3095d9120b3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -111,6 +111,7 @@
     .withValueMapper(new TextMessageMapper());
   ```
 * Coders in Python are expected to inherit from Coder. ([BEAM-14351](https://issues.apache.org/jira/browse/BEAM-14351)).
+* The `FailedRows` key of the errors dictionary returned by the `beam.io.WriteToBigQuery` transform now yields 3-element tuples `(destination_table, row, reason)` instead of 2-element tuples `(destination_table, row)`. ([BEAM-14383](https://issues.apache.org/jira/browse/BEAM-14383)).
 
 ## Deprecations
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index c7bc5560bbc22..1375287b595c8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1748,7 +1748,8 @@ def _flush_batch(self, destination):
           ignore_unknown_values=self.ignore_unknown_columns)
       self.batch_latency_metric.update((time.time() - start) * 1000)
 
-      failed_rows = [rows[entry['index']] for entry in errors]
+      failed_rows = [(rows[entry['index']], entry["errors"])
+                     for entry in errors]
       should_retry = any(
           RetryStrategy.should_retry(
               self._retry_strategy, entry['errors'][0]['reason'])
@@ -1780,8 +1781,9 @@ def _flush_batch(self, destination):
     return [
         pvalue.TaggedOutput(
             BigQueryWriteFn.FAILED_ROWS,
-            GlobalWindows.windowed_value((destination, row)))
-        for row in failed_rows
+            GlobalWindows.windowed_value((destination, row, row_errors)))
+        for row,
+        row_errors in failed_rows
     ]
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index dd2283eb71d6c..aa188e79ae969 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -22,6 +22,7 @@
 
 import base64
 import datetime
+import json
 import logging
 import random
 import time
@@ -373,6 +374,96 @@ def test_big_query_write_without_schema(self):
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             temp_file_format=FileFormat.JSON))
 
+  @pytest.mark.it_postcommit
+  def test_big_query_write_insert_errors_reporting(self):
+    """
+    Test that errors returned by beam.io.WriteToBigQuery
+    contain both the failed rows and the reason for the failure.
+ """ + table_name = 'python_write_table' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + errors_table_name = table_name + '_error_records' + errors_table_id = '{}.{}'.format(self.dataset_id, errors_table_name) + + input_data = [{ + 'number': 1, + 'str': 'some_string', + }, { + 'number': 2 + }, + { + 'number': 3, + 'str': 'some_string', + 'additional_field_str': 'some_string', + }] + + table_schema = { + "fields": [{ + "name": "number", "type": "INTEGER", 'mode': 'REQUIRED' + }, { + "name": "str", "type": "STRING", 'mode': 'REQUIRED' + }] + } + + errors_table_schema = { + "fields": [{ + 'name': 'table', 'type': 'STRING', 'mode': 'REQUIRED' + }, { + 'name': 'reason', 'type': 'STRING', 'mode': 'NULLABLE' + }, { + 'name': 'row_json', 'type': 'STRING', 'mode': 'REQUIRED' + }] + } + + pipeline_verifiers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT number, str FROM %s" % table_id, + data=[(1, 'some_string')]), + BigqueryFullResultMatcher( + project=self.project, + query="SELECT table, reason, row_json FROM %s" % errors_table_id, + data= + [( + table_id, + '[{"reason": "invalid", "location": "", "debugInfo": "", \ +"message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str."}]', + '{"number": 2}'), + ( + table_id, + '[{"reason": "invalid", "location": "additional_field_str", \ +"debugInfo": "", "message": "no such field: additional_field_str."}]', + '{"number": 3, "str": "some_string", "additional_field_str": \ +"some_string"}')]) + ] + + args = self.test_pipeline.get_full_options_as_args( + on_success_matcher=hc.all_of(*pipeline_verifiers)) + + with beam.Pipeline(argv=args) as p: + # pylint: disable=expression-not-assigned + errors = ( + p | 'create' >> beam.Create(input_data) + | 'write' >> beam.io.WriteToBigQuery( + table_id, + schema=table_schema, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)) + ( + errors["FailedRows"] + | 'ParseErrors' >> beam.Map( + lambda err: { + "table": err[0], + "reason": json.dumps(err[2]), + "row_json": json.dumps(err[1]) + }) + | 'WriteErrors' >> beam.io.WriteToBigQuery( + errors_table_id, + schema=errors_table_schema, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)) + @pytest.mark.it_postcommit @parameterized.expand([ param(file_format=FileFormat.AVRO),