diff --git a/core/dbt/deprecations.py b/core/dbt/deprecations.py index 1996d62c145..3cfeea14491 100644 --- a/core/dbt/deprecations.py +++ b/core/dbt/deprecations.py @@ -108,6 +108,26 @@ class ModelsKeyNonModelDeprecation(DBTDeprecation): ''' +class BigQueryPartitionByStringDeprecation(DBTDeprecation): + _name = 'bq-partition-by-string' + + _description = ''' + As of dbt v0.16.0, the `partition_by` config in BigQuery accepts a + dictionary containing `field` and `data_type`. + + + - Provided partition_by: {raw_partition_by} + + + - dbt inferred: {inferred_partition_by} + + + + For more information, see: + https://docs.getdbt.com/docs/upgrading-to-0-16-0 + ''' + + _adapter_renamed_description = """\ The adapter function `adapter.{old_name}` is deprecated and will be removed in a future release of dbt. Please use `adapter.{new_name}` instead. @@ -151,6 +171,7 @@ def warn(name, *args, **kwargs): NotADictionaryDeprecation(), ColumnQuotingDeprecation(), ModelsKeyNonModelDeprecation(), + BigQueryPartitionByStringDeprecation(), ] deprecations: Dict[str, DBTDeprecation] = { diff --git a/core/dbt/include/global_project/macros/materializations/common/merge.sql b/core/dbt/include/global_project/macros/materializations/common/merge.sql index 4c0c36bdb9c..b02be5b8b1e 100644 --- a/core/dbt/include/global_project/macros/materializations/common/merge.sql +++ b/core/dbt/include/global_project/macros/materializations/common/merge.sql @@ -1,37 +1,32 @@ -{% macro get_merge_sql(target, source, unique_key, dest_columns) -%} - {{ adapter_macro('get_merge_sql', target, source, unique_key, dest_columns) }} +{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%} + {{ adapter_macro('get_merge_sql', target, source, unique_key, dest_columns, predicates) }} {%- endmacro %} + {% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%} {{ adapter_macro('get_delete_insert_merge_sql', target, source, unique_key, dest_columns) }} {%- endmacro %} -{% macro get_quoted_csv(column_names) %} - {% set quoted = [] %} - {% for col in column_names -%} - {%- do quoted.append(adapter.quote(col)) -%} - {%- endfor %} - - {%- set dest_cols_csv = quoted | join(', ') -%} - {{ return(dest_cols_csv) }} -{% endmacro %} - - -{% macro common_get_merge_sql(target, source, unique_key, dest_columns) -%} - {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} - - merge into {{ target }} as DBT_INTERNAL_DEST - using {{ source }} as DBT_INTERNAL_SOURCE +{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%} + {%- set predicates = [] if predicates is none else [] + predicates -%} + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} {% if unique_key %} - on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} + {% set unique_key_match %} + DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} + {% endset %} + {% do predicates.append(unique_key_match) %} {% else %} - on FALSE + {% do predicates.append('FALSE') %} {% endif %} + merge into {{ target }} as DBT_INTERNAL_DEST + using {{ source }} as DBT_INTERNAL_SOURCE + on {{ predicates | join(' and ') }} + {% if unique_key %} when matched then update set {% for column in dest_columns -%} @@ -45,16 +40,17 @@ values ({{ dest_cols_csv }}) -{%- endmacro %} +{% endmacro %} -{% macro default__get_merge_sql(target, source, unique_key, dest_columns) -%} - {% set typename = adapter.type() %} - {{ exceptions.raise_compiler_error( - 
'get_merge_sql is not implemented for {}'.format(typename) - ) - }} +{% macro get_quoted_csv(column_names) %} + {% set quoted = [] %} + {% for col in column_names -%} + {%- do quoted.append(adapter.quote(col)) -%} + {%- endfor %} + {%- set dest_cols_csv = quoted | join(', ') -%} + {{ return(dest_cols_csv) }} {% endmacro %} diff --git a/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql b/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql index 5022a15e334..a0166931f34 100644 --- a/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql +++ b/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql @@ -170,7 +170,7 @@ insert into {{ tmp_relation }} (dbt_change_type, dbt_scd_id, dbt_valid_to) select dbt_change_type, dbt_scd_id, dbt_valid_to from ( {{ updates_select }} - ) dbt_sbq; + ) dbt_sbq {% endcall %} {% do return(tmp_relation) %} diff --git a/core/dbt/links.py b/core/dbt/links.py index f5c017011f7..c934942e391 100644 --- a/core/dbt/links.py +++ b/core/dbt/links.py @@ -1,3 +1,4 @@ ProfileConfigDocs = 'https://docs.getdbt.com/docs/configure-your-profile' SnowflakeQuotingDocs = 'https://docs.getdbt.com/v0.10/docs/configuring-quoting' IncrementalDocs = 'https://docs.getdbt.com/docs/configuring-incremental-models' +BigQueryNewPartitionBy = 'https://docs.getdbt.com/docs/upgrading-to-0-16-0' diff --git a/core/dbt/utils.py b/core/dbt/utils.py index 100f11a32f1..48e562338cf 100644 --- a/core/dbt/utils.py +++ b/core/dbt/utils.py @@ -480,3 +480,12 @@ def __init__(self, func): def __get__(self, obj, objtype): return self.func(objtype) + + +def format_bytes(num_bytes): + for unit in ['Bytes', 'KB', 'MB', 'GB', 'TB']: + if abs(num_bytes) < 1024.0: + return f"{num_bytes:3.1f} {unit}" + num_bytes /= 1024.0 + + return "> 1024 TB" diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index 82fd74ead75..98b0d839763 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -8,6 +8,7 @@ from google.api_core import retry, client_info from google.oauth2 import service_account +from dbt.utils import format_bytes from dbt.clients import agate_helper, gcloud from dbt.exceptions import ( FailedToConnectException, RuntimeException, DatabaseException @@ -66,13 +67,9 @@ class BigQueryConnectionManager(BaseConnectionManager): DEFAULT_MAXIMUM_DELAY = 1.0 # Seconds @classmethod - def handle_error(cls, error, message, sql): - logger.debug(message.format(sql=sql)) - logger.debug(str(error)) - error_msg = "\n".join( - [item['message'] for item in error.errors]) - - raise DatabaseException(error_msg) from error + def handle_error(cls, error, message): + error_msg = "\n".join([item['message'] for item in error.errors]) + raise DatabaseException(error_msg) def clear_transaction(self): pass @@ -83,12 +80,12 @@ def exception_handler(self, sql): yield except google.cloud.exceptions.BadRequest as e: - message = "Bad request while running:\n{sql}" - self.handle_error(e, message, sql) + message = "Bad request while running query" + self.handle_error(e, message) except google.cloud.exceptions.Forbidden as e: - message = "Access denied while running:\n{sql}" - self.handle_error(e, message, sql) + message = "Access denied while running query" + self.handle_error(e, message) except Exception as e: logger.debug("Unhandled error while running:\n{}".format(sql)) @@ -98,7 +95,7 @@ def 
exception_handler(self, sql): # this sounds a lot like a signal handler and probably has # useful information, so raise it without modification. raise - raise RuntimeException(str(e)) from e + raise RuntimeException(str(e)) def cancel_open(self) -> None: pass @@ -237,6 +234,10 @@ def execute(self, sql, auto_begin=False, fetch=None): table = client.get_table(query_job.destination) status = 'CREATE TABLE ({})'.format(table.num_rows) + elif query_job.statement_type == 'SCRIPT': + processed = format_bytes(query_job.total_bytes_processed) + status = f'SCRIPT ({processed} processed)' + elif query_job.statement_type in ['INSERT', 'DELETE', 'MERGE']: status = '{} ({})'.format( query_job.statement_type, @@ -372,9 +373,6 @@ def count_error(self, error): self.error_count, self.retries, repr(error)) return True else: - logger.debug( - 'Not Retrying after {} previous attempts. Error: {}', - self.error_count - 1, repr(error)) return False diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py index 1e58c73a5ad..10a8380ca67 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -1,10 +1,13 @@ +from dataclasses import dataclass from typing import Dict, List, Optional, Any, Set +from hologram import JsonSchemaMixin, ValidationError import dbt.deprecations import dbt.exceptions import dbt.flags as flags import dbt.clients.gcloud import dbt.clients.agate_helper +import dbt.links from dbt.adapters.base import BaseAdapter, available, RelationType from dbt.adapters.base.impl import SchemaSearchMap @@ -26,6 +29,90 @@ import time import agate +import re + + +BQ_INTEGER_RANGE_NOT_SUPPORTED = f""" +BigQuery integer range partitioning is only supported by the +`partition_by` config, which accepts a dictionary. 
+ +See: {dbt.links.BigQueryNewPartitionBy} +""" + + +@dataclass +class PartitionConfig(JsonSchemaMixin): + field: str + data_type: str = 'date' + range: Optional[Dict[str, Any]] = None + + def render(self, alias: Optional[str] = None): + column: str = self.field + if alias: + column = f'{alias}.{self.field}' + + if self.data_type in ('timestamp', 'datetime'): + return f'date({column})' + else: + return column + + @classmethod + def _parse(cls, raw_partition_by) -> Optional['PartitionConfig']: + if isinstance(raw_partition_by, dict): + try: + return cls.from_dict(raw_partition_by) + except ValidationError as exc: + msg = dbt.exceptions.validator_error_message(exc) + dbt.exceptions.raise_compiler_error( + f'Could not parse partition config: {msg}' + ) + + elif isinstance(raw_partition_by, str): + raw_partition_by = raw_partition_by.strip() + if 'range_bucket' in raw_partition_by.lower(): + dbt.exceptions.raise_compiler_error( + BQ_INTEGER_RANGE_NOT_SUPPORTED + ) + + elif raw_partition_by.lower().startswith('date('): + matches = re.match(r'date\((.+)\)', raw_partition_by, + re.IGNORECASE) + if not matches: + dbt.exceptions.raise_compiler_error( + f"Specified partition_by '{raw_partition_by}' " + "is not parseable") + + partition_by = matches.group(1) + data_type = 'timestamp' + + else: + partition_by = raw_partition_by + data_type = 'date' + + inferred_partition_by = cls( + field=partition_by, + data_type=data_type + ) + + dbt.deprecations.warn( + 'bq-partition-by-string', + raw_partition_by=raw_partition_by, + inferred_partition_by=inferred_partition_by.to_dict() + ) + return inferred_partition_by + else: + return None + + @classmethod + def parse(cls, raw_partition_by) -> Optional['PartitionConfig']: + try: + return cls._parse(raw_partition_by) + except TypeError: + dbt.exceptions.raise_compiler_error( + f'Invalid partition_by config:\n' + f' Got: {raw_partition_by}\n' + f' Expected a dictionary with "field" and "data_type" keys' + ) def _stub_relation(*args, **kwargs): @@ -393,7 +480,7 @@ def execute_model(self, model, materialization, sql_override=None, if flags.STRICT_MODE: connection = self.connections.get_thread_connection() if not isinstance(connection, Connection): - raise dbt.exceptions.CompilerException( + dbt.exceptions.raise_compiler_error( f'Got {connection} - not a Connection!' ) model_uid = model.get('unique_id') @@ -413,8 +500,53 @@ def execute_model(self, model, materialization, sql_override=None, return res + def _partitions_match( + self, table, conf_partition: Optional[PartitionConfig] + ) -> bool: + """ + Check if the actual and configured partitions for a table are a match. + BigQuery tables can be replaced if: + - Both tables are not partitioned, OR + - Both tables are partitioned using the exact same configs + + If there is a mismatch, then the table cannot be replaced directly. 
+        """
+        is_partitioned = (table.range_partitioning or table.time_partitioning)
+
+        if not is_partitioned and not conf_partition:
+            return True
+        elif conf_partition and table.time_partitioning is not None:
+            table_field = table.time_partitioning.field
+            return table_field == conf_partition.field
+        elif conf_partition and table.range_partitioning is not None:
+            dest_part = table.range_partitioning
+            conf_part = conf_partition.range or {}
+
+            return dest_part.field == conf_partition.field \
+                and dest_part.range_.start == conf_part.get('start') \
+                and dest_part.range_.end == conf_part.get('end') \
+                and dest_part.range_.interval == conf_part.get('interval')
+        else:
+            return False
+
+    def _clusters_match(self, table, conf_cluster) -> bool:
+        """
+        Check if the actual and configured clustering columns for a table
+        are a match. BigQuery tables can be replaced if clustering columns
+        match exactly.
+        """
+        if isinstance(conf_cluster, str):
+            conf_cluster = [conf_cluster]
+
+        return table.clustering_fields == conf_cluster
+
     @available.parse(lambda *a, **k: True)
-    def is_replaceable(self, relation, conf_partition, conf_cluster):
+    def is_replaceable(
+        self,
+        relation,
+        conf_partition: Optional[PartitionConfig],
+        conf_cluster
+    ) -> bool:
         """
         Check if a given partition and clustering column spec for a table
         can replace an existing relation in the database. BigQuery does not
@@ -422,6 +554,9 @@ def is_replaceable(self, relation, conf_partition, conf_cluster):
         partitioning spec. This method returns True if the given config spec is
         identical to that of the existing table.
         """
+        if not relation:
+            return True
+
         try:
             table = self.connections.get_bq_table(
                 database=relation.database,
@@ -431,17 +566,21 @@ def is_replaceable(self, relation, conf_partition, conf_cluster):
         except google.cloud.exceptions.NotFound:
             return True
 
-        table_partition = table.time_partitioning
-        if table_partition is not None:
-            table_partition = table_partition.field
-
-        table_cluster = table.clustering_fields
+        return all((
+            self._partitions_match(table, conf_partition),
+            self._clusters_match(table, conf_cluster)
+        ))
 
-        if isinstance(conf_cluster, str):
-            conf_cluster = [conf_cluster]
-
-        return table_partition == conf_partition \
-            and table_cluster == conf_cluster
+    @available
+    def parse_partition_by(
+        self, raw_partition_by: Any
+    ) -> Optional[PartitionConfig]:
+        """
+        dbt v0.16.0 expects `partition_by` to be a dictionary where previously
+        it was a string. Check the type of `partition_by`, raise error
+        or warning if string, and attempt to convert to dict.
+ """ + return PartitionConfig.parse(raw_partition_by) @available.parse_none def alter_table_add_columns(self, relation, columns): diff --git a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql index d67b3f47a08..a7530d43b2e 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql @@ -1,25 +1,26 @@ -{% macro partition_by(raw_partition_by) %} - {%- if raw_partition_by is none -%} - {{ return('') }} - {% endif %} - - {% set partition_by_clause %} - partition by {{ raw_partition_by }} - {%- endset -%} - {{ return(partition_by_clause) }} +{% macro partition_by(partition_config) -%} + {%- if partition_config is none -%} + {% do return('') %} + {%- elif partition_config.data_type | lower in ('date','timestamp','datetime') -%} + partition by {{ partition_config.render() }} + {%- elif partition_config.data_type | lower in ('int64') -%} + {%- set range = partition_config.range -%} + partition by range_bucket( + {{ partition_config.field }}, + generate_array({{ range.start}}, {{ range.end }}, {{ range.interval }}) + ) + {%- endif -%} {%- endmacro -%} - {% macro cluster_by(raw_cluster_by) %} {%- if raw_cluster_by is not none -%} - cluster by - {% if raw_cluster_by is string -%} + cluster by {% if raw_cluster_by is string -%} {% set raw_cluster_by = [raw_cluster_by] %} {%- endif -%} {%- for cluster in raw_cluster_by -%} {{ cluster }} - {%- if not loop.last -%},{%- endif -%} + {%- if not loop.last -%}, {% endif -%} {%- endfor -%} {% endif %} @@ -63,19 +64,24 @@ {%- set raw_labels = config.get('labels', []) -%} {%- set sql_header = config.get('sql_header', none) -%} + {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%} + {{ sql_header if sql_header is not none }} create or replace table {{ relation }} - {{ partition_by(raw_partition_by) }} + {{ partition_by(partition_config) }} {{ cluster_by(raw_cluster_by) }} {{ bigquery_table_options( - persist_docs=raw_persist_docs, temporary=temporary, kms_key_name=raw_kms_key_name, - labels=raw_labels) }} + persist_docs=raw_persist_docs, + temporary=temporary, + kms_key_name=raw_kms_key_name, + labels=raw_labels + ) }} as ( {{ sql }} ); -{%- endmacro -%} +{%- endmacro -%} {% macro bigquery__create_view_as(relation, sql) -%} {%- set raw_persist_docs = config.get('persist_docs', {}) -%} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql index d2e62e4f8e6..66e1c48c624 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql @@ -1,4 +1,46 @@ +{% macro bq_partition_merge(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns) %} + {%- set partition_type = + 'date' if partition_by.data_type in ('timestamp, datetime') + else partition_by.data_type -%} + + {% set predicate -%} + {{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_upsert) + {%- endset %} + + {%- set source_sql -%} + ( + select * from {{ tmp_relation }} + ) + {%- endset -%} + + -- generated script to merge partitions into {{ target_relation }} + declare dbt_partitions_for_upsert array<{{ partition_type }}>; + declare _dbt_max_partition {{ partition_by.data_type }}; + + set _dbt_max_partition = ( + select max({{ partition_by.field }}) from {{ this }} + ); + + 
-- 1. create a temp table + {{ create_table_as(True, tmp_relation, sql) }} + + -- 2. define partitions to update + set (dbt_partitions_for_upsert) = ( + select as struct + array_agg(distinct {{ partition_by.render() }}) + from {{ tmp_relation }} + ); + + -- 3. run the merge statement + {{ get_merge_sql(target_relation, source_sql, unique_key, dest_columns, [predicate]) }}; + + -- 4. clean up the temp table + drop table if exists {{ tmp_relation }} + +{% endmacro %} + + {% materialization incremental, adapter='bigquery' -%} {%- set unique_key = config.get('unique_key') -%} @@ -8,7 +50,8 @@ {%- set existing_relation = load_relation(this) %} {%- set tmp_relation = make_temp_relation(this) %} - {%- set partition_by = config.get('partition_by', none) -%} + {%- set raw_partition_by = config.get('partition_by', none) -%} + {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%} {%- set cluster_by = config.get('cluster_by', none) -%} {{ run_hooks(pre_hooks) }} @@ -29,13 +72,28 @@ {% else %} {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} - {#-- wrap sql in parens to make it a subquery --#} - {% set source_sql -%} - ( - {{ sql }} - ) - {%- endset -%} - {% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %} + {#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#} + {% if partition_by is not none %} + {% set build_sql = bq_partition_merge( + tmp_relation, + target_relation, + sql, + unique_key, + partition_by, + dest_columns) %} + + {% else %} + {#-- wrap sql in parens to make it a subquery --#} + {%- set source_sql -%} + ( + {{sql}} + ) + {%- endset -%} + + {% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %} + + {% endif %} + {% endif %} {%- call statement('main') -%} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/merge.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/merge.sql deleted file mode 100644 index 8e8f42a3563..00000000000 --- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/merge.sql +++ /dev/null @@ -1,3 +0,0 @@ -{% macro bigquery__get_merge_sql(target, source, unique_key, dest_columns) %} - {{ common_get_merge_sql(target, source, unique_key, dest_columns) }} -{% endmacro %} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/table.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/table.sql index 8ac3956ffee..ea9b7e4d39c 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/table.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/table.sql @@ -63,6 +63,13 @@ {%- set should_create = (old_relation is none or exists_not_as_table) -%} {{ make_date_partitioned_table(model, target_relation, partitions, should_create, verbose) }} {% else %} + {%- set raw_partition_by = config.get('partition_by', none) -%} + {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%} + {%- set cluster_by = config.get('cluster_by', none) -%} + {% if not adapter.is_replaceable(old_relation, partition_by, cluster_by) %} + {% do log("Hard refreshing " ~ old_relation ~ " because it is not replaceable") %} + {% do adapter.drop_relation(old_relation) %} + {% endif %} {% call statement('main') -%} {{ create_table_as(False, target_relation, sql) }} {% endcall -%} diff --git a/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql 
b/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql index 12e894f8c01..e3a5d5cd085 100644 --- a/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql +++ b/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql @@ -1,4 +1,4 @@ -{% macro snowflake__get_merge_sql(target, source_sql, unique_key, dest_columns) -%} +{% macro snowflake__get_merge_sql(target, source_sql, unique_key, dest_columns, predicates) -%} {# Workaround for Snowflake not being happy with a merge on a constant-false predicate. @@ -18,7 +18,7 @@ {%- else -%} - {{ common_get_merge_sql(target, source_sql, unique_key, dest_columns) }} + {{ default__get_merge_sql(target, source_sql, unique_key, dest_columns, predicates) }} {%- endif -%} diff --git a/test/integration/012_deprecation_tests/bq-partitioned-models/clustered_model.sql b/test/integration/012_deprecation_tests/bq-partitioned-models/clustered_model.sql new file mode 100644 index 00000000000..e2caada291a --- /dev/null +++ b/test/integration/012_deprecation_tests/bq-partitioned-models/clustered_model.sql @@ -0,0 +1,16 @@ + +{{ + config( + materialized = "table", + partition_by = "updated_at_date", + cluster_by = "dupe", + ) +}} + +select + + id, + current_date as updated_at_date, + dupe + +from {{ ref('data_seed') }} diff --git a/test/integration/012_deprecation_tests/bq-partitioned-models/multi_clustered_model.sql b/test/integration/012_deprecation_tests/bq-partitioned-models/multi_clustered_model.sql new file mode 100644 index 00000000000..5cbd012a272 --- /dev/null +++ b/test/integration/012_deprecation_tests/bq-partitioned-models/multi_clustered_model.sql @@ -0,0 +1,16 @@ + +{{ + config( + materialized = "table", + partition_by = "updated_at_date", + cluster_by = ["dupe","id"], + ) +}} + +select + + id, + current_date as updated_at_date, + dupe + +from {{ ref('data_seed') }} diff --git a/test/integration/012_deprecation_tests/bq-partitioned-models/partitioned_model.sql b/test/integration/012_deprecation_tests/bq-partitioned-models/partitioned_model.sql new file mode 100644 index 00000000000..12087fac76a --- /dev/null +++ b/test/integration/012_deprecation_tests/bq-partitioned-models/partitioned_model.sql @@ -0,0 +1,15 @@ + +{{ + config( + materialized = "table", + partition_by = "updated_at_date", + ) +}} + +select + + id, + current_date as updated_at_date, + dupe + +from {{ ref('data_seed') }} diff --git a/test/integration/012_deprecation_tests/bq-partitioned-models/partitioned_ts_model.sql b/test/integration/012_deprecation_tests/bq-partitioned-models/partitioned_ts_model.sql new file mode 100644 index 00000000000..5696b231b9a --- /dev/null +++ b/test/integration/012_deprecation_tests/bq-partitioned-models/partitioned_ts_model.sql @@ -0,0 +1,15 @@ + +{{ + config( + materialized = "table", + partition_by = "date(updated_at_ts)", + ) +}} + +select + + id, + current_timestamp as updated_at_ts, + dupe + +from {{ ref('data_seed') }} diff --git a/test/integration/012_deprecation_tests/data/data_seed.csv b/test/integration/012_deprecation_tests/data/data_seed.csv new file mode 100644 index 00000000000..afd0a31efa5 --- /dev/null +++ b/test/integration/012_deprecation_tests/data/data_seed.csv @@ -0,0 +1,5 @@ +id,dupe +1,a +2,a +3,a +4,a diff --git a/test/integration/012_deprecation_tests/test_deprecations.py b/test/integration/012_deprecation_tests/test_deprecations.py index 8e0231461ad..b6d21c9cc45 100644 --- a/test/integration/012_deprecation_tests/test_deprecations.py +++ 
b/test/integration/012_deprecation_tests/test_deprecations.py @@ -77,4 +77,22 @@ def test_postgres_deprecations(self): self.assertEqual(deprecations.active_deprecations, set()) self.run_dbt(strict=False) expected = {'models-key-mismatch'} + + +class TestBQPartitionByDeprecation(BaseTestDeprecations): + @property + def models(self): + return self.dir('bq-partitioned-models') + + @use_profile('bigquery') + def test_bigquery_partition_by_fail(self): + self.run_dbt(['seed']) + self.run_dbt(strict=True, expect_pass=False) + + @use_profile('bigquery') + def test_bigquery_partition_by(self): + self.run_dbt(['seed']) + self.assertEqual(deprecations.active_deprecations, set()) + self.run_dbt(strict=False) + expected = {'bq-partition-by-string'} self.assertEqual(expected, deprecations.active_deprecations) diff --git a/test/integration/022_bigquery_test/models/clustered_model.sql b/test/integration/022_bigquery_test/models/clustered_model.sql index 22c8b314e58..1e0987cccf1 100644 --- a/test/integration/022_bigquery_test/models/clustered_model.sql +++ b/test/integration/022_bigquery_test/models/clustered_model.sql @@ -2,7 +2,7 @@ {{ config( materialized = "table", - partition_by = "updated_at", + partition_by = {"field": "updated_at", "data_type": "date"}, cluster_by = "dupe", ) }} diff --git a/test/integration/022_bigquery_test/models/multi_clustered_model.sql b/test/integration/022_bigquery_test/models/multi_clustered_model.sql index 10724aeb68b..c2093d6d6b9 100644 --- a/test/integration/022_bigquery_test/models/multi_clustered_model.sql +++ b/test/integration/022_bigquery_test/models/multi_clustered_model.sql @@ -2,7 +2,7 @@ {{ config( materialized = "table", - partition_by = "updated_at", + partition_by = {"field": "updated_at", "data_type": "date"}, cluster_by = ["dupe","id"], ) }} diff --git a/test/integration/022_bigquery_test/models/partitioned_model.sql b/test/integration/022_bigquery_test/models/partitioned_model.sql index 9c24dae9fad..0f30a2185a9 100644 --- a/test/integration/022_bigquery_test/models/partitioned_model.sql +++ b/test/integration/022_bigquery_test/models/partitioned_model.sql @@ -2,7 +2,7 @@ {{ config( materialized = "table", - partition_by = "updated_at", + partition_by = {'field': 'updated_at', 'data_type': 'date'}, ) }} diff --git a/test/integration/022_bigquery_test/partition-models/my_model.sql b/test/integration/022_bigquery_test/partition-models/my_model.sql new file mode 100644 index 00000000000..610daf0b196 --- /dev/null +++ b/test/integration/022_bigquery_test/partition-models/my_model.sql @@ -0,0 +1,11 @@ + + +{{ + config( + materialized="table", + partition_by=var('partition_by'), + cluster_by=var('cluster_by') + ) +}} + +select 1 as id, 'dr. 
bigquery' as name, current_timestamp() as cur_time, current_date() as cur_date diff --git a/test/integration/022_bigquery_test/scripting-models/incremental_range.sql b/test/integration/022_bigquery_test/scripting-models/incremental_range.sql new file mode 100644 index 00000000000..4c1725832ba --- /dev/null +++ b/test/integration/022_bigquery_test/scripting-models/incremental_range.sql @@ -0,0 +1,39 @@ + +{{ + config( + materialized="incremental", + unique_key="id", + cluster_by="id", + partition_by={ + "field": "id", + "data_type": "int64", + "range": { + "start": 1, + "end": 10, + "interval": 1 + } + } + ) +}} + + +with data as ( + select 1 as id, current_date() as ts union all + select 2 as id, current_date() as ts union all + select 3 as id, current_date() as ts union all + select 4 as id, current_date() as ts + + {% if is_incremental() %} + union all + select 5 as id, current_date() as ts union all + select 6 as id, current_date() as ts union all + select 7 as id, current_date() as ts union all + select 8 as id, current_date() as ts + {% endif %} +) + +select * from data + +{% if is_incremental() %} +where id > _dbt_max_partition +{% endif %} diff --git a/test/integration/022_bigquery_test/scripting-models/incremental_time.sql b/test/integration/022_bigquery_test/scripting-models/incremental_time.sql new file mode 100644 index 00000000000..fe6f7d313ed --- /dev/null +++ b/test/integration/022_bigquery_test/scripting-models/incremental_time.sql @@ -0,0 +1,34 @@ + +{{ + config( + materialized="incremental", + unique_key="id", + cluster_by="id", + partition_by={ + "field": "ts", + "data_type": "timestamp" + } + ) +}} + + +with data as ( + select 1 as id, current_timestamp() as ts union all + select 2 as id, current_timestamp() as ts union all + select 3 as id, current_timestamp() as ts union all + select 4 as id, current_timestamp() as ts + + {% if is_incremental() %} + union all + select 5 as id, current_timestamp() as ts union all + select 6 as id, current_timestamp() as ts union all + select 7 as id, current_timestamp() as ts union all + select 8 as id, current_timestamp() as ts + {% endif %} +) + +select * from data + +{% if is_incremental() %} +where ts > _dbt_max_partition +{% endif %} diff --git a/test/integration/022_bigquery_test/test_bigquery_changing_partitions.py b/test/integration/022_bigquery_test/test_bigquery_changing_partitions.py new file mode 100644 index 00000000000..93a0bd60b8e --- /dev/null +++ b/test/integration/022_bigquery_test/test_bigquery_changing_partitions.py @@ -0,0 +1,64 @@ +from test.integration.base import DBTIntegrationTest, FakeArgs, use_profile +import json + + +class TestChangingPartitions(DBTIntegrationTest): + + @property + def schema(self): + return "bigquery_test_022" + + @property + def models(self): + return "partition-models" + + def run_changes(self, before, after, strict=False): + # strict needs to be off because these tests use legacy partition_by clauses + results = self.run_dbt(['run', '--vars', json.dumps(before)], strict=strict) + self.assertEqual(len(results), 1) + + results = self.run_dbt(['run', '--vars', json.dumps(after)], strict=strict) + self.assertEqual(len(results), 1) + + @use_profile('bigquery') + def test_bigquery_add_partition(self): + before = {"partition_by": None, "cluster_by": None} + after = {"partition_by": "date(cur_time)", "cluster_by": None} + self.run_changes(before, after) + + @use_profile('bigquery') + def test_bigquery_remove_partition(self): + before = {"partition_by": "date(cur_time)", "cluster_by": None} + 
after = {"partition_by": None, "cluster_by": None} + self.run_changes(before, after) + + @use_profile('bigquery') + def test_bigquery_change_partitions(self): + before = {"partition_by": "date(cur_time)", "cluster_by": None} + after = {"partition_by": "cur_date", "cluster_by": None} + self.run_changes(before, after) + self.run_changes(after, before) + + @use_profile('bigquery') + def test_bigquery_add_clustering(self): + before = {"partition_by": "date(cur_time)", "cluster_by": None} + after = {"partition_by": "cur_date", "cluster_by": "id"} + self.run_changes(before, after) + + @use_profile('bigquery') + def test_bigquery_remove_clustering(self): + before = {"partition_by": "date(cur_time)", "cluster_by": "id"} + after = {"partition_by": "cur_date", "cluster_by": None} + self.run_changes(before, after) + + @use_profile('bigquery') + def test_bigquery_change_clustering(self): + before = {"partition_by": "date(cur_time)", "cluster_by": "id"} + after = {"partition_by": "cur_date", "cluster_by": "name"} + self.run_changes(before, after) + + @use_profile('bigquery') + def test_bigquery_change_clustering_strict(self): + before = {'partition_by': {'field': 'cur_time', 'data_type': 'timestamp'}, 'cluster_by': 'id'} + after = {'partition_by': {'field': 'cur_date', 'data_type': 'date'}, 'cluster_by': 'name'} + self.run_changes(before, after, strict=True) diff --git a/test/integration/022_bigquery_test/test_scripting.py b/test/integration/022_bigquery_test/test_scripting.py new file mode 100644 index 00000000000..7199f634485 --- /dev/null +++ b/test/integration/022_bigquery_test/test_scripting.py @@ -0,0 +1,22 @@ +from test.integration.base import DBTIntegrationTest, FakeArgs, use_profile + +class TestBigQueryScripting(DBTIntegrationTest): + + @property + def schema(self): + return "bigquery_test_022" + + @property + def models(self): + return "scripting-models" + + @property + def profile_config(self): + return self.bigquery_profile() + + def assert_incrementals(self): + results = self.run_dbt() + self.assertEqual(len(results), 2) + + self.run_dbt() + self.assertEqual(len(results), 2) diff --git a/test/integration/029_docs_generate_tests/bq_models/clustered.sql b/test/integration/029_docs_generate_tests/bq_models/clustered.sql index 2b1bbbec736..744d2ecb298 100644 --- a/test/integration/029_docs_generate_tests/bq_models/clustered.sql +++ b/test/integration/029_docs_generate_tests/bq_models/clustered.sql @@ -1,7 +1,7 @@ {{ config( materialized='table', - partition_by='updated_at', + partition_by={'field': 'updated_at', 'data_type': 'date'}, cluster_by=['first_name'] ) }} diff --git a/test/integration/029_docs_generate_tests/bq_models/multi_clustered.sql b/test/integration/029_docs_generate_tests/bq_models/multi_clustered.sql index c620d637829..e47df02f9b0 100644 --- a/test/integration/029_docs_generate_tests/bq_models/multi_clustered.sql +++ b/test/integration/029_docs_generate_tests/bq_models/multi_clustered.sql @@ -1,7 +1,7 @@ {{ config( materialized='table', - partition_by='updated_at', + partition_by={'field': 'updated_at', 'data_type': 'date'}, cluster_by=['first_name','email'] ) }} diff --git a/test/integration/029_docs_generate_tests/test_docs_generate.py b/test/integration/029_docs_generate_tests/test_docs_generate.py index eacb11422f6..649d27c0371 100644 --- a/test/integration/029_docs_generate_tests/test_docs_generate.py +++ b/test/integration/029_docs_generate_tests/test_docs_generate.py @@ -1882,7 +1882,7 @@ def expected_bigquery_complex_manifest(self): 'column_types': {}, 'enabled': 
True, 'materialized': 'table', - 'partition_by': 'updated_at', + 'partition_by': {'field': 'updated_at', 'data_type': 'date'}, 'persist_docs': {}, 'post-hook': [], 'pre-hook': [], @@ -1962,7 +1962,7 @@ def expected_bigquery_complex_manifest(self): 'column_types': {}, 'enabled': True, 'materialized': 'table', - 'partition_by': 'updated_at', + 'partition_by': {'field': 'updated_at', 'data_type': 'date'}, 'persist_docs': {}, 'post-hook': [], 'pre-hook': [], diff --git a/test/unit/test_bigquery_adapter.py b/test/unit/test_bigquery_adapter.py index d1e994ab6f7..bb40402aadf 100644 --- a/test/unit/test_bigquery_adapter.py +++ b/test/unit/test_bigquery_adapter.py @@ -383,3 +383,66 @@ def test_query_and_results(self, mock_bq): mock_bq.QueryJobConfig.assert_called_once() self.mock_client.query.assert_called_once_with( 'sql', job_config=mock_bq.QueryJobConfig()) + + +class TestBigQueryTableOptions(BaseTestBigQueryAdapter): + def test_parse_partition_by(self): + adapter = self.get_adapter('oauth') + + self.assertEqual( + adapter.parse_partition_by("date(ts)").to_dict(), { + "field": "ts", + "data_type": "timestamp" + } + ) + + self.assertEqual( + adapter.parse_partition_by("ts").to_dict(), { + "field": "ts", + "data_type": "date" + } + ) + + self.assertEqual( + adapter.parse_partition_by({ + "field": "ts", + }).to_dict(), { + "field": "ts", + "data_type": "date" + } + ) + + self.assertEqual( + adapter.parse_partition_by({ + "field": "ts", + "data_type": "date", + }).to_dict(), { + "field": "ts", + "data_type": "date" + } + ) + + # Invalid, should raise an error + with self.assertRaises(dbt.exceptions.CompilationException): + adapter.parse_partition_by({}) + + # passthrough + self.assertEqual( + adapter.parse_partition_by({ + "field": "id", + "data_type": "int64", + "range": { + "start": 1, + "end": 100, + "interval": 20 + } + }).to_dict(), { + "field": "id", + "data_type": "int64", + "range": { + "start": 1, + "end": 100, + "interval": 20 + } + } + ) diff --git a/test/unit/test_utils.py b/test/unit/test_utils.py index a66b2f5a72d..0f294fa69ca 100644 --- a/test/unit/test_utils.py +++ b/test/unit/test_utils.py @@ -139,3 +139,16 @@ def test_trivial(self): with self.assertRaises(dbt.exceptions.DbtConfigError): dbt.utils.deep_map(lambda x, _: x, {'foo': object()}) + + +class TestBytesFormatting(unittest.TestCase): + + def test__simple_cases(self): + self.assertEqual(dbt.utils.format_bytes(-1), '-1.0 Bytes') + self.assertEqual(dbt.utils.format_bytes(0), '0.0 Bytes') + self.assertEqual(dbt.utils.format_bytes(20), '20.0 Bytes') + self.assertEqual(dbt.utils.format_bytes(1030), '1.0 KB') + self.assertEqual(dbt.utils.format_bytes(1024**2*1.5), '1.5 MB') + self.assertEqual(dbt.utils.format_bytes(1024**3*52.6), '52.6 GB') + self.assertEqual(dbt.utils.format_bytes(1024**4*128), '128.0 TB') + self.assertEqual(dbt.utils.format_bytes(1024**5+1), '> 1024 TB')
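
For reference, a minimal sketch of the config migration this deprecation asks for. It reuses the `updated_at_ts` column from the `partitioned_ts_model.sql` fixture above, and the inferred dictionary follows the mapping in `PartitionConfig._parse` (a `date(...)` wrapper implies a timestamp/datetime field); it is an illustration, not part of the patch itself.

-- deprecated string form: dbt infers `field` and `data_type` and emits the
-- bq-partition-by-string deprecation warning (an error under --strict)
{{ config(materialized='table', partition_by='date(updated_at_ts)') }}

-- equivalent explicit form expected as of dbt v0.16.0
{{ config(
    materialized='table',
    partition_by={'field': 'updated_at_ts', 'data_type': 'timestamp'}
) }}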