feat: make it easier to disable best-effort deduplication with streaming inserts #734

Merged (2 commits) on Jul 1, 2021
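For context, here is a minimal usage sketch of the behaviors this PR introduces. The client setup and the table ID `proj.dset.tbl` are placeholders, not part of the change:

```python
from google.cloud import bigquery
from google.cloud.bigquery import AutoRowIDs

client = bigquery.Client()
rows = [{"col1": "val1"}, {"col1": "val2"}]

# Default behavior (unchanged): a UUID insert ID is generated per row,
# enabling best-effort deduplication on the backend.
client.insert_rows_json("proj.dset.tbl", rows)

# New: disable insert IDs entirely, opting out of best-effort
# deduplication.
client.insert_rows_json("proj.dset.tbl", rows, row_ids=AutoRowIDs.DISABLED)

# Also new: any iterable of IDs is accepted, not just a sequence.
client.insert_rows_json("proj.dset.tbl", rows, row_ids=iter(["id-0", "id-1"]))
```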
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
@@ -37,6 +37,7 @@
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery import enums
from google.cloud.bigquery.enums import AutoRowIDs
from google.cloud.bigquery.enums import KeyResultStatementKind
from google.cloud.bigquery.enums import SqlTypeNames
from google.cloud.bigquery.enums import StandardSqlDataTypes
@@ -144,6 +145,7 @@
"DEFAULT_RETRY",
# Enum Constants
"enums",
"AutoRowIDs",
"Compression",
"CreateDisposition",
"DestinationFormat",
47 changes: 41 additions & 6 deletions google/cloud/bigquery/client.py
@@ -68,6 +68,7 @@
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.enums import AutoRowIDs
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery import job
@@ -3349,7 +3350,7 @@ def insert_rows_json(
self,
table: Union[Table, TableReference, str],
json_rows: Sequence[Dict],
row_ids: Union[Iterable[str], AutoRowIDs, None] = AutoRowIDs.GENERATE_UUID,
skip_invalid_rows: bool = None,
ignore_unknown_values: bool = None,
template_suffix: str = None,
@@ -3371,11 +3372,20 @@ def insert_rows_json(
json_rows (Sequence[Dict]):
Row data to be inserted. Keys must match the table schema fields
and values must be JSON-compatible representations.
row_ids (Union[Iterable[str], AutoRowIDs, None]):
Unique IDs, one per row being inserted. An ID can also be
``None``, indicating that an explicit insert ID should **not**
be used for that row. If the argument is omitted altogether,
unique IDs are created automatically.

.. versionchanged:: 2.21.0
In addition to a sequence, any iterable of IDs or an
:class:`AutoRowIDs` enum member is now accepted.

.. deprecated:: 2.21.0
Passing ``None`` to explicitly request autogenerated insert IDs is
deprecated; use :attr:`AutoRowIDs.GENERATE_UUID` instead.

skip_invalid_rows (Optional[bool]):
Insert all valid rows of a request, even if invalid rows exist.
The default value is ``False``, which causes the entire request
@@ -3415,12 +3425,37 @@ def insert_rows_json(
rows_info = []
data = {"rows": rows_info}

if row_ids is None:
warnings.warn(
"Passing None for row_ids is deprecated. To explicitly request "
"autogenerated insert IDs, use AutoRowIDs.GENERATE_UUID instead",
category=DeprecationWarning,
)
row_ids = AutoRowIDs.GENERATE_UUID

if not isinstance(row_ids, AutoRowIDs):
try:
row_ids_iter = iter(row_ids)
except TypeError:
msg = "row_ids is neither an iterable nor an AutoRowIDs enum member"
raise TypeError(msg)

for i, row in enumerate(json_rows):
info = {"json": row}

if row_ids is AutoRowIDs.GENERATE_UUID:
info["insertId"] = str(uuid.uuid4())
elif row_ids is AutoRowIDs.DISABLED:
info["insertId"] = None
else:
try:
insert_id = next(row_ids_iter)
except StopIteration:
msg = f"row_ids did not generate enough IDs, error at index {i}"
raise ValueError(msg)
else:
info["insertId"] = insert_id

rows_info.append(info)

if skip_invalid_rows is not None:
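To illustrate the `row_ids` contract implemented above: an explicit iterable must yield at least one ID per row, and exhaustion raises `ValueError` while the request payload is being assembled, before anything is sent. A minimal sketch (client setup and table ID are placeholders):

```python
from google.cloud import bigquery

client = bigquery.Client()
rows = [{"col1": "a"}, {"col1": "b"}, {"col1": "c"}]

try:
    # Two IDs for three rows: the iterator is exhausted at index 2.
    client.insert_rows_json("proj.dset.tbl", rows, row_ids=iter(["id-0", "id-1"]))
except ValueError as exc:
    print(exc)  # row_ids did not generate enough IDs, error at index 2
```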
7 changes: 7 additions & 0 deletions google/cloud/bigquery/enums.py
@@ -21,6 +21,13 @@
from google.cloud.bigquery.query import ScalarQueryParameterType


class AutoRowIDs(enum.Enum):
"""How to handle automatic insert IDs when inserting rows as a stream."""

DISABLED = enum.auto()
GENERATE_UUID = enum.auto()


class Compression(object):
"""The compression type to use for exported files. The default value is
:attr:`NONE`.
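A note on the new enum: `enum.auto()` assigns arbitrary values, and `insert_rows_json` selects behavior by identity checks against the members, so callers should pass the members themselves rather than their underlying values. A small illustration:

```python
from google.cloud.bigquery.enums import AutoRowIDs

# Pass the member itself; the values produced by enum.auto() are
# arbitrary and not part of the public contract.
mode = AutoRowIDs.DISABLED
assert mode is AutoRowIDs.DISABLED
assert mode is not AutoRowIDs.GENERATE_UUID
```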
153 changes: 145 additions & 8 deletions tests/unit/test_client.py
@@ -5434,7 +5434,7 @@ def test_insert_rows_from_dataframe_w_explicit_none_insert_ids(self):
method="POST", path=API_PATH, data=EXPECTED_SENT_DATA, timeout=None
)

def test_insert_rows_json_default_behavior(self):
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import Table
@@ -5481,29 +5481,127 @@ def test_insert_rows_json(self):
method="POST", path="/%s" % PATH, data=SENT, timeout=7.5,
)

def test_insert_rows_json_w_explicitly_requested_autogenerated_insert_ids(self):
from google.cloud.bigquery import AutoRowIDs

rows = [{"col1": "val1"}, {"col2": "val2"}]
creds = _make_credentials()
http = object()
client = self._make_one(
project="default-project", credentials=creds, _http=http
)
conn = client._connection = make_connection({})

uuid_patcher = mock.patch("uuid.uuid4", side_effect=map(str, range(len(rows))))
with uuid_patcher:
errors = client.insert_rows_json(
"proj.dset.tbl", rows, row_ids=AutoRowIDs.GENERATE_UUID
)

self.assertEqual(len(errors), 0)

# Check row data sent to the backend.
expected_row_data = {
"rows": [
{"json": {"col1": "val1"}, "insertId": "0"},
{"json": {"col2": "val2"}, "insertId": "1"},
]
}
conn.api_request.assert_called_once_with(
method="POST",
path="/projects/proj/datasets/dset/tables/tbl/insertAll",
data=expected_row_data,
timeout=None,
)

def test_insert_rows_json_w_explicitly_disabled_insert_ids(self):
from google.cloud.bigquery import AutoRowIDs

rows = [{"col1": "val1"}, {"col2": "val2"}]
creds = _make_credentials()
http = object()
client = self._make_one(
project="default-project", credentials=creds, _http=http
)
conn = client._connection = make_connection({})

errors = client.insert_rows_json(
"proj.dset.tbl", rows, row_ids=AutoRowIDs.DISABLED,
)

self.assertEqual(len(errors), 0)

expected_row_data = {
"rows": [
{"json": {"col1": "val1"}, "insertId": None},
{"json": {"col2": "val2"}, "insertId": None},
]
}
conn.api_request.assert_called_once_with(
method="POST",
path="/projects/proj/datasets/dset/tables/tbl/insertAll",
data=expected_row_data,
timeout=None,
)

def test_insert_rows_json_with_iterator_row_ids(self):
rows = [{"col1": "val1"}, {"col2": "val2"}, {"col3": "val3"}]
creds = _make_credentials()
http = object()
client = self._make_one(
project="default-project", credentials=creds, _http=http
)
conn = client._connection = make_connection({})

row_ids_iter = map(str, itertools.count(42))
errors = client.insert_rows_json("proj.dset.tbl", rows, row_ids=row_ids_iter)

self.assertEqual(len(errors), 0)
expected_row_data = {
"rows": [
{"json": {"col1": "val1"}, "insertId": "42"},
{"json": {"col2": "val2"}, "insertId": "43"},
{"json": {"col3": "val3"}, "insertId": "44"},
]
}
conn.api_request.assert_called_once_with(
method="POST",
path="/projects/proj/datasets/dset/tables/tbl/insertAll",
data=expected_row_data,
timeout=None,
)

def test_insert_rows_json_with_non_iterable_row_ids(self):
rows = [{"col1": "val1"}]
creds = _make_credentials()
http = object()
client = self._make_one(
project="default-project", credentials=creds, _http=http
)
client._connection = make_connection({})

with self.assertRaises(TypeError) as exc:
client.insert_rows_json("proj.dset.tbl", rows, row_ids=object())

err_msg = str(exc.exception)
self.assertIn("row_ids", err_msg)
self.assertIn("iterable", err_msg)

def test_insert_rows_json_with_too_few_row_ids(self):
rows = [{"col1": "val1"}, {"col2": "val2"}, {"col3": "val3"}]
creds = _make_credentials()
http = object()
client = self._make_one(
project="default-project", credentials=creds, _http=http
)
client._connection = make_connection({})

insert_ids = ["10", "20"]

error_msg_pattern = "row_ids did not generate enough IDs.*index 2"
with self.assertRaisesRegex(ValueError, error_msg_pattern):
client.insert_rows_json("proj.dset.tbl", rows, row_ids=insert_ids)

def test_insert_rows_json_w_explicit_none_insert_ids(self):
rows = [{"col1": "val1"}, {"col2": "val2"}]
creds = _make_credentials()
@@ -5526,6 +5624,45 @@ def test_insert_rows_json_w_explicit_none_insert_ids(self):
timeout=None,
)

def test_insert_rows_json_w_none_insert_ids_sequence(self):
rows = [{"col1": "val1"}, {"col2": "val2"}]
creds = _make_credentials()
http = object()
client = self._make_one(
project="default-project", credentials=creds, _http=http
)
conn = client._connection = make_connection({})

uuid_patcher = mock.patch("uuid.uuid4", side_effect=map(str, range(len(rows))))
with warnings.catch_warnings(record=True) as warned, uuid_patcher:
errors = client.insert_rows_json("proj.dset.tbl", rows, row_ids=None)

self.assertEqual(len(errors), 0)

# Passing row_ids=None should have resulted in a deprecation warning.
matches = [
warning
for warning in warned
if issubclass(warning.category, DeprecationWarning)
and "row_ids" in str(warning)
and "AutoRowIDs.GENERATE_UUID" in str(warning)
]
assert matches, "The expected deprecation warning was not raised."

# Check row data sent to the backend.
expected_row_data = {
"rows": [
{"json": {"col1": "val1"}, "insertId": "0"},
{"json": {"col2": "val2"}, "insertId": "1"},
]
}
conn.api_request.assert_called_once_with(
method="POST",
path="/projects/proj/datasets/dset/tables/tbl/insertAll",
data=expected_row_data,
timeout=None,
)

def test_insert_rows_w_wrong_arg(self):
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.schema import SchemaField