Merge pull request apache#17584 from [BEAM-14415] Exception handling tests and logging for partial failure BQIO

* [BEAM-14415] Exception handling tests and logging for partial failures in BQ IO

* fix DLQ integration test

* fix lint

* fix postcommit

* fix formatter

* Fixing tests and adding test info

* fix skipping tests
pabloem authored May 12, 2022
1 parent 3cca763 commit 6680261
Showing 4 changed files with 346 additions and 24 deletions.
80 changes: 61 additions & 19 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -378,6 +378,15 @@ def compute_table_name(row):
NOTE: This job name template does not have backwards compatibility guarantees.
"""
BQ_JOB_NAME_TEMPLATE = "beam_bq_job_{job_type}_{job_id}_{step_id}{random}"
"""
The maximum number of times that a bundle of rows that errors out should be
sent for insertion into BigQuery.
The default is 10,000 with exponential backoffs, so a bundle of rows may be
tried for a very long time. You may reduce this property to reduce the number
of retries.
"""
MAX_INSERT_RETRIES = 10000
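To get a feel for why 10,000 retries is described above as effectively retrying forever, here is a rough, illustrative estimate (not part of this change): with the backoff parameters used further down in start_bundle (0.2s initial delay, doubling, capped at 1500s), and ignoring the fuzzing, which only shortens each sleep, the cumulative worst-case backoff runs to months.

# Illustrative worst-case estimate only; the real delays are fuzzed downwards.
initial_delay_secs, max_delay_secs, num_retries = 0.2, 1500, 10000

total, delay = 0.0, initial_delay_secs
for _ in range(num_retries):
  total += delay
  delay = min(delay * 2, max_delay_secs)

print('worst-case cumulative backoff: %.0f days' % (total / 86400))  # ~173 days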


@deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
@@ -1492,6 +1501,7 @@ class BigQueryWriteFn(DoFn):
DEFAULT_MAX_BATCH_SIZE = 500

FAILED_ROWS = 'FailedRows'
FAILED_ROWS_WITH_ERRORS = 'FailedRowsWithErrors'
STREAMING_API_LOGGING_FREQUENCY_SEC = 300

def __init__(
@@ -1507,7 +1517,8 @@ def __init__(
additional_bq_parameters=None,
ignore_insert_ids=False,
with_batched_input=False,
ignore_unknown_columns=False):
ignore_unknown_columns=False,
max_retries=MAX_INSERT_RETRIES):
"""Initialize a WriteToBigQuery transform.
Args:
@@ -1555,6 +1566,9 @@ def __init__(
the schema. The unknown values are ignored. Default is False,
which treats unknown values as errors. See reference:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
max_retries: The number of times that we will retry inserting a group of
rows into BigQuery. By default, we retry 10000 times with exponential
backoffs (effectively retry forever).
"""
self.schema = schema
@@ -1592,6 +1606,7 @@ def __init__(
self.streaming_api_logging_frequency_sec = (
BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
self.ignore_unknown_columns = ignore_unknown_columns
self._max_retries = max_retries

def display_data(self):
return {
@@ -1643,7 +1658,9 @@ def start_bundle(self):

self._backoff_calculator = iter(
retry.FuzzedExponentialIntervals(
initial_delay_secs=0.2, num_retries=10000, max_delay_secs=1500))
initial_delay_secs=0.2,
num_retries=self._max_retries,
max_delay_secs=1500))
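For readers unfamiliar with the helper, a minimal sketch (not code from this commit): FuzzedExponentialIntervals is a finite iterator, so once num_retries delays have been consumed it is exhausted and next(gen, None) returns None, which is exactly the signal the reworked _flush_batch below uses to stop retrying.

from apache_beam.utils import retry

# Tiny retry budget so the iterator exhausts quickly; the production code
# above passes num_retries=self._max_retries (default 10000).
backoff = iter(
    retry.FuzzedExponentialIntervals(
        initial_delay_secs=0.2, num_retries=3, max_delay_secs=1500))

for attempt in range(5):
  delay = next(backoff, None)
  if delay is None:
    print('retry budget exhausted after attempt %d' % attempt)
    break
  print('attempt %d: would sleep %.2fs' % (attempt, delay))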

def _create_table_if_needed(self, table_reference, schema=None):
str_table_reference = '%s:%s.%s' % (
@@ -1754,41 +1771,57 @@ def _flush_batch(self, destination):
ignore_unknown_values=self.ignore_unknown_columns)
self.batch_latency_metric.update((time.time() - start) * 1000)

failed_rows = [rows[entry['index']] for entry in errors]
failed_rows = [(rows[entry['index']], entry["errors"])
for entry in errors]
retry_backoff = next(self._backoff_calculator, None)

# If retry_backoff is None, then we will not retry and must log.
should_retry = any(
RetryStrategy.should_retry(
self._retry_strategy, entry['errors'][0]['reason'])
for entry in errors)
for entry in errors) and retry_backoff is not None

if not passed:
self.failed_rows_metric.update(len(failed_rows))
message = (
'There were errors inserting to BigQuery. Will{} retry. '
'Errors were {}'.format(("" if should_retry else " not"), errors))
if should_retry:
_LOGGER.warning(message)
else:
_LOGGER.error(message)

rows = failed_rows
# The log level is:
# - WARNING when we are continuing to retry, and have a deadline.
# - ERROR when we will no longer retry, or MAY retry forever.
log_level = (
logging.WARN if should_retry or
self._retry_strategy != RetryStrategy.RETRY_ALWAYS else
logging.ERROR)

_LOGGER.log(log_level, message)

if not should_retry:
break
else:
retry_backoff = next(self._backoff_calculator)
_LOGGER.info(
'Sleeping %s seconds before retrying insertion.', retry_backoff)
time.sleep(retry_backoff)
rows = [fr[0] for fr in failed_rows]
self._throttled_secs.inc(retry_backoff)

self._total_buffered_rows -= len(self._rows_buffer[destination])
del self._rows_buffer[destination]

return [
return itertools.chain([
pvalue.TaggedOutput(
BigQueryWriteFn.FAILED_ROWS,
GlobalWindows.windowed_value((destination, row)))
for row in failed_rows
]
BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
GlobalWindows.windowed_value((destination, row, err))) for row,
err in failed_rows
],
[
pvalue.TaggedOutput(
BigQueryWriteFn.FAILED_ROWS,
GlobalWindows.windowed_value(
(destination, row))) for row,
unused_err in failed_rows
])
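A hedged sketch of what downstream consumers now receive on each tag (the handler below is illustrative and not part of this PR): FAILED_ROWS keeps its original (destination, row) shape, while the new FAILED_ROWS_WITH_ERRORS tag carries (destination, row, errors) triples.

import logging

import apache_beam as beam


class LogFailedRowWithErrors(beam.DoFn):
  """Illustrative dead-letter handler for the FAILED_ROWS_WITH_ERRORS output."""
  def process(self, element):
    destination, row, errors = element  # errors: list of insertAll error dicts
    logging.error(
        'Insert into %s failed for row %s: %s', destination, row, errors)
    yield destination, row  # same shape as the legacy FAILED_ROWS output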


# The number of shards per destination when writing via streaming inserts.
@@ -1815,7 +1848,8 @@ def __init__(
ignore_insert_ids,
ignore_unknown_columns,
with_auto_sharding,
test_client=None):
test_client=None,
max_retries=None):
self.table_reference = table_reference
self.table_side_inputs = table_side_inputs
self.schema_side_inputs = schema_side_inputs
@@ -1831,6 +1865,7 @@ def __init__(
self.ignore_insert_ids = ignore_insert_ids
self.ignore_unknown_columns = ignore_unknown_columns
self.with_auto_sharding = with_auto_sharding
self.max_retries = max_retries or MAX_INSERT_RETRIES

class InsertIdPrefixFn(DoFn):
def start_bundle(self):
@@ -1856,7 +1891,8 @@ def expand(self, input):
additional_bq_parameters=self.additional_bq_parameters,
ignore_insert_ids=self.ignore_insert_ids,
ignore_unknown_columns=self.ignore_unknown_columns,
with_batched_input=self.with_auto_sharding)
with_batched_input=self.with_auto_sharding,
max_retries=self.max_retries)

def _add_random_shard(element):
key = element[0]
Expand Down Expand Up @@ -1905,7 +1941,9 @@ def _restore_table_ref(sharded_table_ref_elems_kv):
| 'FromHashableTableRef' >> beam.Map(_restore_table_ref)
| 'StreamInsertRows' >> ParDo(
bigquery_write_fn, *self.schema_side_inputs).with_outputs(
BigQueryWriteFn.FAILED_ROWS, main='main'))
BigQueryWriteFn.FAILED_ROWS,
BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
main='main'))
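For context, a generic multi-output sketch (not code from this commit): naming both tags in with_outputs makes each one addressable as its own PCollection on the returned DoOutputsTuple, which is how the failed-row collections are looked up in WriteToBigQuery.expand further down.

import apache_beam as beam


class SplitEvenOdd(beam.DoFn):
  # Stand-in DoFn; BigQueryWriteFn tags its failed-row outputs the same way.
  def process(self, n):
    if n % 2:
      yield beam.pvalue.TaggedOutput('odd', n)
    else:
      yield n  # untagged elements go to the main output


with beam.Pipeline() as p:
  results = (
      p
      | beam.Create([1, 2, 3, 4])
      | beam.ParDo(SplitEvenOdd()).with_outputs('odd', main='even'))
  _ = results.odd | 'PrintOdd' >> beam.Map(print)
  _ = results['even'] | 'PrintEven' >> beam.Map(print)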


# Flag to be passed to WriteToBigQuery to force schema autodetection
@@ -2194,7 +2232,11 @@ def expand(self, pcoll):
with_auto_sharding=self.with_auto_sharding,
test_client=self.test_client)

return {BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS]}
return {
BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS],
BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: outputs[
BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS],
}
else:
if self._temp_file_format == bigquery_tools.FileFormat.AVRO:
if self.schema == SCHEMA_AUTODETECT:
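A hedged usage sketch of the new output (the dict a few lines up is returned on the streaming-inserts path; table, project, and schema below are placeholders): callers can keep consuming FAILED_ROWS as before and additionally route the error-annotated rows, for example to a dead-letter sink.

import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery

with beam.Pipeline() as p:
  write_result = (
      p
      | beam.Create([{'name': 'a'}, {'name': 'b'}])
      | WriteToBigQuery(
          'my-project:my_dataset.my_table',
          schema='name:STRING',
          method=WriteToBigQuery.Method.STREAMING_INSERTS))

  _ = (
      write_result[BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
      | beam.Map(lambda e: json.dumps(
          {'table': str(e[0]), 'row': e[1], 'errors': e[2]}))
      | beam.Map(print))  # a real pipeline would write to a dead-letter sink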