Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run hooks outside of a transaction #510

Merged
merged 11 commits into from
Aug 29, 2017
14 changes: 9 additions & 5 deletions dbt/adapters/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ class DefaultAdapter(object):
"truncate",
"add_query",
"expand_target_column_types",
"quote_schema_and_table",
]

raw_functions = [
"get_status",
"get_result_from_cursor",
"quote",
"quote_schema_and_table",
]

###
Expand Down Expand Up @@ -396,6 +396,10 @@ def reload(cls, connection):
def add_begin_query(cls, profile, name):
return cls.add_query(profile, 'BEGIN', name, auto_begin=False)

@classmethod
def add_commit_query(cls, profile, name):
    """Send a literal ``COMMIT`` statement over the connection called ``name``.

    The statement is routed through ``cls.add_query`` with
    ``auto_begin=False``; whatever ``add_query`` returns is handed back
    to the caller unchanged.
    """
    commit_result = cls.add_query(profile, 'COMMIT', name, auto_begin=False)
    return commit_result

@classmethod
def begin(cls, profile, name='master'):
global connections_in_use
Expand Down Expand Up @@ -428,10 +432,10 @@ def commit_if_has_connection(cls, profile, name):

connection = cls.get_connection(profile, name, False)

return cls.commit(connection)
return cls.commit(profile, connection)

@classmethod
def commit(cls, connection):
def commit(cls, profile, connection):
global connections_in_use

if dbt.flags.STRICT_MODE:
Expand All @@ -445,7 +449,7 @@ def commit(cls, connection):
'it does not have one open!'.format(connection.get('name')))

logger.debug('On {}: COMMIT'.format(connection.get('name')))
connection.get('handle').commit()
cls.add_commit_query(profile, connection.get('name'))

connection['transaction_open'] = False
connections_in_use[connection.get('name')] = connection
Expand Down Expand Up @@ -576,6 +580,6 @@ def quote(cls, identifier):
return '"{}"'.format(identifier)

