Skip to content

Commit

Permalink
Merge pull request #940 from fishtown-analytics/fix/snowflake-view-tr…
Browse files Browse the repository at this point in the history
…ansactions

Fix/snowflake view transactions
  • Loading branch information
drewbanin committed Aug 24, 2018
2 parents 435f1b4 + 78fd05a commit b1e186a
Show file tree
Hide file tree
Showing 14 changed files with 313 additions and 73 deletions.
5 changes: 1 addition & 4 deletions dbt/context/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,8 @@ def _return(value):


def get_this_relation(db_wrapper, project_cfg, profile, model):
table_name = dbt.utils.model_immediate_name(
model, dbt.flags.NON_DESTRUCTIVE)

return db_wrapper.adapter.Relation.create_from_node(
profile, model, table_name=table_name)
profile, model)


def create_relation(relation_type, quoting_config):
Expand Down
6 changes: 6 additions & 0 deletions dbt/include/global_project/macros/adapters/snowflake.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@

{{ default__create_table_as(temporary, relation, sql) }}
{% endmacro %}

{% macro snowflake__create_view_as(relation, sql) -%}
create or replace view {{ relation }} as (
{{ sql }}
);
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
{% materialization table, adapter='snowflake' %}
{%- set identifier = model['alias'] -%}
{%- set tmp_identifier = identifier + '__dbt_tmp' -%}
{%- set backup_identifier = identifier + '__dbt_backup' -%}
{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}

{%- set existing_relations = adapter.list_relations(schema=schema) -%}
{%- set old_relation = adapter.get_relation(relations_list=existing_relations,
schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema, type='table') -%}
{%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema, type='table') -%}

/*
See ../view/view.sql for more information about this relation.
*/
{%- set backup_relation = api.Relation.create(identifier=backup_identifier,
schema=schema, type=(old_relation.type or 'table')) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}
{%- set create_as_temporary = (exists_as_table and non_destructive_mode) -%}


-- drop the temp relations if they exists for some reason
{{ adapter.drop_relation(intermediate_relation) }}
{{ adapter.drop_relation(backup_relation) }}

