Merge pull request #2140 from fishtown-analytics/feature/cost-effective-bq-incremental-followup

Feature/cost effective bq incremental followup
beckjake committed Feb 24, 2020
2 parents 0ad94f8 + 202f8a1 commit bcea7cc
Showing 31 changed files with 655 additions and 92 deletions.
21 changes: 21 additions & 0 deletions core/dbt/deprecations.py
@@ -108,6 +108,26 @@ class ModelsKeyNonModelDeprecation(DBTDeprecation):
'''


class BigQueryPartitionByStringDeprecation(DBTDeprecation):
_name = 'bq-partition-by-string'

_description = '''
As of dbt v0.16.0, the `partition_by` config in BigQuery accepts a
dictionary containing `field` and `data_type`.
- Provided partition_by: {raw_partition_by}
- dbt inferred: {inferred_partition_by}
For more information, see:
https://docs.getdbt.com/docs/upgrading-to-0-16-0
'''


_adapter_renamed_description = """\
The adapter function `adapter.{old_name}` is deprecated and will be removed in
a future release of dbt. Please use `adapter.{new_name}` instead.
@@ -151,6 +171,7 @@ def warn(name, *args, **kwargs):
NotADictionaryDeprecation(),
ColumnQuotingDeprecation(),
ModelsKeyNonModelDeprecation(),
BigQueryPartitionByStringDeprecation(),
]

deprecations: Dict[str, DBTDeprecation] = {
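
As a rough illustration (not part of this diff), a caller that detects the legacy string-style `partition_by` could raise the new deprecation along these lines. The column name and config values are made up, and the sketch assumes `deprecations.warn()` forwards its keyword arguments into the description's `{raw_partition_by}` / `{inferred_partition_by}` placeholders.

    import dbt.deprecations

    # Hypothetical trigger for the new 'bq-partition-by-string' deprecation;
    # 'created_at' and both partition_by values are illustrative only.
    dbt.deprecations.warn(
        'bq-partition-by-string',
        raw_partition_by="date(created_at)",  # legacy string form
        inferred_partition_by={'field': 'created_at', 'data_type': 'date'},  # new dict form
    )
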
@@ -1,37 +1,32 @@


{% macro get_merge_sql(target, source, unique_key, dest_columns) -%}
{{ adapter_macro('get_merge_sql', target, source, unique_key, dest_columns) }}
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}
{{ adapter_macro('get_merge_sql', target, source, unique_key, dest_columns, predicates) }}
{%- endmacro %}


{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{{ adapter_macro('get_delete_insert_merge_sql', target, source, unique_key, dest_columns) }}
{%- endmacro %}


{% macro get_quoted_csv(column_names) %}
{% set quoted = [] %}
{% for col in column_names -%}
{%- do quoted.append(adapter.quote(col)) -%}
{%- endfor %}

{%- set dest_cols_csv = quoted | join(', ') -%}
{{ return(dest_cols_csv) }}
{% endmacro %}


{% macro common_get_merge_sql(target, source, unique_key, dest_columns) -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

{% if unique_key %}
on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% set unique_key_match %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% else %}
on FALSE
{% do predicates.append('FALSE') %}
{% endif %}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{ predicates | join(' and ') }}

{% if unique_key %}
when matched then update set
{% for column in dest_columns -%}
@@ -45,16 +40,17 @@
values
({{ dest_cols_csv }})

{%- endmacro %}
{% endmacro %}

{% macro default__get_merge_sql(target, source, unique_key, dest_columns) -%}
{% set typename = adapter.type() %}

{{ exceptions.raise_compiler_error(
'get_merge_sql is not implemented for {}'.format(typename)
)
}}
{% macro get_quoted_csv(column_names) %}
{% set quoted = [] %}
{% for col in column_names -%}
{%- do quoted.append(adapter.quote(col)) -%}
{%- endfor %}

{%- set dest_cols_csv = quoted | join(', ') -%}
{{ return(dest_cols_csv) }}
{% endmacro %}


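The new `predicates` argument is what lets a caller fold extra conditions, such as a partition filter, into the merge's `on` clause alongside the unique-key match, which appears to be the hook the cost-effective BigQuery incremental work relies on. A minimal Python sketch of the joining behaviour, with made-up column names:

    # Sketch only: mirrors `on {{ predicates | join(' and ') }}` from the macro above.
    predicates = ["DBT_INTERNAL_DEST.date_day >= '2020-02-01'"]  # e.g. a caller-supplied partition filter
    unique_key = "id"
    # The unique-key match is appended after any caller-supplied predicates.
    predicates.append(f"DBT_INTERNAL_SOURCE.{unique_key} = DBT_INTERNAL_DEST.{unique_key}")
    print("on " + " and ".join(predicates))
    # on DBT_INTERNAL_DEST.date_day >= '2020-02-01' and DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
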
@@ -170,7 +170,7 @@
insert into {{ tmp_relation }} (dbt_change_type, dbt_scd_id, dbt_valid_to)
select dbt_change_type, dbt_scd_id, dbt_valid_to from (
{{ updates_select }}
) dbt_sbq;
) dbt_sbq
{% endcall %}

{% do return(tmp_relation) %}
1 change: 1 addition & 0 deletions core/dbt/links.py
@@ -1,3 +1,4 @@
ProfileConfigDocs = 'https://docs.getdbt.com/docs/configure-your-profile'
SnowflakeQuotingDocs = 'https://docs.getdbt.com/v0.10/docs/configuring-quoting'
IncrementalDocs = 'https://docs.getdbt.com/docs/configuring-incremental-models'
BigQueryNewPartitionBy = 'https://docs.getdbt.com/docs/upgrading-to-0-16-0'
9 changes: 9 additions & 0 deletions core/dbt/utils.py
@@ -480,3 +480,12 @@ def __init__(self, func):

def __get__(self, obj, objtype):
return self.func(objtype)


def format_bytes(num_bytes):
for unit in ['Bytes', 'KB', 'MB', 'GB', 'TB']:
if abs(num_bytes) < 1024.0:
return f"{num_bytes:3.1f} {unit}"
num_bytes /= 1024.0

return "> 1024 TB"
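
For reference, a quick sketch of the output format, derived from the implementation above:

    from dbt.utils import format_bytes

    print(format_bytes(512))            # 512.0 Bytes
    print(format_bytes(5 * 1024 ** 3))  # 5.0 GB
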
28 changes: 13 additions & 15 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
@@ -8,6 +8,7 @@
from google.api_core import retry, client_info
from google.oauth2 import service_account

from dbt.utils import format_bytes
from dbt.clients import agate_helper, gcloud
from dbt.exceptions import (
FailedToConnectException, RuntimeException, DatabaseException
@@ -66,13 +67,9 @@ class BigQueryConnectionManager(BaseConnectionManager):
DEFAULT_MAXIMUM_DELAY = 1.0 # Seconds

@classmethod
def handle_error(cls, error, message, sql):
logger.debug(message.format(sql=sql))
logger.debug(str(error))
error_msg = "\n".join(
[item['message'] for item in error.errors])

raise DatabaseException(error_msg) from error
def handle_error(cls, error, message):
error_msg = "\n".join([item['message'] for item in error.errors])
raise DatabaseException(error_msg)

def clear_transaction(self):
pass
@@ -83,12 +80,12 @@ def exception_handler(self, sql):
yield

except google.cloud.exceptions.BadRequest as e:
message = "Bad request while running:\n{sql}"
self.handle_error(e, message, sql)
message = "Bad request while running query"
self.handle_error(e, message)

except google.cloud.exceptions.Forbidden as e:
message = "Access denied while running:\n{sql}"
self.handle_error(e, message, sql)
message = "Access denied while running query"
self.handle_error(e, message)

except Exception as e:
logger.debug("Unhandled error while running:\n{}".format(sql))
@@ -98,7 +95,7 @@ def exception_handler(self, sql):
# this sounds a lot like a signal handler and probably has
# useful information, so raise it without modification.
raise
raise RuntimeException(str(e)) from e
raise RuntimeException(str(e))

def cancel_open(self) -> None:
pass
@@ -237,6 +234,10 @@ def execute(self, sql, auto_begin=False, fetch=None):
table = client.get_table(query_job.destination)
status = 'CREATE TABLE ({})'.format(table.num_rows)

elif query_job.statement_type == 'SCRIPT':
processed = format_bytes(query_job.total_bytes_processed)
status = f'SCRIPT ({processed} processed)'

elif query_job.statement_type in ['INSERT', 'DELETE', 'MERGE']:
status = '{} ({})'.format(
query_job.statement_type,
@@ -372,9 +373,6 @@ def count_error(self, error):
self.error_count, self.retries, repr(error))
return True
else:
logger.debug(
'Not Retrying after {} previous attempts. Error: {}',
self.error_count - 1, repr(error))
return False


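Tying the two additions together, the new SCRIPT branch in `execute` reports how much data a BigQuery script scanned. A rough sketch with a stand-in byte count (the real value comes from `query_job.total_bytes_processed`):

    from dbt.utils import format_bytes

    total_bytes_processed = 3_214_000_000  # stand-in for query_job.total_bytes_processed
    status = f"SCRIPT ({format_bytes(total_bytes_processed)} processed)"
    print(status)  # SCRIPT (3.0 GB processed)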