@classmethod
def quote_schema_and_table(cls, profile, schema, table):
def quote_schema_and_table(cls, profile, schema, table, model_name=None):
return '{}.{}'.format(cls.quote(schema),
cls.quote(table))
4 changes: 2 additions & 2 deletions dbt/adapters/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ def exception_handler(cls, profile, sql, model_name=None,
except psycopg2.DatabaseError as e:
logger.debug('Postgres error: {}'.format(str(e)))

cls.rollback(connection)
cls.release_connection(profile, connection_name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this automatically roll back open transactions? We'd want to be sure we don't create a deadlock because of an open transaction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, release_connection calls rollback under the hood if the transaction is open. This change fixes an issue where we'd try to roll back transactions that weren't open.

raise dbt.exceptions.DatabaseException(
dbt.compat.to_string(e).strip())

except Exception as e:
logger.debug("Error running SQL: %s", sql)
logger.debug("Rolling back transaction.")
cls.rollback(connection)
cls.release_connection(profile, connection_name)
raise dbt.exceptions.RuntimeException(e)

@classmethod
Expand Down
4 changes: 2 additions & 2 deletions dbt/adapters/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ def drop(cls, profile, relation, relation_type, model_name=None):
connection = cls.get_connection(profile, model_name)

if connection.get('transaction_open'):
cls.commit(connection)
cls.commit(profile, connection)

cls.begin(profile, connection.get('name'))

to_return = super(PostgresAdapter, cls).drop(
profile, relation, relation_type, model_name)

cls.commit(connection)
cls.commit(profile, connection)
cls.begin(profile, connection.get('name'))

return to_return
Expand Down
6 changes: 3 additions & 3 deletions dbt/adapters/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ def exception_handler(cls, profile, sql, model_name=None,
if 'Empty SQL statement' in msg:
logger.debug("got empty sql statement, moving on")
elif 'This session does not have a current database' in msg:
cls.rollback(connection)
cls.release_connection(profile, connection_name)
raise dbt.exceptions.FailedToConnectException(
('{}\n\nThis error sometimes occurs when invalid '
'credentials are provided, or when your default role '
'does not have access to use the specified database. '
'Please double check your profile and try again.')
.format(msg))
else:
cls.rollback(connection)
cls.release_connection(profile, connection_name)
raise dbt.exceptions.DatabaseException(msg)
except Exception as e:
logger.debug("Error running SQL: %s", sql)
logger.debug("Rolling back transaction.")
cls.rollback(connection)
cls.release_connection(profile, connection_name)
raise dbt.exceptions.RuntimeException(e.msg)

@classmethod
Expand Down
20 changes: 18 additions & 2 deletions dbt/context/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
def get_hooks(model, context, hook_key):
hooks = model.get('config', {}).get(hook_key, [])

if isinstance(hooks, basestring):
if not isinstance(hooks, (list, tuple)):
hooks = [hooks]

return hooks
wrapped_hooks = []
for hook in hooks:
if isinstance(hook, dict):
hook = json.dumps(hook)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only part of this branch that doesn't feel right to me. It seems like we should do the opposite: parse all the hooks into their dictionary form and return those. Then you don't need to decode each hook individually in a loop before you can apply the filter logic.

that would let you write, for example:

{% macro run_hooks(hooks, inside_transaction=True) %}
  {% for hook in hooks | selectattr('transaction', 'equalto', inside_transaction)  %}
    {% call statement(auto_begin=inside_transaction) %}
      {{ hook.get('sql') }}
    {% endcall %}
  {% endfor %}
{% endmacro %}

(and, in general, lets you do more powerful comprehensions on hooks)

wrapped_hooks.append(hook)

return wrapped_hooks


class DatabaseWrapper(object):
Expand Down Expand Up @@ -227,6 +233,15 @@ def fn(string):
return fn


def fromjson(node):
    """Build the ``fromjson`` helper that is placed in the template context.

    ``node`` is accepted but not used by the returned helper.

    Returns a function ``fn(string, default=None)`` that decodes ``string``
    as JSON and returns the decoded value. When ``string`` cannot be
    decoded, ``default`` is returned instead of raising.
    """
    def fn(string, default=None):
        try:
            return json.loads(string)
        except (TypeError, ValueError):
            # ValueError: the text is not valid JSON (e.g. a plain SQL hook).
            # TypeError: the input is not text at all (e.g. None); treat it
            # the same way so callers always get `default` back.
            return default
    return fn


def generate(model, project, flat_graph, provider=None):
"""
Not meant to be called directly. Call with either:
Expand Down Expand Up @@ -270,6 +285,7 @@ def generate(model, project, flat_graph, provider=None):
"schema": schema,
"sql": model.get('injected_sql'),
"sql_now": adapter.date_function(),
"fromjson": fromjson(model),
"target": target,
"this": dbt.utils.This(
schema,
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/global_project/macros/core.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro statement(name=None, fetch_result=False) -%}
{% macro statement(name=None, fetch_result=False, auto_begin=True) -%}
{%- if execute: -%}
{%- set sql = render(caller()) -%}

Expand All @@ -7,7 +7,7 @@
{{ write(sql) }}
{%- endif -%}

{%- set _, cursor = adapter.add_query(sql) -%}
{%- set _, cursor = adapter.add_query(sql, auto_begin=auto_begin) -%}
{%- if name is not none -%}
{%- set result = [] if not fetch_result else adapter.get_result_from_cursor(cursor) -%}
{{ store_result(name, status=adapter.get_status(cursor), data=result) }}
Expand Down
34 changes: 30 additions & 4 deletions dbt/include/global_project/macros/materializations/helpers.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
{% macro run_hooks(hooks) %}
{% macro run_hooks(hooks, inside_transaction=True) %}
{% for hook in hooks %}
{% call statement() %}
{{ hook }};
{% endcall %}
{%- set hook_data = fromjson(render(hook), {}) -%}
{%- set hook_is_in_transaction = hook_data.get('transaction', True) -%};
{%- set hook_sql = hook_data.get('sql', hook) -%};

{%- if hook_is_in_transaction == inside_transaction -%}
{% call statement(auto_begin=inside_transaction) %}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is slick

{{ hook_sql }}
{% endcall %}
{%- endif -%}
{% endfor %}
{% endmacro %}

Expand All @@ -21,6 +27,26 @@
{% endmacro %}


{# Render a hook configuration as JSON text: an object with the keys "sql" (the given sql) and "transaction" (the given inside_transaction value), serialized with the `tojson` filter. #}
{% macro make_hook_config(sql, inside_transaction) %}
{{ {"sql": sql, "transaction": inside_transaction} | tojson }}
{% endmacro %}


{# Wrap `sql` in a hook config whose "transaction" flag is False, by calling make_hook_config(sql, inside_transaction=False). #}
{% macro before_begin(sql) %}
{{ make_hook_config(sql, inside_transaction=False) }}
{% endmacro %}


{# Wrap `sql` in a hook config whose "transaction" flag is False, by calling make_hook_config(sql, inside_transaction=False). #}
{% macro after_commit(sql) %}
{{ make_hook_config(sql, inside_transaction=False) }}
{% endmacro %}


{# Build an after_commit hook config for the statement 'vacuum ' followed by adapter.quote_schema_and_table(tbl.schema, tbl.name). #}
{% macro vacuum(tbl) %}
{{ after_commit('vacuum ' ~ adapter.quote_schema_and_table(tbl.schema, tbl.name)) }}
{% endmacro %}


{% macro drop_if_exists(existing, name) %}
{% set existing_type = existing.get(name) %}
{% if existing_type is not none %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
{{ adapter.drop(identifier, existing_type) }}
{%- endif %}

{{ run_hooks(pre_hooks) }}
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% if force_create or not adapter.already_exists(schema, identifier) -%}
Expand Down Expand Up @@ -79,8 +82,11 @@
{% endcall %}
{%- endif %}

{{ run_hooks(post_hooks) }}
{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{%- endmaterialization %}
10 changes: 8 additions & 2 deletions dbt/include/global_project/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
{%- endif %}
{%- endif %}

{{ run_hooks(pre_hooks) }}
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% call statement('main') -%}
Expand All @@ -39,7 +42,7 @@
{%- endif -%}
{%- endcall %}

{{ run_hooks(post_hooks) }}
{{ run_hooks(post_hooks, inside_transaction=True) }}

-- cleanup
{% if non_destructive_mode -%}
Expand All @@ -49,5 +52,8 @@
{{ adapter.rename(tmp_identifier, identifier) }}
{%- endif %}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endmaterialization %}
9 changes: 7 additions & 2 deletions dbt/include/global_project/macros/materializations/view.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
{%- set existing = adapter.query_for_existing(schema) -%}
{%- set existing_type = existing.get(identifier) -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ drop_if_exists(existing, tmp_identifier) }}

{{ run_hooks(pre_hooks) }}
-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% if non_destructive_mode and existing_type == 'view' -%}
Expand All @@ -19,7 +21,7 @@
{%- endcall %}
{%- endif %}

{{ run_hooks(post_hooks) }}
{{ run_hooks(post_hooks, inside_transaction=True) }}

-- cleanup
{% if non_destructive_mode and existing_type == 'view' -%}
Expand All @@ -29,6 +31,9 @@
{{ adapter.rename(tmp_identifier, identifier) }}
{%- endif %}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{%- endmaterialization -%}
15 changes: 3 additions & 12 deletions dbt/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,13 @@ def update_in_model_config(self, config):
self.in_model_config.update(config)

def __get_hooks(self, relevant_configs, key):
hooks = []

if key not in relevant_configs:
return []

new_hooks = relevant_configs[key]
if type(new_hooks) not in [list, tuple]:
new_hooks = [new_hooks]

for hook in new_hooks:
if not isinstance(hook, basestring):
name = ".".join(self.fqn)
dbt.exceptions.raise_compiler_error(
"{} for model {} is not a string!".format(key, name))
hooks = relevant_configs[key]
if not isinstance(hooks, (list, tuple)):
hooks = [hooks]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice


hooks.append(hook)
return hooks

def smart_update(self, mutable_config, new_configs):
Expand Down