From abf488ee880cfc40505d041add1c1afc141bbc36 Mon Sep 17 00:00:00 2001
From: Pablo E
Date: Tue, 10 May 2022 09:49:28 -0700
Subject: [PATCH] Revert "Merge pull request #17517 from [BEAM-14383] Improve
 "FailedRows" errors returned by beam.io.WriteToBigQuery"

This reverts commit 358782006e1db86437b3bf61f910db12d654b1e0.
---
 CHANGES.md                                   |  1 -
 sdks/python/apache_beam/io/gcp/bigquery.py   |  8 +-
 .../io/gcp/bigquery_write_it_test.py         | 91 -------------------
 3 files changed, 3 insertions(+), 97 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ad3095d9120b3..5064e86fc57d4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -111,7 +111,6 @@
       .withValueMapper(new TextMessageMapper());
   ```
 * Coders in Python are expected to inherit from Coder. ([BEAM-14351](https://issues.apache.org/jira/browse/BEAM-14351)).
-* `FailedRows` key of the errors dictionary returned by `beam.io.WriteToBigQuery` transform now returns an array of 3-element tuples `(destination_table, row, reason)` instead of `(destination_table, row)`. ([BEAM-14383](https://issues.apache.org/jira/browse/BEAM-14383)).
 
 ## Deprecations
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 79f6081e79660..2c21dca604784 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1754,8 +1754,7 @@ def _flush_batch(self, destination):
         ignore_unknown_values=self.ignore_unknown_columns)
     self.batch_latency_metric.update((time.time() - start) * 1000)
 
-    failed_rows = [(rows[entry['index']], entry["errors"])
-                   for entry in errors]
+    failed_rows = [rows[entry['index']] for entry in errors]
     should_retry = any(
         RetryStrategy.should_retry(
             self._retry_strategy, entry['errors'][0]['reason'])
@@ -1787,9 +1786,8 @@ def _flush_batch(self, destination):
     return [
         pvalue.TaggedOutput(
             BigQueryWriteFn.FAILED_ROWS,
-            GlobalWindows.windowed_value((destination, row, row_errors)))
-        for row,
-        row_errors in failed_rows
+            GlobalWindows.windowed_value((destination, row)))
+        for row in failed_rows
     ]
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index aa188e79ae969..dd2283eb71d6c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -22,7 +22,6 @@
 
 import base64
 import datetime
-import json
 import logging
 import random
 import time
@@ -374,96 +373,6 @@ def test_big_query_write_without_schema(self):
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             temp_file_format=FileFormat.JSON))
 
-  @pytest.mark.it_postcommit
-  def test_big_query_write_insert_errors_reporting(self):
-    """
-    Test that errors returned by beam.io.WriteToBigQuery
-    contain both the failed rows amd the reason for it failing.
-    """
-    table_name = 'python_write_table'
-    table_id = '{}.{}'.format(self.dataset_id, table_name)
-
-    errors_table_name = table_name + '_error_records'
-    errors_table_id = '{}.{}'.format(self.dataset_id, errors_table_name)
-
-    input_data = [{
-        'number': 1,
-        'str': 'some_string',
-    }, {
-        'number': 2
-    },
-              {
-                  'number': 3,
-                  'str': 'some_string',
-                  'additional_field_str': 'some_string',
-              }]
-
-    table_schema = {
-        "fields": [{
-            "name": "number", "type": "INTEGER", 'mode': 'REQUIRED'
-        }, {
-            "name": "str", "type": "STRING", 'mode': 'REQUIRED'
-        }]
-    }
-
-    errors_table_schema = {
-        "fields": [{
-            'name': 'table', 'type': 'STRING', 'mode': 'REQUIRED'
-        }, {
-            'name': 'reason', 'type': 'STRING', 'mode': 'NULLABLE'
-        }, {
-            'name': 'row_json', 'type': 'STRING', 'mode': 'REQUIRED'
-        }]
-    }
-
-    pipeline_verifiers = [
-        BigqueryFullResultMatcher(
-            project=self.project,
-            query="SELECT number, str FROM %s" % table_id,
-            data=[(1, 'some_string')]),
-        BigqueryFullResultMatcher(
-            project=self.project,
-            query="SELECT table, reason, row_json FROM %s" % errors_table_id,
-            data=
-            [(
-                table_id,
-                '[{"reason": "invalid", "location": "", "debugInfo": "", \
-"message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str."}]',
-                '{"number": 2}'),
-             (
-                 table_id,
-                 '[{"reason": "invalid", "location": "additional_field_str", \
-"debugInfo": "", "message": "no such field: additional_field_str."}]',
-                 '{"number": 3, "str": "some_string", "additional_field_str": \
-"some_string"}')])
-    ]
-
-    args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=hc.all_of(*pipeline_verifiers))
-
-    with beam.Pipeline(argv=args) as p:
-      # pylint: disable=expression-not-assigned
-      errors = (
-          p | 'create' >> beam.Create(input_data)
-          | 'write' >> beam.io.WriteToBigQuery(
-              table_id,
-              schema=table_schema,
-              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-              write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
-      (
-          errors["FailedRows"]
-          | 'ParseErrors' >> beam.Map(
-              lambda err: {
-                  "table": err[0],
-                  "reason": json.dumps(err[2]),
-                  "row_json": json.dumps(err[1])
-              })
-          | 'WriteErrors' >> beam.io.WriteToBigQuery(
-              errors_table_id,
-              schema=errors_table_schema,
-              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-              write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
-
   @pytest.mark.it_postcommit
   @parameterized.expand([
       param(file_format=FileFormat.AVRO),