-- setup: if the target relation already exists, truncate or drop it (if it's a view)
{% if non_destructive_mode -%}
{% if exists_as_table -%}
{{ adapter.truncate_relation(old_relation) }}
{% elif exists_as_view -%}
{{ adapter.drop_relation(old_relation) }}
{%- set old_relation = none -%}
{%- endif %}
{%- endif %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% call statement('main') -%}
{%- if non_destructive_mode -%}
{%- if old_relation is not none -%}
{{ create_table_as(create_as_temporary, intermediate_relation, sql) }}

{% set dest_columns = adapter.get_columns_in_table(schema, identifier) %}
{% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}

insert into {{ target_relation }} ({{ dest_cols_csv }}) (
select {{ dest_cols_csv }}
from {{ intermediate_relation.include(schema=(not create_as_temporary)) }}
);
{%- else -%}
{{ create_table_as(create_as_temporary, target_relation, sql) }}
{%- endif -%}
{%- else -%}
{{ create_table_as(create_as_temporary, intermediate_relation, sql) }}
{%- endif -%}
{%- endcall %}

-- cleanup
{% if non_destructive_mode -%}
-- noop
{%- else -%}
{% if old_relation is not none %}
{% if old_relation.type == 'view' %}
{#-- This is the primary difference between Snowflake and Redshift. Renaming this view
-- would cause an error if the view has become invalid due to upstream schema changes #}
{{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(old_relation) }}
{% else %}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

-- finally, drop the existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,19 @@
{%- endif -%}
{%- endcall %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- cleanup
{% if non_destructive_mode -%}
-- noop
{%- else -%}
{% if old_relation is not none %}
-- move the existing relation out of the way
{{ adapter.rename_relation(target_relation, backup_relation) }}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@

{% macro handle_existing_table(full_refresh, non_destructive_mode, old_relation) %}
{{ adapter_macro("dbt.handle_existing_table", full_refresh, non_destructive_mode, old_relation) }}
{% endmacro %}

{% macro default__handle_existing_table(full_refresh, non_destructive_mode, old_relation) %}
{%- if not non_destructive_mode -%}
{{ adapter.drop_relation(old_relation) }}
{%- endif -%}
{% endmacro %}

{% macro bigquery__handle_existing_table(full_refresh, non_destructive_mode, old_relation) %}
{%- if full_refresh and not non_destructive_mode -%}
{{ adapter.drop_relation(old_relation) }}
{%- else -%}
{{ exceptions.relation_wrong_type(old_relation, 'view') }}
{%- endif -%}
{% endmacro %}


{# /*
Core materialization implementation. BigQuery and Snowflake are similar
because both can use `create or replace view` where the resulting view schema
is not necessarily the same as the existing view. On Redshift, this would
result in: ERROR: cannot change number of columns in view

This implementation is superior to the create_temp, swap_with_existing, drop_old
paradigm because transactions don't run DDL queries atomically on Snowflake. By using
`create or replace view`, the materialization becomes atomic in nature.
*/
#}
{% macro impl_view_materialization(run_outside_transaction_hooks=True) %}
{%- set identifier = model['alias'] -%}
{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
{%- set old_relation = adapter.get_relation(
schema=schema, identifier=identifier) -%}
{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}
{%- set target_relation = api.Relation.create(
identifier=identifier, schema=schema,
type='view') -%}
{%- set should_ignore = non_destructive_mode and exists_as_view %}
{%- set has_transactional_hooks = (hooks | selectattr('transaction', 'equalto', True) | list | length) > 0 %}
{% if run_outside_transaction_hooks %}
-- no transactions on BigQuery
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{% endif %}
-- `BEGIN` happens here on Snowflake
{{ run_hooks(pre_hooks, inside_transaction=True) }}
-- If there's a table with the same name and we weren't told to full refresh,
-- that's an error. If we were told to full refresh, drop it. This behavior differs
-- for Snowflake and BigQuery, so multiple dispatch is used.
{%- if old_relation is not none and old_relation.is_table -%}
{{ handle_existing_table(flags.FULL_REFRESH, non_destructive_mode, old_relation) }}
{%- endif -%}

-- build model
{% if non_destructive_mode -%}
{% call noop_statement('main', status="PASS", res=None) -%}
-- Not running : non-destructive mode
{{ sql }}
{%- endcall %}
{%- else -%}
{% call statement('main') -%}
{{ create_view_as(target_relation, sql) }}
{%- endcall %}
{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{#
-- Don't commit in non-destructive mode _unless_ there are in-transaction hooks
-- TODO : Figure out some other way of doing this that isn't as fragile
#}
{% if has_transactional_hooks or not should_ignore %}
{{ adapter.commit() }}
{% endif %}

{% if run_outside_transaction_hooks %}
-- No transactions on BigQuery
{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endif %}
{% endmacro %}

{% materialization view, adapter='bigquery' -%}
{{ impl_view_materialization(run_outside_transaction_hooks=False) }}
{%- endmaterialization %}

{% materialization view, adapter='snowflake' -%}
{{ impl_view_materialization() }}
{%- endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@
{%- endcall %}
{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- cleanup
{% if not should_ignore -%}
-- move the existing view out of the way
Expand All @@ -72,6 +70,8 @@
{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{#
-- Don't commit in non-destructive mode _unless_ there are in-transaction hooks
-- TODO : Figure out some other way of doing this that isn't as fragile
Expand Down
18 changes: 0 additions & 18 deletions dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,24 +83,6 @@ def compiler_warning(model, msg, resource_type='model'):
)


def model_immediate_name(model, non_destructive):
"""
Returns the name of the model relation within the transaction. This is
useful for referencing the model in pre or post hooks. Non-destructive
models aren't created with temp suffixes, nor are incremental models or
seeds.
"""

model_name = model['alias']
is_incremental = (get_materialization(model) == 'incremental')
is_seed = is_type(model, 'seed')

if non_destructive or is_incremental or is_seed:
return model_name
else:
return "{}__dbt_tmp".format(model_name)


def find_operation_by_name(flat_graph, target_name, target_package):
return find_by_name(flat_graph, target_name, target_package,
'macros', [NodeType.Operation])
Expand Down
8 changes: 4 additions & 4 deletions test/integration/013_context_var_tests/test_context_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ def test_env_vars_dev(self):

self.assertEqual(
ctx['this'],
'"{}"."context__dbt_tmp"'.format(self.unique_schema()))
'"{}"."context"'.format(self.unique_schema()))

self.assertEqual(ctx['this.name'], 'context')
self.assertEqual(ctx['this.schema'], self.unique_schema())
self.assertEqual(ctx['this.table'], 'context__dbt_tmp')
self.assertEqual(ctx['this.table'], 'context')

self.assertEqual(ctx['target.dbname'], 'dbt')
self.assertEqual(ctx['target.host'], 'database')
Expand All @@ -118,11 +118,11 @@ def test_env_vars_prod(self):

self.assertEqual(
ctx['this'],
'"{}"."context__dbt_tmp"'.format(self.unique_schema()))
'"{}"."context"'.format(self.unique_schema()))

self.assertEqual(ctx['this.name'], 'context')
self.assertEqual(ctx['this.schema'], self.unique_schema())
self.assertEqual(ctx['this.table'], 'context__dbt_tmp')
self.assertEqual(ctx['this.table'], 'context')

self.assertEqual(ctx['target.dbname'], 'dbt')
self.assertEqual(ctx['target.host'], 'database')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
id,name
1,Drew
2,Jake
3,Connor
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

{{ config(materialized='table') }}

select *
{% if var('add_table_field', False) %}
, 1 as new_field
{% endif %}

from {{ ref('people') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

{% if var('dependent_type', 'view') == 'view' %}
{{ config(materialized='view') }}
{% else %}
{{ config(materialized='table') }}
{% endif %}

select * from {{ ref('base_table') }}
Loading

0 comments on commit b1e186a

Please sign in to comment.