Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/snowflake view transactions #940

Merged
merged 8 commits into from
Aug 24, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions dbt/context/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,8 @@ def _return(value):


def get_this_relation(db_wrapper, project_cfg, profile, model):
table_name = dbt.utils.model_immediate_name(
model, dbt.flags.NON_DESTRUCTIVE)

return db_wrapper.adapter.Relation.create_from_node(
profile, model, table_name=table_name)
profile, model)


def create_relation(relation_type, quoting_config):
Expand Down
6 changes: 6 additions & 0 deletions dbt/include/global_project/macros/adapters/snowflake.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@

{{ default__create_table_as(temporary, relation, sql) }}
{% endmacro %}

{% macro snowflake__create_view_as(relation, sql) -%}
create or replace view {{ relation }} as (
{{ sql }}
);
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
{% materialization table, adapter='snowflake' %}
{%- set identifier = model['alias'] -%}
{%- set tmp_identifier = identifier + '__dbt_tmp' -%}
{%- set backup_identifier = identifier + '__dbt_backup' -%}
{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}

{%- set existing_relations = adapter.list_relations(schema=schema) -%}
{%- set old_relation = adapter.get_relation(relations_list=existing_relations,
schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema, type='table') -%}
{%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema, type='table') -%}

/*
See ../view/view.sql for more information about this relation.
*/
{%- set backup_relation = api.Relation.create(identifier=backup_identifier,
schema=schema, type=(old_relation.type or 'table')) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}
{%- set create_as_temporary = (exists_as_table and non_destructive_mode) -%}


-- drop the temp relations if they exists for some reason
{{ adapter.drop_relation(intermediate_relation) }}
{{ adapter.drop_relation(backup_relation) }}

