diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index a4fd5053df00e..11bfb562b1325 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -42,6 +42,7 @@
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
 from apache_beam.io.gcp import bigquery_tools
+from apache_beam.io.gcp.bigquery import ReadFromBigQuery
 from apache_beam.io.gcp.bigquery import TableRowJsonCoder
 from apache_beam.io.gcp.bigquery import WriteToBigQuery
 from apache_beam.io.gcp.bigquery import _StreamToBigQuery
@@ -52,6 +53,7 @@
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.bigquery_tools import RetryStrategy
 from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.io.gcp.internal.clients.bigquery import bigquery_v2_client
 from apache_beam.io.gcp.pubsub import ReadFromPubSub
 from apache_beam.io.gcp.tests import utils
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
@@ -78,9 +80,11 @@
 try:
   from apitools.base.py.exceptions import HttpError
   from google.cloud import bigquery as gcp_bigquery
+  from google.api_core import exceptions
 except ImportError:
   gcp_bigquery = None
   HttpError = None
+  exceptions = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
 _LOGGER = logging.getLogger(__name__)
@@ -482,6 +486,89 @@ def test_temp_dataset_is_configurable(
     delete_table.assert_called_with(
         temp_dataset.projectId, temp_dataset.datasetId, mock.ANY)
 
+  @parameterized.expand([
+      param(exception_type=exceptions.Forbidden, error_message='accessDenied'),
+      param(
+          exception_type=exceptions.ServiceUnavailable,
+          error_message='backendError'),
+  ])
+  def test_create_temp_dataset_exception(self, exception_type, error_message):
+
+    with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
+                           'Insert'),\
+        mock.patch.object(BigQueryWrapper,
+                          'get_or_create_dataset') as mock_insert, \
+        mock.patch('time.sleep'), \
+        self.assertRaises(Exception) as exc,\
+        beam.Pipeline() as p:
+
+      mock_insert.side_effect = exception_type(error_message)
+
+      _ = p | ReadFromBigQuery(
+          project='apache-beam-testing',
+          query='SELECT * FROM `project.dataset.table`',
+          gcs_location='gs://temp_location')
+
+    mock_insert.assert_called()
+    self.assertIn(error_message, exc.exception.args[0])
+
+  @parameterized.expand([
+      param(exception_type=exceptions.BadRequest, error_message='invalidQuery'),
+      param(exception_type=exceptions.NotFound, error_message='notFound'),
+      param(
+          exception_type=exceptions.Forbidden, error_message='responseTooLarge')
+  ])
+  def test_query_job_exception(self, exception_type, error_message):
+
+    with mock.patch.object(beam.io.gcp.bigquery._CustomBigQuerySource,
+                           'estimate_size') as mock_estimate,\
+        mock.patch.object(BigQueryWrapper,
+                          'get_query_location') as mock_query_location,\
+        mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
+                          'Insert') as mock_query_job,\
+        mock.patch('time.sleep'), \
+        self.assertRaises(Exception) as exc, \
+        beam.Pipeline() as p:
+
+      mock_estimate.return_value = None
+      mock_query_location.return_value = None
+      mock_query_job.side_effect = exception_type(error_message)
+
+      _ = p | ReadFromBigQuery(
+          project='apache-beam-testing',
+          query='SELECT * FROM `project.dataset.table`',
+          gcs_location='gs://temp_location')
+
+    mock_query_job.assert_called()
+    self.assertIn(error_message, exc.exception.args[0])
+
+  @parameterized.expand([
+      param(exception_type=exceptions.BadRequest, error_message='invalid'),
+      param(exception_type=exceptions.Forbidden, error_message='accessDenied')
+  ])
+  def test_read_export_exception(self, exception_type, error_message):
+
+    with mock.patch.object(beam.io.gcp.bigquery._CustomBigQuerySource,
+                           'estimate_size') as mock_estimate,\
+        mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, 'Get'),\
+        mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
+                          'Insert') as mock_query_job, \
+        mock.patch('time.sleep'), \
+        self.assertRaises(Exception) as exc,\
+        beam.Pipeline() as p:
+
+      mock_estimate.return_value = None
+      mock_query_job.side_effect = exception_type(error_message)
+
+      _ = p | ReadFromBigQuery(
+          project='apache-beam-testing',
+          method=ReadFromBigQuery.Method.EXPORT,
+          table='project:dataset.table',
+          gcs_location="gs://temp_location")
+
+    mock_query_job.assert_called()
+    self.assertIn(error_message, exc.exception.args[0])
+
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQuerySink(unittest.TestCase):
@@ -832,6 +919,110 @@ def noop(table, **kwargs):
             with_auto_sharding=True,
             test_client=client))
 
