Skip to content

Commit

Permalink
fix tests and types, enhance error message for bad partition configs
Browse files (browse the repository at this point in the history)
make tests run
fix dict/PartitionConfig confusion
  • Loading branch information
Jacob Beck committed Feb 21, 2020
1 parent e29078e commit 6fb8c96
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 37 deletions.
37 changes: 22 additions & 15 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
class PartitionConfig(JsonSchemaMixin):
field: str
data_type: str = 'date'
range: Optional[Dict[str, any]] = None
range: Optional[Dict[str, Any]] = None

def render(self, alias: Optional[str] = None):
column: str = self.field
Expand All @@ -61,9 +61,10 @@ def _parse(cls, raw_partition_by) -> Optional['PartitionConfig']:
if isinstance(raw_partition_by, dict):
try:
return cls.from_dict(raw_partition_by)
except ValidationError:
except ValidationError as exc:
msg = dbt.exceptions.validator_error_message(exc)
dbt.exceptions.raise_compiler_error(
'Config `partition_by` is missing required item `field`'
f'Could not parse partition config: {msg}'
)

elif isinstance(raw_partition_by, str):
Expand Down Expand Up @@ -499,7 +500,9 @@ def execute_model(self, model, materialization, sql_override=None,

return res

def _partitions_match(self, table, conf_partition: Dict[str, Any]):
def _partitions_match(
self, table, conf_partition: Optional[PartitionConfig]
) -> bool:
"""
Check if the actual and configured partitions for a table are a match.
BigQuery tables can be replaced if:
Expand All @@ -512,24 +515,21 @@ def _partitions_match(self, table, conf_partition: Dict[str, Any]):

if not is_partitioned and not conf_partition:
return True

if table.time_partitioning is not None:
elif conf_partition and table.time_partitioning is not None:
table_field = table.time_partitioning.field
return table_field == conf_partition.get('field')

elif table.range_partitioning is not None:
return table_field == conf_partition.field
elif conf_partition and table.range_partitioning is not None:
dest_part = table.range_partition.range_
conf_part = conf_partition.get('range', {})
conf_part = conf_partition.range or {}

return dest_part.field == conf_partition.get('field') \
return dest_part.field == conf_partition.field \
and dest_part.start == conf_part.get('start') \
and dest_part.end == conf_part.get('end') \
and dest_part.interval == conf_part.get('interval')

else:
return False

def _clusters_match(self, table, conf_cluster):
def _clusters_match(self, table, conf_cluster) -> bool:
"""
Check if the actual and configured clustering columns for a table
are a match. BigQuery tables can be replaced if clustering columns
Expand All @@ -541,7 +541,12 @@ def _clusters_match(self, table, conf_cluster):
return table.clustering_fields == conf_cluster

@available.parse(lambda *a, **k: True)
def is_replaceable(self, relation, conf_partition: dict, conf_cluster):
def is_replaceable(
self,
relation,
conf_partition: Optional[PartitionConfig],
conf_cluster
) -> bool:
"""
Check if a given partition and clustering column spec for a table
can replace an existing relation in the database. BigQuery does not
Expand All @@ -567,7 +572,9 @@ def is_replaceable(self, relation, conf_partition: dict, conf_cluster):
))

@available
def parse_partition_by(self, raw_partition_by: Any):
def parse_partition_by(
self, raw_partition_by: Any
) -> Optional[PartitionConfig]:
"""
dbt v0.16.0 expects `partition_by` to be a dictionary where previously
it was a string. Check the type of `partition_by`, raise error
Expand Down
2 changes: 1 addition & 1 deletion plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
{{ sql_header if sql_header is not none }}

create or replace table {{ relation }}
{{ partition_by(partition_config.to_dict()) }}
{{ partition_by(partition_config) }}
{{ cluster_by(raw_cluster_by) }}
{{ bigquery_table_options(
persist_docs=raw_persist_docs,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from test.integration.base import DBTIntegrationTest, FakeArgs, use_profile
import json


class TestChangingPartitions(DBTIntegrationTest):

@property
Expand All @@ -11,39 +12,53 @@ def schema(self):
def models(self):
return "partition-models"

def test_change(self, before, after):
results = self.run_dbt(['run', '--vars', json.dumps(before)])
def run_changes(self, before, after, strict=False):
# strict needs to be off because these tests use legacy partition_by clauses
results = self.run_dbt(['run', '--vars', json.dumps(before)], strict=strict)
self.assertEqual(len(results), 1)

results = self.run_dbt(['run', '--vars', json.dumps(after)])
results = self.run_dbt(['run', '--vars', json.dumps(after)], strict=strict)
self.assertEqual(len(results), 1)

def test_add_partition(self):
@use_profile('bigquery')
def test_bigquery_add_partition(self):
before = {"partition_by": None, "cluster_by": None}
after = {"partition_by": "date(cur_time)", "cluster_by": None}
self.test_change(before, after)
self.run_changes(before, after)

def test_remove_partition(self):
@use_profile('bigquery')
def test_bigquery_remove_partition(self):
before = {"partition_by": "date(cur_time)", "cluster_by": None}
after = {"partition_by": None, "cluster_by": None}
self.test_change(before, after)
self.run_changes(before, after)

def test_change_partitions(self):
@use_profile('bigquery')
def test_bigquery_change_partitions(self):
before = {"partition_by": "date(cur_time)", "cluster_by": None}
after = {"partition_by": "cur_date", "cluster_by": None}
self.test_change(before, after)
self.run_changes(before, after)
self.run_changes(after, before)

def test_add_clustering(self):
@use_profile('bigquery')
def test_bigquery_add_clustering(self):
before = {"partition_by": "date(cur_time)", "cluster_by": None}
after = {"partition_by": "cur_date", "cluster_by": "id"}
self.test_change(before, after)
self.run_changes(before, after)

def test_remove_clustering(self):
@use_profile('bigquery')
def test_bigquery_remove_clustering(self):
before = {"partition_by": "date(cur_time)", "cluster_by": "id"}
after = {"partition_by": "cur_date", "cluster_by": None}
self.test_change(before, after)
self.run_changes(before, after)

def test_change_clustering(self):
@use_profile('bigquery')
def test_bigquery_change_clustering(self):
before = {"partition_by": "date(cur_time)", "cluster_by": "id"}
after = {"partition_by": "cur_date", "cluster_by": "name"}
self.test_change(before, after)
self.run_changes(before, after)

@use_profile('bigquery')
def test_bigquery_change_clustering_strict(self):
before = {'partition_by': {'field': 'cur_time', 'data_type': 'timestamp'}, 'cluster_by': 'id'}
after = {'partition_by': {'field': 'cur_date', 'data_type': 'date'}, 'cluster_by': 'name'}
self.run_changes(before, after, strict=True)
10 changes: 4 additions & 6 deletions test/unit/test_bigquery_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ def test_parse_partition_by(self):
self.assertEqual(
adapter.parse_partition_by({
"field": "ts",
}.to_dict()), {
}).to_dict(), {
"field": "ts",
"data_type": "date"
}
Expand All @@ -422,11 +422,9 @@ def test_parse_partition_by(self):
}
)

# Invalid, should return None
self.assertEqual(
adapter.parse_partition_by({}),
None
)
# Invalid, should raise an error
with self.assertRaises(dbt.exceptions.CompilationException):
adapter.parse_partition_by({})

# passthrough
self.assertEqual(
Expand Down

0 comments on commit 6fb8c96

Please sign in to comment.