-- setup: if the target relation already exists, truncate or drop it (if it's a view)
{% if non_destructive_mode -%}
{% if exists_as_table -%}
{{ adapter.truncate_relation(old_relation) }}
{% elif exists_as_view -%}
{{ adapter.drop_relation(old_relation) }}
{%- set old_relation = none -%}
{%- endif %}
{%- endif %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% call statement('main') -%}
{%- if non_destructive_mode -%}
{%- if old_relation is not none -%}
{{ create_table_as(create_as_temporary, intermediate_relation, sql) }}

{% set dest_columns = adapter.get_columns_in_table(schema, identifier) %}
{% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}

insert into {{ target_relation }} ({{ dest_cols_csv }}) (
select {{ dest_cols_csv }}
from {{ intermediate_relation.include(schema=(not create_as_temporary)) }}
);
{%- else -%}
{{ create_table_as(create_as_temporary, target_relation, sql) }}
{%- endif -%}
{%- else -%}
{{ create_table_as(create_as_temporary, intermediate_relation, sql) }}
{%- endif -%}
{%- endcall %}

-- cleanup
{% if non_destructive_mode -%}
-- noop
{%- else -%}
{% if old_relation is not none %}
{% if old_relation.type == 'view' %}
{#-- This is the primary difference between Snowflake and Redshift. Renaming this view
-- would cause an error if the view has become invalid due to upstream schema changes #}
{{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(old_relation) }}
{% else %}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

-- finally, drop the existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,19 @@
{%- endif -%}
{%- endcall %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- cleanup
{% if non_destructive_mode -%}
-- noop
{%- else -%}
{% if old_relation is not none %}
-- move the existing relation out of the way
{{ adapter.rename_relation(target_relation, backup_relation) }}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@

{% macro handle_existing_table(full_refresh, non_destructive_mode, old_relation) %}
{{ adapter_macro("dbt.handle_existing_table", full_refresh, non_destructive_mode, old_relation) }}
{% endmacro %}

{% macro default__handle_existing_table(full_refresh, non_destructive_mode, old_relation) %}
{%- if not non_destructive_mode -%}
{{ adapter.drop_relation(old_relation) }}
{%- endif -%}
{% endmacro %}

{% macro bigquery__handle_existing_table(full_refresh, non_destructive_mode, old_relation) %}
{%- if full_refresh and not non_destructive_mode -%}
{{ adapter.drop_relation(old_relation) }}
{%- else -%}
{{ exceptions.relation_wrong_type(old_relation, 'view') }}
{%- endif -%}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking about this quite a bit, and I am not sure about the split here. Should snowflake really behave this fundamentally differently from bigquery? Never raise and ignore the full refresh flag?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The decision to raise this error on BigQuery was, I think, a shortsighted one. The error we throw here is very specifically intended to avoid clobbering an ingestion-time partitioned table with a view by accident. This could happen if you switch the type of your materialization from table to view. We don't do this check anywhere else in dbt, and ingestion-time partitioned tables are mostly superseded by column partitioning, so I don't know that too many people are actually being helped out by this error in practice.

I'm in favor of dropping the exception if @cmcarthur is into it as well. I agree that it's a thorny maintenance burden and has little utility.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you remove the exception you also get to remove handle_existing_table entirely, and removing code is always a win.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@drewbanin I defer totally to you on it. IMO it's totally fine to handle these kinds of errors situationally based on the adapter. We added this error for a really good reason, namely building time partitioned tables was costly, and dropping it unintentionally would cause users a lot of pain. If you believe that this isn't going to cause pain then I'm on board with removing it

{% endmacro %}


{# /*
Core materialization implementation. BigQuery and Snowflake are similar
because both can use `create or replace view` where the resulting view schema
is not necessarily the same as the existing view. On Redshift, this would
result in: ERROR: cannot change number of columns in view

This implementation is superior to the create_temp, swap_with_existing, drop_old
paradigm because transactions don't run DDL queries atomically on Snowflake. By using
`create or replace view`, the materialization becomes atomic in nature.
*/
#}

{% macro impl_view_materialization(run_outside_transaction_hooks=True) %}
{%- set identifier = model['alias'] -%}
{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}

{%- set old_relation = adapter.get_relation(
schema=schema, identifier=identifier) -%}

{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}

{%- set target_relation = api.Relation.create(
identifier=identifier, schema=schema,
type='view') -%}

{%- set should_ignore = non_destructive_mode and exists_as_view %}
{%- set has_transactional_hooks = (hooks | selectattr('transaction', 'equalto', True) | list | length) > 0 %}

{% if run_outside_transaction_hooks %}
-- no transactions on BigQuery
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{% endif %}

-- `BEGIN` happens here on Snowflake
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- If there's a table with the same name and we weren't told to full refresh,
-- that's an error. If we were told to full refresh, drop it. This behavior differs
-- for Snowflake and BigQuery, so multiple dispatch is used.
{%- if old_relation is not none and old_relation.is_table -%}
{{ handle_existing_table(flags.FULL_REFRESH, non_destructive_mode, old_relation) }}
{%- endif -%}

-- build model
{% if non_destructive_mode -%}
{% call noop_statement('main', status="PASS", res=None) -%}
-- Not running : non-destructive mode
{{ sql }}
{%- endcall %}
{%- else -%}
{% call statement('main') -%}
{{ create_view_as(target_relation, sql) }}
{%- endcall %}
{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{#
-- Don't commit in non-destructive mode _unless_ there are in-transaction hooks
-- TODO : Figure out some other way of doing this that isn't as fragile
#}
{% if has_transactional_hooks or not should_ignore %}
{{ adapter.commit() }}
{% endif %}

{% if run_outside_transaction_hooks %}
-- No transactions on BigQuery
{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endif %}
{% endmacro %}

{% materialization view, adapter='bigquery' -%}
{{ impl_view_materialization(run_outside_transaction_hooks=False) }}
{%- endmaterialization %}

{% materialization view, adapter='snowflake' -%}
{{ impl_view_materialization() }}
{%- endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@
{%- endcall %}
{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- cleanup
{% if not should_ignore -%}
-- move the existing view out of the way
Expand All @@ -72,6 +70,8 @@
{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{#
-- Don't commit in non-destructive mode _unless_ there are in-transaction hooks
-- TODO : Figure out some other way of doing this that isn't as fragile
Expand Down
18 changes: 0 additions & 18 deletions dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,24 +83,6 @@ def compiler_warning(model, msg, resource_type='model'):
)


def model_immediate_name(model, non_destructive):
"""
Returns the name of the model relation within the transaction. This is
useful for referencing the model in pre or post hooks. Non-destructive
models aren't created with temp suffixes, nor are incremental models or
seeds.
"""

model_name = model['alias']
is_incremental = (get_materialization(model) == 'incremental')
is_seed = is_type(model, 'seed')

if non_destructive or is_incremental or is_seed:
return model_name
else:
return "{}__dbt_tmp".format(model_name)


def find_operation_by_name(flat_graph, target_name, target_package):
return find_by_name(flat_graph, target_name, target_package,
'macros', [NodeType.Operation])
Expand Down
8 changes: 4 additions & 4 deletions test/integration/013_context_var_tests/test_context_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ def test_env_vars_dev(self):

self.assertEqual(
ctx['this'],
'"{}"."context__dbt_tmp"'.format(self.unique_schema()))
'"{}"."context"'.format(self.unique_schema()))

self.assertEqual(ctx['this.name'], 'context')
self.assertEqual(ctx['this.schema'], self.unique_schema())
self.assertEqual(ctx['this.table'], 'context__dbt_tmp')
self.assertEqual(ctx['this.table'], 'context')

self.assertEqual(ctx['target.dbname'], 'dbt')
self.assertEqual(ctx['target.host'], 'database')
Expand All @@ -118,11 +118,11 @@ def test_env_vars_prod(self):

self.assertEqual(
ctx['this'],
'"{}"."context__dbt_tmp"'.format(self.unique_schema()))
'"{}"."context"'.format(self.unique_schema()))

self.assertEqual(ctx['this.name'], 'context')
self.assertEqual(ctx['this.schema'], self.unique_schema())
self.assertEqual(ctx['this.table'], 'context__dbt_tmp')
self.assertEqual(ctx['this.table'], 'context')

self.assertEqual(ctx['target.dbname'], 'dbt')
self.assertEqual(ctx['target.host'], 'database')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
id,name
1,Drew
2,Jake
3,Connor
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

{{ config(materialized='table') }}

select *
{% if var('add_table_field', False) %}
, 1 as new_field
{% endif %}

from {{ ref('people') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

{% if var('dependent_type', 'view') == 'view' %}
{{ config(materialized='view') }}
{% else %}
{{ config(materialized='table') }}
{% endif %}

select * from {{ ref('base_table') }}
Loading