+  @parameterized.expand([
+      param(exception_type=exceptions.Forbidden, error_message='accessDenied'),
+      param(
+          exception_type=exceptions.ServiceUnavailable,
+          error_message='backendError')
+  ])
+  def test_load_job_exception(self, exception_type, error_message):
+
+    with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
+                           'Insert') as mock_load_job,\
+        mock.patch('apache_beam.io.gcp.internal.clients'
+                   '.storage.storage_v1_client.StorageV1.ObjectsService'),\
+        mock.patch('time.sleep') as unused_mock,\
+        self.assertRaises(Exception) as exc,\
+        beam.Pipeline() as p:
+
+      mock_load_job.side_effect = exception_type(error_message)
+
+      _ = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              custom_gcs_temp_location="gs://temp_location",
+              method='FILE_LOADS'))
+
+    mock_load_job.assert_called()
+    self.assertIn(error_message, exc.exception.args[0])
+
+  @parameterized.expand([
+      param(
+          exception_type=exceptions.ServiceUnavailable,
+          error_message='backendError'),
+      param(
+          exception_type=exceptions.InternalServerError,
+          error_message='internalError'),
+  ])
+  def test_copy_load_job_exception(self, exception_type, error_message):
+
+    from apache_beam.io.gcp import bigquery_file_loads
+
+    old_max_file_size = bigquery_file_loads._DEFAULT_MAX_FILE_SIZE
+    old_max_partition_size = bigquery_file_loads._MAXIMUM_LOAD_SIZE
+    old_max_files_per_partition = bigquery_file_loads._MAXIMUM_SOURCE_URIS
+    bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = 15
+    bigquery_file_loads._MAXIMUM_LOAD_SIZE = 30
+    bigquery_file_loads._MAXIMUM_SOURCE_URIS = 1
+
+    with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
+                           'Insert') as mock_insert_copy_job, \
+        mock.patch.object(BigQueryWrapper,
+                          'perform_load_job') as mock_load_job, \
+        mock.patch.object(BigQueryWrapper,
+                          'wait_for_bq_job'), \
+        mock.patch('apache_beam.io.gcp.internal.clients'
+                   '.storage.storage_v1_client.StorageV1.ObjectsService'), \
+        mock.patch('time.sleep'), \
+        self.assertRaises(Exception) as exc, \
+        beam.Pipeline() as p:
+
+      mock_insert_copy_job.side_effect = exception_type(error_message)
+
+      dummy_job_reference = beam.io.gcp.internal.clients.bigquery.JobReference()
+      dummy_job_reference.jobId = 'job_id'
+      dummy_job_reference.location = 'US'
+      dummy_job_reference.projectId = 'apache-beam-testing'
+
+      mock_load_job.return_value = dummy_job_reference
+
+      _ = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }, {
+              'columnA': 'value3'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              custom_gcs_temp_location="gs://temp_location",
+              method='FILE_LOADS'))
+
+    bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = old_max_file_size
+    bigquery_file_loads._MAXIMUM_LOAD_SIZE = old_max_partition_size
+    bigquery_file_loads._MAXIMUM_SOURCE_URIS = old_max_files_per_partition
+
+    self.assertEqual(4, mock_insert_copy_job.call_count)
+    self.assertIn(error_message, exc.exception.args[0])
+
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class BigQueryStreamingInsertTransformTests(unittest.TestCase):