Feature/cost effective bq incremental followup #2140

Merged
21 changes: 21 additions & 0 deletions core/dbt/deprecations.py
@@ -108,6 +108,26 @@ class ModelsKeyNonModelDeprecation(DBTDeprecation):
'''


class BigQueryPartitionByStringDeprecation(DBTDeprecation):
_name = 'bq-partition-by-string'

_description = '''
As of dbt v0.16.0, the `partition_by` config in BigQuery accepts a
dictionary containing `field` and `data_type`.


- Provided partition_by: {raw_partition_by}


- dbt inferred: {inferred_partition_by}



For more information, see:
https://docs.getdbt.com/docs/upgrading-to-0-16-0
'''


_adapter_renamed_description = """\
The adapter function `adapter.{old_name}` is deprecated and will be removed in
a future release of dbt. Please use `adapter.{new_name}` instead.
@@ -151,6 +171,7 @@ def warn(name, *args, **kwargs):
NotADictionaryDeprecation(),
ColumnQuotingDeprecation(),
ModelsKeyNonModelDeprecation(),
BigQueryPartitionByStringDeprecation(),
]

deprecations: Dict[str, DBTDeprecation] = {
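For context, a minimal sketch (not part of this diff) of how the new deprecation might be surfaced from the BigQuery adapter, assuming `warn()` forwards its keyword arguments into the description template; the config values are illustrative:

from dbt import deprecations

# A legacy string-style partition_by and the dict-style config dbt would infer
# from it (illustrative values, not taken from this PR).
raw_partition_by = "date(created_at)"
inferred_partition_by = {'field': 'created_at', 'data_type': 'date'}

# Assumption: warn() passes these kwargs through to _description.format(...).
deprecations.warn(
    'bq-partition-by-string',
    raw_partition_by=raw_partition_by,
    inferred_partition_by=inferred_partition_by,
)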
@@ -1,37 +1,32 @@


{% macro get_merge_sql(target, source, unique_key, dest_columns) -%}
{{ adapter_macro('get_merge_sql', target, source, unique_key, dest_columns) }}
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}
{{ adapter_macro('get_merge_sql', target, source, unique_key, dest_columns, predicates) }}
{%- endmacro %}


{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{{ adapter_macro('get_delete_insert_merge_sql', target, source, unique_key, dest_columns) }}
{%- endmacro %}


{% macro get_quoted_csv(column_names) %}
{% set quoted = [] %}
{% for col in column_names -%}
{%- do quoted.append(adapter.quote(col)) -%}
{%- endfor %}

{%- set dest_cols_csv = quoted | join(', ') -%}
{{ return(dest_cols_csv) }}
{% endmacro %}


{% macro common_get_merge_sql(target, source, unique_key, dest_columns) -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

{% if unique_key %}
on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% set unique_key_match %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% else %}
on FALSE
{% do predicates.append('FALSE') %}
{% endif %}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{ predicates | join(' and ') }}

{% if unique_key %}
when matched then update set
{% for column in dest_columns -%}
@@ -45,16 +40,17 @@
values
({{ dest_cols_csv }})

{%- endmacro %}
{% endmacro %}

{% macro default__get_merge_sql(target, source, unique_key, dest_columns) -%}
{% set typename = adapter.type() %}

{{ exceptions.raise_compiler_error(
'get_merge_sql is not implemented for {}'.format(typename)
)
}}
{% macro get_quoted_csv(column_names) %}
{% set quoted = [] %}
{% for col in column_names -%}
{%- do quoted.append(adapter.quote(col)) -%}
{%- endfor %}

{%- set dest_cols_csv = quoted | join(', ') -%}
{{ return(dest_cols_csv) }}
{% endmacro %}
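As a hedged illustration (not part of the diff), the predicate handling in the new `default__get_merge_sql` can be re-expressed in Python roughly as follows; the partition-filter predicate shown in the usage note is hypothetical:

def build_merge_on_clause(unique_key, predicates=None):
    # Mirror the macro: start from any caller-supplied predicates.
    predicates = [] if predicates is None else list(predicates)
    if unique_key:
        # Match source and destination rows on the unique key.
        predicates.append(
            f"DBT_INTERNAL_SOURCE.{unique_key} = DBT_INTERNAL_DEST.{unique_key}"
        )
    else:
        # No unique key: never match, so every source row falls through to insert.
        predicates.append("FALSE")
    return " and ".join(predicates)

# build_merge_on_clause("id", ["DBT_INTERNAL_DEST.created_date >= '2020-01-01'"])
# -> "DBT_INTERNAL_DEST.created_date >= '2020-01-01' and DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id"

Extra predicates of this kind are what a cost-conscious BigQuery incremental merge could use to prune partitions instead of scanning the whole destination table.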


@@ -170,7 +170,7 @@
insert into {{ tmp_relation }} (dbt_change_type, dbt_scd_id, dbt_valid_to)
select dbt_change_type, dbt_scd_id, dbt_valid_to from (
{{ updates_select }}
) dbt_sbq;
) dbt_sbq
{% endcall %}

{% do return(tmp_relation) %}
1 change: 1 addition & 0 deletions core/dbt/links.py
@@ -1,3 +1,4 @@
ProfileConfigDocs = 'https://docs.getdbt.com/docs/configure-your-profile'
SnowflakeQuotingDocs = 'https://docs.getdbt.com/v0.10/docs/configuring-quoting'
IncrementalDocs = 'https://docs.getdbt.com/docs/configuring-incremental-models'
BigQueryNewPartitionBy = 'https://docs.getdbt.com/docs/upgrading-to-0-16-0'
9 changes: 9 additions & 0 deletions core/dbt/utils.py
@@ -480,3 +480,12 @@ def __init__(self, func):

def __get__(self, obj, objtype):
return self.func(objtype)


def format_bytes(num_bytes):
for unit in ['Bytes', 'KB', 'MB', 'GB', 'TB']:
if abs(num_bytes) < 1024.0:
return f"{num_bytes:3.1f} {unit}"
num_bytes /= 1024.0

return "> 1024 TB"
28 changes: 13 additions & 15 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
@@ -8,6 +8,7 @@
from google.api_core import retry, client_info
from google.oauth2 import service_account

from dbt.utils import format_bytes
from dbt.clients import agate_helper, gcloud
from dbt.exceptions import (
FailedToConnectException, RuntimeException, DatabaseException
@@ -66,13 +67,9 @@ class BigQueryConnectionManager(BaseConnectionManager):
DEFAULT_MAXIMUM_DELAY = 1.0 # Seconds

@classmethod
def handle_error(cls, error, message, sql):
logger.debug(message.format(sql=sql))
logger.debug(str(error))
error_msg = "\n".join(
[item['message'] for item in error.errors])

raise DatabaseException(error_msg) from error
def handle_error(cls, error, message):
error_msg = "\n".join([item['message'] for item in error.errors])
raise DatabaseException(error_msg)

def clear_transaction(self):
pass
@@ -83,12 +80,12 @@ def exception_handler(self, sql):
yield

except google.cloud.exceptions.BadRequest as e:
message = "Bad request while running:\n{sql}"
self.handle_error(e, message, sql)
message = "Bad request while running query"
Contributor Author:
The existing BQ debug logs were incredibly verbose, printing the same SQL and error messages three times over. These changes should only print out the SQL once (when it is executed) and should suppress the re-printing of the query in the BQ exception message.

self.handle_error(e, message)

except google.cloud.exceptions.Forbidden as e:
message = "Access denied while running:\n{sql}"
self.handle_error(e, message, sql)
message = "Access denied while running query"
self.handle_error(e, message)

except Exception as e:
logger.debug("Unhandled error while running:\n{}".format(sql))
@@ -98,7 +95,7 @@ def exception_handler(self, sql):
# this sounds a lot like a signal handler and probably has
# useful information, so raise it without modification.
raise
raise RuntimeException(str(e)) from e
raise RuntimeException(str(e))

def cancel_open(self) -> None:
pass
@@ -237,6 +234,10 @@ def execute(self, sql, auto_begin=False, fetch=None):
table = client.get_table(query_job.destination)
status = 'CREATE TABLE ({})'.format(table.num_rows)

elif query_job.statement_type == 'SCRIPT':
processed = format_bytes(query_job.total_bytes_processed)
status = f'SCRIPT ({processed} processed)'

elif query_job.statement_type in ['INSERT', 'DELETE', 'MERGE']:
status = '{} ({})'.format(
query_job.statement_type,
@@ -372,9 +373,6 @@ def count_error(self, error):
self.error_count, self.retries, repr(error))
return True
else:
logger.debug(
'Not Retrying after {} previous attempts. Error: {}',
self.error_count - 1, repr(error))
return False


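Tying the two additions together, a hedged sketch of the status string the new SCRIPT branch would produce for a multi-statement BigQuery job (byte count illustrative):

from dbt.utils import format_bytes

total_bytes_processed = 1_288_490_189          # as reported by the finished query job
status = f'SCRIPT ({format_bytes(total_bytes_processed)} processed)'
# -> 'SCRIPT (1.2 GB processed)'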