diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 2c21dca604784..4d7df85f905e9 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -378,6 +378,15 @@ def compute_table_name(row):
 NOTE: This job name template does not have backwards compatibility guarantees.
 """
 BQ_JOB_NAME_TEMPLATE = "beam_bq_job_{job_type}_{job_id}_{step_id}{random}"
+"""
+The maximum number of times that a bundle of rows that errors out should be
+sent for insertion into BigQuery.
+
+The default is 10,000 with exponential backoffs, so a bundle of rows may be
+tried for a very long time. You may reduce this property to reduce the number
+of retries.
+"""
+MAX_INSERT_RETRIES = 10000
 
 
 @deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
@@ -1492,6 +1501,7 @@ class BigQueryWriteFn(DoFn):
   DEFAULT_MAX_BATCH_SIZE = 500
 
   FAILED_ROWS = 'FailedRows'
+  FAILED_ROWS_WITH_ERRORS = 'FailedRowsWithErrors'
   STREAMING_API_LOGGING_FREQUENCY_SEC = 300
 
   def __init__(
@@ -1507,7 +1517,8 @@ def __init__(
       additional_bq_parameters=None,
       ignore_insert_ids=False,
       with_batched_input=False,
-      ignore_unknown_columns=False):
+      ignore_unknown_columns=False,
+      max_retries=MAX_INSERT_RETRIES):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -1555,6 +1566,9 @@ def __init__(
         the schema. The unknown values are ignored. Default is False,
         which treats unknown values as errors. See reference:
         https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
+      max_retries: The number of times that we will retry inserting a group of
+        rows into BigQuery. By default, we retry 10000 times with exponential
+        backoffs (effectively retry forever).
     """
     self.schema = schema
@@ -1592,6 +1606,7 @@ def __init__(
     self.streaming_api_logging_frequency_sec = (
         BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
     self.ignore_unknown_columns = ignore_unknown_columns
+    self._max_retries = max_retries
 
   def display_data(self):
     return {
@@ -1643,7 +1658,9 @@ def start_bundle(self):
 
     self._backoff_calculator = iter(
         retry.FuzzedExponentialIntervals(
-            initial_delay_secs=0.2, num_retries=10000, max_delay_secs=1500))
+            initial_delay_secs=0.2,
+            num_retries=self._max_retries,
+            max_delay_secs=1500))
 
   def _create_table_if_needed(self, table_reference, schema=None):
     str_table_reference = '%s:%s.%s' % (
@@ -1754,41 +1771,57 @@ def _flush_batch(self, destination):
             ignore_unknown_values=self.ignore_unknown_columns)
         self.batch_latency_metric.update((time.time() - start) * 1000)
 
-        failed_rows = [rows[entry['index']] for entry in errors]
+        failed_rows = [(rows[entry['index']], entry["errors"])
+                       for entry in errors]
+        retry_backoff = next(self._backoff_calculator, None)
+
+        # If retry_backoff is None, then we will not retry and must log.
         should_retry = any(
             RetryStrategy.should_retry(
                 self._retry_strategy, entry['errors'][0]['reason'])
-            for entry in errors)
+            for entry in errors) and retry_backoff is not None
+
         if not passed:
           self.failed_rows_metric.update(len(failed_rows))
           message = (
              'There were errors inserting to BigQuery. Will{} retry. '
              'Errors were {}'.format(("" if should_retry else " not"),
                                      errors))
-          if should_retry:
-            _LOGGER.warning(message)
-          else:
-            _LOGGER.error(message)
 
-        rows = failed_rows
+          # The log level is:
+          # - WARNING when we are continuing to retry, and have a deadline.
+          # - ERROR when we will no longer retry, or MAY retry forever.
+          log_level = (
+              logging.WARN if should_retry or
+              self._retry_strategy != RetryStrategy.RETRY_ALWAYS else
+              logging.ERROR)
+
+          _LOGGER.log(log_level, message)
 
         if not should_retry:
           break
         else:
-          retry_backoff = next(self._backoff_calculator)
           _LOGGER.info(
               'Sleeping %s seconds before retrying insertion.', retry_backoff)
           time.sleep(retry_backoff)
+          rows = [fr[0] for fr in failed_rows]
           self._throttled_secs.inc(retry_backoff)
 
     self._total_buffered_rows -= len(self._rows_buffer[destination])
     del self._rows_buffer[destination]
-    return [
+    return itertools.chain([
         pvalue.TaggedOutput(
-            BigQueryWriteFn.FAILED_ROWS,
-            GlobalWindows.windowed_value((destination, row)))
-        for row in failed_rows
-    ]
+            BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+            GlobalWindows.windowed_value((destination, row, err))) for row,
+        err in failed_rows
+    ],
+                           [
+                               pvalue.TaggedOutput(
+                                   BigQueryWriteFn.FAILED_ROWS,
+                                   GlobalWindows.windowed_value(
+                                       (destination, row))) for row,
+                               unused_err in failed_rows
+                           ])
 
 
 # The number of shards per destination when writing via streaming inserts.
@@ -1815,7 +1848,8 @@ def __init__(
       ignore_insert_ids,
       ignore_unknown_columns,
       with_auto_sharding,
-      test_client=None):
+      test_client=None,
+      max_retries=None):
     self.table_reference = table_reference
     self.table_side_inputs = table_side_inputs
     self.schema_side_inputs = schema_side_inputs
@@ -1831,6 +1865,7 @@ def __init__(
     self.ignore_insert_ids = ignore_insert_ids
     self.ignore_unknown_columns = ignore_unknown_columns
     self.with_auto_sharding = with_auto_sharding
+    self.max_retries = max_retries or MAX_INSERT_RETRIES
 
   class InsertIdPrefixFn(DoFn):
     def start_bundle(self):
@@ -1856,7 +1891,8 @@ def expand(self, input):
         additional_bq_parameters=self.additional_bq_parameters,
         ignore_insert_ids=self.ignore_insert_ids,
         ignore_unknown_columns=self.ignore_unknown_columns,
-        with_batched_input=self.with_auto_sharding)
+        with_batched_input=self.with_auto_sharding,
+        max_retries=self.max_retries)
 
     def _add_random_shard(element):
       key = element[0]
@@ -1905,7 +1941,9 @@ def _restore_table_ref(sharded_table_ref_elems_kv):
         | 'FromHashableTableRef' >> beam.Map(_restore_table_ref)
         | 'StreamInsertRows' >> ParDo(
            bigquery_write_fn, *self.schema_side_inputs).with_outputs(
-                BigQueryWriteFn.FAILED_ROWS, main='main'))
+                BigQueryWriteFn.FAILED_ROWS,
+                BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+                main='main'))
 
 
 # Flag to be passed to WriteToBigQuery to force schema autodetection
@@ -2194,7 +2232,11 @@ def expand(self, pcoll):
           with_auto_sharding=self.with_auto_sharding,
           test_client=self.test_client)
 
-      return {BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS]}
+      return {
+          BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS],
+          BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: outputs[
+              BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS],
+      }
     else:
       if self._temp_file_format == bigquery_tools.FileFormat.AVRO:
         if self.schema == SCHEMA_AUTODETECT:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 74722e4e538cd..ff2a95c7f8ebe 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -42,6 +42,7 @@
 from apache_beam.internal import pickler
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
+from apache_beam.io.gcp import bigquery as beam_bq
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery import TableRowJsonCoder
 from apache_beam.io.gcp.bigquery import WriteToBigQuery
@@ -91,6 +92,14 @@
 _LOGGER = logging.getLogger(__name__)
 
 
+def _load_or_default(filename):
+  try:
+    with open(filename) as f:
+      return json.load(f)
+  except:  # pylint: disable=bare-except
+    return {}
+
+
 @unittest.skipIf(
     HttpError is None or gcp_bigquery is None,
     'GCP dependencies are not installed')
@@ -838,6 +847,7 @@ def noop(table, **kwargs):
         test_client=client))
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
   # Using https://cloud.google.com/bigquery/docs/error-messages and
@@ -1233,7 +1243,8 @@ def test_with_batched_input(self):
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
-  def test_failure_has_same_insert_ids(self):
+  @mock.patch('time.sleep')
+  def test_failure_has_same_insert_ids(self, unused_mock_sleep):
     tempdir = '%s%s' % (self._new_tempdir(), os.sep)
     file_name_1 = os.path.join(tempdir, 'file1')
     file_name_2 = os.path.join(tempdir, 'file2')
@@ -1289,6 +1300,184 @@ def store_callback(table, **kwargs):
     with open(file_name_1) as f1, open(file_name_2) as f2:
       self.assertEqual(json.load(f1), json.load(f2))
 
+  @parameterized.expand([
+      param(retry_strategy=RetryStrategy.RETRY_ALWAYS),
+      param(retry_strategy=RetryStrategy.RETRY_NEVER),
+      param(retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR),
+  ])
+  def test_failure_in_some_rows_does_not_duplicate(self, retry_strategy=None):
+    with mock.patch('time.sleep'):
+      # In this test we simulate a failure to write out two out of three rows.
+      # Row 0 and row 2 fail to be written on the first attempt, and then
+      # succeed on the next attempt (if there is one).
+      tempdir = '%s%s' % (self._new_tempdir(), os.sep)
+      file_name_1 = os.path.join(tempdir, 'file1_partial')
+      file_name_2 = os.path.join(tempdir, 'file2_partial')
+
+      def store_callback(table, **kwargs):
+        insert_ids = [r for r in kwargs['row_ids']]
+        colA_values = [r['columnA'] for r in kwargs['json_rows']]
+
+        # The first time this function is called, all rows are included,
+        # so we need to filter out the 'failed' rows.
+        json_output_1 = {
+            'insertIds': [insert_ids[1]], 'colA_values': [colA_values[1]]
+        }
+        # The second time this function is called, only rows 0 and 2 are
+        # included, so we don't need to filter any of them. We just write
+        # them all out.
+        json_output_2 = {'insertIds': insert_ids, 'colA_values': colA_values}
+
+        # The first time we try to insert, we save those insertions in
+        # file insert_calls1.
+        if not os.path.exists(file_name_1):
+          with open(file_name_1, 'w') as f:
+            json.dump(json_output_1, f)
+          return [
+              {
+                  'index': 0,
+                  'errors': [{
+                      'reason': 'i dont like this row'
+                  }, {
+                      'reason': 'its bad'
+                  }]
+              },
+              {
+                  'index': 2,
+                  'errors': [{
+                      'reason': 'i het this row'
+                  }, {
+                      'reason': 'its no gud'
+                  }]
+              },
+          ]
+        else:
+          with open(file_name_2, 'w') as f:
+            json.dump(json_output_2, f)
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+      # The expected rows to be inserted according to the insert strategy.
+      if retry_strategy == RetryStrategy.RETRY_NEVER:
+        result = ['value3']
+      else:  # RETRY_ALWAYS and RETRY_ON_TRANSIENT_ERROR insert all rows.
+        result = ['value1', 'value3', 'value5']
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
+            p
+            | beam.Create([{
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
+            }])
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
+                create_disposition='CREATE_NEVER',
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=retry_strategy,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(
+            failed_values,
+            equal_to(list({'value1', 'value3', 'value5'}.difference(result))))
+
+      data1 = _load_or_default(file_name_1)
+      data2 = _load_or_default(file_name_2)
+
+      self.assertListEqual(
+          sorted(data1.get('colA_values', []) + data2.get('colA_values', [])),
+          result)
+      self.assertEqual(len(data1['colA_values']), 1)
+
+  @parameterized.expand([
+      param(retry_strategy=RetryStrategy.RETRY_ALWAYS),
+      param(retry_strategy=RetryStrategy.RETRY_NEVER),
+      param(retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR),
+  ])
+  def test_permanent_failure_in_some_rows_does_not_duplicate(
+      self, unused_sleep_mock=None, retry_strategy=None):
+    with mock.patch('time.sleep'):
+
+      def store_callback(table, **kwargs):
+        return [
+            {
+                'index': 0,
+                'errors': [{
+                    'reason': 'invalid'
+                }, {
+                    'reason': 'its bad'
+                }]
+            },
+        ]
+
+      client = mock.Mock()
+      client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+      # The expected rows to be inserted according to the insert strategy.
+      # Row 0 is rejected on every attempt, so it is never inserted
+      # regardless of the retry strategy.
+      if retry_strategy == RetryStrategy.RETRY_NEVER:
+        inserted_rows = ['value3', 'value5']
+      else:  # RETRY_ALWAYS and RETRY_ON_TRANSIENT_ERROR insert the same rows.
+        inserted_rows = ['value3', 'value5']
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
+            p
+            | beam.Create([{
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
+            }])
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
+                create_disposition='CREATE_NEVER',
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=retry_strategy,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=10))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(
+            failed_values,
+            equal_to(
+                list({'value1', 'value3', 'value5'}.difference(inserted_rows))))
+
   @parameterized.expand([
       param(with_auto_sharding=False),
       param(with_auto_sharding=True),
@@ -1353,6 +1542,7 @@ def store_callback(table, **kwargs):
     self.assertEqual(out2['colA_values'], ['value5'])
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
   BIG_QUERY_DATASET_ID = 'python_bq_streaming_inserts_'
 
@@ -1538,9 +1728,15 @@ def test_multiple_destinations_transform(self):
               method='STREAMING_INSERTS'))
 
       assert_that(
-          r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
+          r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda elm: (elm[0], elm[1])),
          equal_to([(full_output_table_1, bad_record)]))
 
+      assert_that(
+          r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
+          equal_to([(full_output_table_1, bad_record)]),
+          label='FailedRowsMatch')
+
   def tearDown(self):
     request = bigquery.BigqueryDatasetsDeleteRequest(
         projectId=self.project, datasetId=self.dataset_id, deleteContents=True)
@@ -1646,6 +1842,7 @@ def test_file_loads(self):
         WriteToBigQuery.Method.FILE_LOADS,
         triggering_frequency=20)
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class BigQueryFileLoadsIntegrationTests(unittest.TestCase):
   BIG_QUERY_DATASET_ID = 'python_bq_file_loads_'
 
@@ -1676,12 +1873,12 @@ def test_avro_file_load(self):
     bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = 100
     elements = [
         {
-            'name': u'Negative infinity',
+            'name': 'Negative infinity',
             'value': -float('inf'),
             'timestamp': datetime.datetime(1970, 1, 1, tzinfo=pytz.utc),
         },
         {
-            'name': u'Not a number',
+            'name': 'Not a number',
             'value': float('nan'),
             'timestamp': datetime.datetime(2930, 12, 9, tzinfo=pytz.utc),
         },
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index e4ff6082cabbe..3ce8d0ff7de4e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -223,7 +223,7 @@ def test_delete_dataset_retries_for_timeouts(self, patched_time_sleep):
     self.assertTrue(client.datasets.Delete.called)
 
   @unittest.skipIf(
-      google and not hasattr(google.cloud, '_http'),
+      google and not hasattr(google.cloud, '_http'),  # pylint: disable=c-extension-no-member
       'Dependencies not installed')
   @mock.patch('time.sleep', return_value=None)
   @mock.patch('google.cloud._http.JSONConnection.http')
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index dd2283eb71d6c..e75b698c65161 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -36,14 +36,18 @@
 from parameterized import parameterized
 
 import apache_beam as beam
+from apache_beam.io.gcp.bigquery import BigQueryWriteFn
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.bigquery_tools import FileFormat
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
+
 try:
   from apitools.base.py.exceptions import HttpError
 except ImportError:
@@ -373,6 +377,85 @@ def test_big_query_write_without_schema(self):
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               temp_file_format=FileFormat.JSON))
 
+  @pytest.mark.it_postcommit
+  def test_big_query_write_insert_errors_reporting(self):
+    """
+    Test that errors returned by beam.io.WriteToBigQuery
+    contain both the failed rows and the reason for the failure.
+    """
+    table_name = 'python_write_table'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    input_data = [{
+        'number': 1,
+        'str': 'some_string',
+    }, {
+        'number': 2
+    },
+                  {
+                      'number': 3,
+                      'str': 'some_string',
+                      'additional_field_str': 'some_string',
+                  }]
+
+    table_schema = {
+        "fields": [{
+            "name": "number", "type": "INTEGER", 'mode': 'REQUIRED'
+        }, {
+            "name": "str", "type": "STRING", 'mode': 'REQUIRED'
+        }]
+    }
+
+    bq_result_errors = [(
+        {
+            "number": 2
+        },
+        [{
+            "reason": "invalid",
+            "location": "",
+            "debugInfo": "",
+            "message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str."
+        }],
+    ),
+                        ({
+                            "number": 3,
+                            "str": "some_string",
+                            "additional_field_str": "some_string"
+                        },
+                         [{
+                             "reason": "invalid",
+                             "location": "additional_field_str",
+                             "debugInfo": "",
+                             "message": "no such field: additional_field_str."
+                         }])]
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT number, str FROM %s" % table_id,
+            data=[(1, 'some_string')]),
+    ]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=hc.all_of(*pipeline_verifiers))
+
+    with beam.Pipeline(argv=args) as p:
+      # pylint: disable=expression-not-assigned
+      errors = (
+          p | 'create' >> beam.Create(input_data)
+          | 'write' >> beam.io.WriteToBigQuery(
+              table_id,
+              schema=table_schema,
+              method='STREAMING_INSERTS',
+              insert_retry_strategy='RETRY_NEVER',
+              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+
+      assert_that(
+          errors[BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2])),
+          equal_to(bq_result_errors))
+
   @pytest.mark.it_postcommit
   @parameterized.expand([
       param(file_format=FileFormat.AVRO),
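
For reference, a minimal usage sketch (not part of the patch) of the dead-letter output this change introduces. It mirrors the integration test above; the project, dataset, table, and schema below are illustrative assumptions, and each failed element is the (destination, row, errors) tuple emitted by _flush_batch.

# Minimal sketch (not part of this patch): consume the new
# FAILED_ROWS_WITH_ERRORS output of WriteToBigQuery and log each rejected
# row together with the error details returned by BigQuery. The table name
# and schema below are illustrative assumptions.
import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn


def log_failed_row(element):
  # Each element is a (destination, row, errors) tuple, where `errors` is the
  # list of error dicts from the BigQuery insert response.
  destination, row, errors = element
  logging.error('Row %s was rejected by %s: %s', row, destination, errors)


with beam.Pipeline() as p:
  result = (
      p
      | beam.Create([{'number': 1, 'str': 'a'}, {'number': 2, 'str': 'b'}])
      | beam.io.WriteToBigQuery(
          'my-project:my_dataset.my_table',  # assumed destination table
          schema='number:INTEGER,str:STRING',
          method='STREAMING_INSERTS',
          insert_retry_strategy='RETRY_NEVER'))

  _ = (
      result[BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
      | 'LogFailedRows' >> beam.Map(log_failed_row))

Because _flush_batch emits every failed row on both outputs, existing consumers of FAILED_ROWS keep working unchanged; FAILED_ROWS_WITH_ERRORS simply adds the error details from the insert response.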