[BEAM-14422] Adding exception testing for ReadFromBigQuery and WriteToBigQuery #17589

Merged · 21 commits · Jun 10, 2022
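All of the new tests share one idiom: patch a BigQuery client call with unittest.mock, inject a google.api_core exception through side_effect, run a small pipeline, and assert that the underlying error message surfaces on the exception the pipeline raises. Below is a minimal, self-contained sketch of that idiom; the _FakeService and run_job names are illustrative stand-ins, not code from this PR:

import unittest
from unittest import mock


class _FakeService:
  # Illustrative stand-in for a BigQuery client service (not a Beam class).
  def insert(self):
    raise NotImplementedError


def run_job(service):
  # Stand-in for the pipeline under test: call the service and let any
  # injected error propagate to the caller.
  return service.insert()


class SideEffectPatternTest(unittest.TestCase):
  def test_error_message_surfaces(self):
    # Same shape as the PR's tests: patch the call, inject the failure via
    # side_effect, then assert on the propagated message.
    with mock.patch.object(_FakeService, 'insert') as mock_insert, \
        self.assertRaises(Exception) as exc:
      mock_insert.side_effect = RuntimeError('accessDenied')
      run_job(_FakeService())
    mock_insert.assert_called()
    self.assertIn('accessDenied', exc.exception.args[0])


if __name__ == '__main__':
  unittest.main()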
191 changes: 191 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -42,6 +42,7 @@
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.io.gcp.bigquery import TableRowJsonCoder
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.bigquery import _StreamToBigQuery
@@ -52,6 +53,7 @@
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.internal.clients.bigquery import bigquery_v2_client
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.tests import utils
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
@@ -78,9 +80,11 @@
try:
from apitools.base.py.exceptions import HttpError
from google.cloud import bigquery as gcp_bigquery
from google.api_core import exceptions
except ImportError:
gcp_bigquery = None
HttpError = None
exceptions = None
# pylint: enable=wrong-import-order, wrong-import-position

_LOGGER = logging.getLogger(__name__)
@@ -482,6 +486,89 @@ def test_temp_dataset_is_configurable(
delete_table.assert_called_with(
temp_dataset.projectId, temp_dataset.datasetId, mock.ANY)

@parameterized.expand([
param(exception_type=exceptions.Forbidden, error_message='accessDenied'),
param(
exception_type=exceptions.ServiceUnavailable,
error_message='backendError'),
])
def test_create_temp_dataset_exception(self, exception_type, error_message):

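    # Fail the temp dataset creation step and verify that the BigQuery error
    # string propagates into the exception raised by the pipeline. Patching
    # JobsService.Insert keeps any real job submission from happening.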
with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
'Insert'),\
mock.patch.object(BigQueryWrapper,
'get_or_create_dataset') as mock_insert, \
mock.patch('time.sleep'), \
self.assertRaises(Exception) as exc,\
beam.Pipeline() as p:

mock_insert.side_effect = exception_type(error_message)

_ = p | ReadFromBigQuery(
project='apache-beam-testing',
query='SELECT * FROM `project.dataset.table`',
gcs_location='gs://temp_location')

mock_insert.assert_called()
self.assertIn(error_message, exc.exception.args[0])

@parameterized.expand([
param(exception_type=exceptions.BadRequest, error_message='invalidQuery'),
param(exception_type=exceptions.NotFound, error_message='notFound'),
param(
exception_type=exceptions.Forbidden, error_message='responseTooLarge')
])
def test_query_job_exception(self, exception_type, error_message):

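    # Fail the query job submission itself; size estimation and query
    # location lookup are stubbed out so execution reaches that step.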
with mock.patch.object(beam.io.gcp.bigquery._CustomBigQuerySource,
'estimate_size') as mock_estimate,\
mock.patch.object(BigQueryWrapper,
'get_query_location') as mock_query_location,\
mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
'Insert') as mock_query_job,\
mock.patch('time.sleep'), \
self.assertRaises(Exception) as exc, \
beam.Pipeline() as p:

mock_estimate.return_value = None
mock_query_location.return_value = None
mock_query_job.side_effect = exception_type(error_message)

_ = p | ReadFromBigQuery(
project='apache-beam-testing',
query='SELECT * FROM `project.dataset.table`',
gcs_location='gs://temp_location')

mock_query_job.assert_called()
self.assertIn(error_message, exc.exception.args[0])

@parameterized.expand([
param(exception_type=exceptions.BadRequest, error_message='invalid'),
param(exception_type=exceptions.Forbidden, error_message='accessDenied')
])
def test_read_export_exception(self, exception_type, error_message):

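    # Fail the export job used by the EXPORT read method and verify the
    # error message propagates.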
with mock.patch.object(beam.io.gcp.bigquery._CustomBigQuerySource,
'estimate_size') as mock_estimate,\
mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, 'Get'),\
mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
'Insert') as mock_query_job, \
mock.patch('time.sleep'), \
self.assertRaises(Exception) as exc,\
beam.Pipeline() as p:

mock_estimate.return_value = None
mock_query_job.side_effect = exception_type(error_message)

_ = p | ReadFromBigQuery(
project='apache-beam-testing',
method=ReadFromBigQuery.Method.EXPORT,
table='project:dataset.table',
          gcs_location='gs://temp_location')

mock_query_job.assert_called()
self.assertIn(error_message, exc.exception.args[0])


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQuerySink(unittest.TestCase):
@@ -832,6 +919,110 @@ def noop(table, **kwargs):
with_auto_sharding=True,
test_client=client))

@parameterized.expand([
param(exception_type=exceptions.Forbidden, error_message='accessDenied'),
param(
exception_type=exceptions.ServiceUnavailable,
error_message='backendError')
])
def test_load_job_exception(self, exception_type, error_message):

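    # Fail the load job submission for a FILE_LOADS write; the GCS objects
    # service is stubbed so no real objects are written.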
with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
'Insert') as mock_load_job,\
mock.patch('apache_beam.io.gcp.internal.clients'
'.storage.storage_v1_client.StorageV1.ObjectsService'),\
mock.patch('time.sleep') as unused_mock,\
self.assertRaises(Exception) as exc,\
beam.Pipeline() as p:

mock_load_job.side_effect = exception_type(error_message)

_ = (
p
| beam.Create([{
'columnA': 'value1'
}])
| WriteToBigQuery(
table='project:dataset.table',
schema={
'fields': [{
'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
}]
},
create_disposition='CREATE_NEVER',
              custom_gcs_temp_location='gs://temp_location',
method='FILE_LOADS'))

mock_load_job.assert_called()
self.assertIn(error_message, exc.exception.args[0])

@parameterized.expand([
param(
exception_type=exceptions.ServiceUnavailable,
error_message='backendError'),
param(
exception_type=exceptions.InternalServerError,
error_message='internalError'),
])
def test_copy_load_job_exception(self, exception_type, error_message):

from apache_beam.io.gcp import bigquery_file_loads

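    # Shrink the size limits so the three input rows are split across
    # multiple files and partitions, which forces a copy-job stage.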
old_max_file_size = bigquery_file_loads._DEFAULT_MAX_FILE_SIZE
old_max_partition_size = bigquery_file_loads._MAXIMUM_LOAD_SIZE
old_max_files_per_partition = bigquery_file_loads._MAXIMUM_SOURCE_URIS
bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = 15
bigquery_file_loads._MAXIMUM_LOAD_SIZE = 30
bigquery_file_loads._MAXIMUM_SOURCE_URIS = 1

with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
'Insert') as mock_insert_copy_job, \
mock.patch.object(BigQueryWrapper,
'perform_load_job') as mock_load_job, \
mock.patch.object(BigQueryWrapper,
'wait_for_bq_job'), \
mock.patch('apache_beam.io.gcp.internal.clients'
'.storage.storage_v1_client.StorageV1.ObjectsService'), \
mock.patch('time.sleep'), \
self.assertRaises(Exception) as exc, \
beam.Pipeline() as p:

mock_insert_copy_job.side_effect = exception_type(error_message)

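      # Let the per-partition load jobs appear to succeed so the pipeline
      # advances to the copy jobs, which are the calls made to fail here.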
dummy_job_reference = beam.io.gcp.internal.clients.bigquery.JobReference()
dummy_job_reference.jobId = 'job_id'
dummy_job_reference.location = 'US'
dummy_job_reference.projectId = 'apache-beam-testing'

mock_load_job.return_value = dummy_job_reference

_ = (
p
| beam.Create([{
'columnA': 'value1'
}, {
'columnA': 'value2'
}, {
'columnA': 'value3'
}])
| WriteToBigQuery(
table='project:dataset.table',
schema={
'fields': [{
'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
}]
},
create_disposition='CREATE_NEVER',
              custom_gcs_temp_location='gs://temp_location',
method='FILE_LOADS'))

bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = old_max_file_size
bigquery_file_loads._MAXIMUM_LOAD_SIZE = old_max_partition_size
bigquery_file_loads._MAXIMUM_SOURCE_URIS = old_max_files_per_partition

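    # The transient copy-job failure is retried (time.sleep is patched, so
    # the retries are instant) before the error finally surfaces.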
self.assertEqual(4, mock_insert_copy_job.call_count)
self.assertIn(error_message, exc.exception.args[0])


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class BigQueryStreamingInsertTransformTests(unittest.TestCase):