Skip to content

Commit

Permalink
use create table as syntax on BigQuery (#717)
Browse files Browse the repository at this point in the history
(wip) Fixes: #712
Fixes: #716
  • Loading branch information
drewbanin committed Apr 6, 2018
1 parent 2d441f8 commit 68634a2
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 59 deletions.
17 changes: 16 additions & 1 deletion dbt/adapters/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class BigQueryAdapter(PostgresAdapter):
"execute",
"quote_schema_and_table",
"make_date_partitioned_table",
"already_exists",
"expand_target_column_types",

"get_columns_in_table"
]

Expand Down Expand Up @@ -175,6 +178,12 @@ def query_for_existing(cls, profile, schemas, model_name=None):

return dict(existing)

@classmethod
def table_exists(cls, profile, schema, table, model_name=None):
tables = cls.query_for_existing(profile, schema, model_name)
exists = tables.get(table) is not None
return exists

@classmethod
def drop(cls, profile, schema, relation, relation_type, model_name=None):
conn = cls.get_connection(profile, model_name)
Expand Down Expand Up @@ -318,7 +327,7 @@ def execute(cls, profile, sql, model_name=None, fetch=False, **kwargs):

# If we get here, the query succeeded
status = 'OK'
return status, res
return status, cls.get_table_from_response(res)

@classmethod
def execute_and_fetch(cls, profile, sql, model_name, auto_begin=None):
Expand Down Expand Up @@ -504,3 +513,9 @@ def load_csv_rows(cls, profile, schema, table_name, agate_table):
client=client, skip_leading_rows=1)
with cls.exception_handler(profile, "LOAD TABLE"):
cls.poll_until_job_completes(job, cls.get_timeout(conn))

@classmethod
def expand_target_column_types(cls, profile, temp_table, to_schema,
to_table, model_name=None):
# This is a no-op on BigQuery
pass
23 changes: 21 additions & 2 deletions dbt/include/global_project/macros/adapters/bigquery.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
{% macro partition_by(raw_partition_by) %}
{%- if raw_partition_by is none -%}
{{ return('') }}
{% endif %}

{% set partition_by_clause %}
partition by {{ raw_partition_by }}
{%- endset -%}

{{ return(partition_by_clause) }}
{%- endmacro -%}

{% macro bigquery__create_table_as(temporary, identifier, sql) -%}
{{ adapter.execute_model({"name": identifier, "injected_sql": sql, "schema": schema}, 'table') }}
{%- set raw_partition_by = config.get('partition_by', none) -%}

create or replace table `{{ schema }}`.`{{ identifier }}`
{{ partition_by(raw_partition_by) }}
as (
{{ sql }}
);
{% endmacro %}

{% macro bigquery__create_view_as(identifier, sql) -%}
{{ adapter.execute_model({"name": identifier, "injected_sql": sql, "schema": schema}, 'view') }}
create or replace view `{{ schema }}`.`{{ identifier }}` as (
{{ sql }}
);
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,29 +1,3 @@
{% materialization view, adapter='bigquery' -%}

{%- set identifier = model['name'] -%}
{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
{%- set existing = adapter.query_for_existing(schema) -%}
{%- set existing_type = existing.get(identifier) -%}

{%- if existing_type is not none -%}
{%- if existing_type == 'table' and not flags.FULL_REFRESH -%}
{# this is only intended for date partitioned tables, but we cant see that field in the context #}
{% set error_message -%}
Trying to create model '{{ identifier }}' as a view, but it already exists as a table.
Either drop the '{{ schema }}.{{ identifier }}' table manually, or use --full-refresh
{%- endset %}
{{ exceptions.raise_compiler_error(error_message) }}
{%- endif -%}

{{ adapter.drop(schema, identifier, existing_type) }}
{%- endif -%}

-- build model
{% set result = adapter.execute_model(model, 'view') %}
{{ store_result('main', status=result) }}

{%- endmaterialization %}


{% macro make_date_partitioned_table(model, dates, should_create, verbose=False) %}

Expand All @@ -49,7 +23,7 @@
{% set result_str = 'CREATED ' ~ num_days ~ ' PARTITIONS' %}
{% endif %}

{{ return(result_str) }}
{{ store_result('main', status=result_str) }}

{% endmacro %}

Expand All @@ -60,8 +34,13 @@
{%- set existing = adapter.query_for_existing(schema) -%}
{%- set existing_type = existing.get(identifier) -%}
{%- set verbose = config.get('verbose', False) -%}

{# partitions: iterate over each partition, running a separate query in a for-loop #}
{%- set partitions = config.get('partitions') -%}

{# partition_by: run a single query, specifying a date column to partition by #}
{%- set partition_by = config.get('partition_by', []) -%}

{% if partitions %}
{% if partitions is number or partitions is string %}
{% set partitions = [(partitions | string)] %}
Expand All @@ -82,17 +61,12 @@

-- build model
{% if partitions %}
{% set result = make_date_partitioned_table(model, partitions, (existing_type != 'table'), verbose) %}
{{ make_date_partitioned_table(model, partitions, (existing_type != 'table'), verbose) }}
{% else %}
{% set result = adapter.execute_model(model, 'table') %}
{% call statement('main') -%}
{{ create_table_as(False, identifier, sql) }}
{% endcall -%}
{% endif %}

{{ store_result('main', status=result) }}

{% endmaterialization %}

{% materialization incremental, adapter='bigquery' -%}

{{ exceptions.materialization_not_available(model, 'bigquery') }}

{% endmaterialization %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

{% materialization view, adapter='bigquery' -%}

{%- set identifier = model['name'] -%}
{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
{%- set existing = adapter.query_for_existing(schema) -%}
{%- set existing_type = existing.get(identifier) -%}

{%- if existing_type is not none -%}
{%- if existing_type == 'table' and not flags.FULL_REFRESH -%}
{# this is only intended for date partitioned tables, but we cant see that field in the context #}
{% set error_message -%}
Trying to create model '{{ identifier }}' as a view, but it already exists as a table.
Either drop the '{{ schema }}.{{ identifier }}' table manually, or use --full-refresh
{%- endset %}
{{ exceptions.raise_compiler_error(error_message) }}
{%- endif -%}

{{ adapter.drop(schema, identifier, existing_type) }}
{%- endif -%}

-- build model
{% if existing_type == 'view' and non_destructive_mode -%}
{% call noop_statement('main', status="PASS", res=None) -%}
-- Not running : non-destructive mode
{{ sql }}
{%- endcall %}
{%- else -%}
{% call statement('main') -%}
{{ create_view_as(identifier, sql) }}
{%- endcall %}
{%- endif %}

{%- endmaterialization %}
20 changes: 0 additions & 20 deletions dbt/include/global_project/macros/materializations/wrapper.sql

This file was deleted.

0 comments on commit 68634a2

Please sign in to comment.