wrap models using macros #356

Merged · 39 commits · Apr 5, 2017

Changes from 7 commits
c591815
code cleanup + fix full-refresh opt
drewbanin Mar 26, 2017
3d4d72d
make integration tests actually test flags
drewbanin Mar 26, 2017
f651568
accidently create operations
drewbanin Mar 26, 2017
e080f73
Handle concurrent `DROP ... CASCADE`s in Redshift (#349)
cmcarthur Mar 26, 2017
fe79e07
working with a few rough edges
Mar 26, 2017
da6c467
Fix/full refresh (#350)
drewbanin Mar 27, 2017
f94f455
Merge branch 'development' into feature/already-exists-compilation-co…
drewbanin Mar 27, 2017
7f54175
test that should be green if macro parsing works the right way
Mar 27, 2017
23d42d2
wip
drewbanin Mar 28, 2017
cacf5c9
Merge branch 'feature/already-exists-compilation-context' into contex…
drewbanin Mar 28, 2017
b43be9e
validate macros don't use context vars
drewbanin Mar 28, 2017
ace4b49
fix tests
drewbanin Mar 28, 2017
bc0e1c2
working with a few rough edges
Mar 26, 2017
694d07b
test that should be green if macro parsing works the right way
Mar 27, 2017
c55ff34
inject into jinja with custom extension
drewbanin Mar 26, 2017
8537229
wip
drewbanin Mar 28, 2017
4846515
validate macros don't use context vars
drewbanin Mar 28, 2017
c06ae6d
fix tests
drewbanin Mar 28, 2017
e03f804
Merge branch 'contextless-macros' of github.com:fishtown-analytics/db…
drewbanin Mar 28, 2017
2b3f46e
add test for `target` in macros, disallow target
drewbanin Mar 28, 2017
e2c279b
add license to RTD
drewbanin Mar 28, 2017
ce96825
wip - materialize via macros
drewbanin Mar 29, 2017
5ffe4a5
always release connections at the end of execute_node (#354)
cmcarthur Mar 29, 2017
d907fca
merged development
Mar 30, 2017
ab4d5a3
tests passing
Mar 30, 2017
a266308
integration tests passing, again
Mar 31, 2017
e533b36
unit tests & pep8 passing
Apr 3, 2017
d3d2da5
merge contextless macros branch
Apr 3, 2017
cb2bba4
functional test tweaks
Apr 4, 2017
f1f39c1
remove extra raise
Apr 4, 2017
0b810c6
fix integration tests
Apr 4, 2017
4f663a0
allow global project macros to be referenced without a namespace
Apr 4, 2017
837bff4
handle compilation errors
Apr 4, 2017
0574958
add manifest file
drewbanin Apr 4, 2017
1063500
include global project files in dbt dist
drewbanin Apr 4, 2017
161d70c
Merge pull request #361 from fishtown-analytics/include-global-project
drewbanin Apr 4, 2017
8b59748
rename funcs to adapter
Apr 4, 2017
b7044d9
remove compile task
Apr 4, 2017
2921a00
don't throw compilr error for this/target in macro
drewbanin Apr 5, 2017
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,8 @@
- Ignore commented-out schema tests ([#330](https://github.com/fishtown-analytics/dbt/pull/330), [#328](https://github.com/fishtown-analytics/dbt/issues/328))
- Fix run levels ([#343](https://github.com/fishtown-analytics/dbt/pull/343), [#340](https://github.com/fishtown-analytics/dbt/issues/340), [#338](https://github.com/fishtown-analytics/dbt/issues/338))
- Fix concurrency, open a unique transaction per model ([#345](https://github.com/fishtown-analytics/dbt/pull/345), [#336](https://github.com/fishtown-analytics/dbt/issues/336))
- Handle concurrent `DROP ... CASCADE`s in Redshift ([#349](https://github.com/fishtown-analytics/dbt/pull/349))
- Always release connections (use `try .. finally`) ([#354](https://github.com/fishtown-analytics/dbt/pull/354))

### Changes

33 changes: 32 additions & 1 deletion dbt/adapters/redshift.py
@@ -1,5 +1,11 @@
import multiprocessing

from dbt.adapters.postgres import PostgresAdapter
from dbt.logger import GLOBAL_LOGGER as logger # noqa
from dbt.compat import basestring


drop_lock = multiprocessing.Lock()


class RedshiftAdapter(PostgresAdapter):
@@ -30,7 +36,7 @@ def sort_qualifier(cls, sort_type, sort):
.format(sort_type, valid_sort_types)
)

if type(sort) == str:
Review comment (Member): great catch

if isinstance(sort, basestring):
sort_keys = [sort]
else:
sort_keys = sort
@@ -42,3 +48,28 @@ def sort_qualifier(cls, sort_type, sort):
return "{sort_type} sortkey({keys_csv})".format(
sort_type=sort_type, keys_csv=keys_csv
)

@classmethod
def drop(cls, profile, relation, relation_type, model_name=None):
global drop_lock

to_return = None

try:
drop_lock.acquire()

connection = cls.get_connection(profile, model_name)

cls.commit(connection)
Review comment (Contributor, Author): @cmcarthur is there any problem with committing if there is no open transaction?

cls.begin(profile, connection.get('name'))

to_return = super(PostgresAdapter, cls).drop(
profile, relation, relation_type, model_name)

cls.commit(connection)
cls.begin(profile, connection.get('name'))

return to_return

finally:
drop_lock.release()
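
For context on the lock above: a single module-level multiprocessing.Lock is shared by every worker, so only one DROP ... CASCADE runs at a time. Below is a minimal, self-contained sketch of the same pattern; drop_relation and run_sql are illustrative stand-ins, not dbt APIs, and the `with` statement is equivalent to the acquire()/release()-in-finally used in the diff.

import multiprocessing

# One lock shared by all workers: serializes drops so concurrent
# DROP ... CASCADE statements can't race on shared dependencies.
drop_lock = multiprocessing.Lock()

def drop_relation(run_sql, schema, relation):
    with drop_lock:
        run_sql('drop table if exists "{}"."{}" cascade'.format(schema, relation))

# usage with a stand-in executor that just prints the SQL
drop_relation(lambda sql: print(sql), "analytics", "events")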
57 changes: 14 additions & 43 deletions dbt/compilation.py
@@ -5,6 +5,7 @@
import dbt.project
import dbt.utils
import dbt.include
import dbt.wrapper

from dbt.model import Model
from dbt.utils import This, Var, is_enabled, get_materialization, NodeType, \
@@ -269,13 +270,14 @@ def get_compiler_context(self, linker, model, flat_graph):
context = self.project.context()
adapter = get_adapter(self.project.run_environment())

this_table = model.get('name')

# built-ins
context['ref'] = self.__ref(context, model, flat_graph)
context['config'] = self.__model_config(model, linker)
context['this'] = This(
context['env']['schema'],
(model.get('name') if dbt.flags.NON_DESTRUCTIVE
else '{}__dbt_tmp'.format(model.get('name'))),
this_table,
model.get('name')
)
context['var'] = Var(model, context=context)
Expand All @@ -291,33 +293,6 @@ def get_compiler_context(self, linker, model, flat_graph):

return context

def get_context(self, linker, model, models):
Review comment (Contributor, Author): 👍

# THIS IS STILL USED FOR WRAPPING, BUT SHOULD GO AWAY
# - Connor
runtime = RuntimeContext(model=model)

context = self.project.context()

# built-ins
context['ref'] = self.__ref(context, model, models)
context['config'] = self.__model_config(model, linker)
context['this'] = This(
context['env']['schema'], model.immediate_name, model.name
)
context['var'] = Var(model, context=context)
context['target'] = self.project.get_target()

# these get re-interpolated at runtime!
context['run_started_at'] = '{{ run_started_at }}'
context['invocation_id'] = '{{ invocation_id }}'

adapter = get_adapter(self.project.run_environment())
context['sql_now'] = adapter.date_function()

runtime.update_global(context)

return runtime

def compile_node(self, linker, node, flat_graph):
logger.debug("Compiling {}".format(node.get('unique_id')))

@@ -402,22 +377,18 @@ def compile_graph(self, linker, flat_graph):
# the SQL at the parser level.
pass

elif(is_type(injected_node, NodeType.Model) and
get_materialization(injected_node) == 'ephemeral'):
pass

else:
model = Model(
context = self.get_compiler_context(
linker, injected_node, injected_graph)
wrapped_stmt = dbt.wrapper.wrap(
injected_node,
self.project,
injected_node.get('root_path'),
injected_node.get('path'),
all_projects.get(injected_node.get('package_name')))

cfg = injected_node.get('config', {})
model._config = cfg

context = self.get_context(linker,
model,
injected_graph.get('nodes'))

wrapped_stmt = model.compile(
injected_node.get('injected_sql'), self.project, context)
context,
flat_graph)

injected_node['wrapped_sql'] = wrapped_stmt
wrapped_graph['nodes'][name] = injected_node
1 change: 1 addition & 0 deletions dbt/flags.py
@@ -1,2 +1,3 @@
STRICT_MODE = False
NON_DESTRUCTIVE = False
FULL_REFRESH = False
56 changes: 56 additions & 0 deletions dbt/include/global_project/macros/materializations/incremental.sql
@@ -0,0 +1,56 @@


{# this shouldn't be duplicated - use the existing macro! #}
{% macro dbt__create_table_for_incremental(schema, identifier, dist, sort, sql, flags, funcs) -%}
create table {{ schema }}.{{ identifier }} {{ dist }} {{ sort }} as (
{{ sql }}
);
{%- endmacro %}

{% macro dbt__incremental_delete(schema, identifier, unique_key) -%}

delete
from "{{ schema }}"."{{ identifier }}"
where ({{ unique_key }}) in (
select ({{ unique_key }}) from "{{ identifier }}__dbt_incremental_tmp"
);

{%- endmacro %}

{% macro dbt__create_incremental(schema, model, identifier, dist, sort, sql, sql_where, funcs, flags, unique_key=None) -%}

{% if not funcs['already_exists'](schema, identifier) -%}

{{ dbt__create_table_for_incremental(schema, identifier, dist, sort, sql, flags, funcs) }}

{%- else -%}

create temporary table "{{ identifier }}__dbt_incremental_tmp" as (
with dbt_incr_sbq as (
{{ sql }}
)
select * from dbt_incr_sbq
where ({{ model.get('config', {}).get('sql_where', 'null') }})
or ({{ model.get('config', {}).get('sql_where', 'null') }}) is null
);

-- DBT_OPERATION { function: expand_column_types_if_needed, args: { temp_table: "{{ identifier }}__dbt_incremental_tmp", to_schema: "{{ schema }}", to_table: "{{ identifier }}"} }

{% set dest_columns = funcs['get_columns_in_table'](schema, identifier) %}
{% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}

{% if model.get('config', {}).get('unique_key') is not none -%}

{{ dbt__incremental_delete(schema, identifier, unique_key) }}

{%- endif %}

insert into "{{ schema }}"."{{ identifier }}" ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from "{{ identifier }}__dbt_incremental_tmp"
);

{%- endif %}

{%- endmacro %}
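
To see what these macros actually emit, the delete step can be rendered outside of dbt with plain jinja2. This is only a sketch of the rendering, not how dbt invokes the macro, and the schema, identifier, and unique_key values are made up.

from jinja2 import Environment

macro_source = '''
{% macro dbt__incremental_delete(schema, identifier, unique_key) -%}
delete
from "{{ schema }}"."{{ identifier }}"
where ({{ unique_key }}) in (
    select ({{ unique_key }}) from "{{ identifier }}__dbt_incremental_tmp"
);
{%- endmacro %}
'''

env = Environment()
template = env.from_string(
    macro_source + "{{ dbt__incremental_delete(schema, identifier, unique_key) }}")

# illustrative values: renders the delete that clears rows about to be re-inserted
print(template.render(schema='analytics', identifier='events', unique_key='event_id'))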
24 changes: 24 additions & 0 deletions dbt/include/global_project/macros/materializations/table.sql
@@ -0,0 +1,24 @@

{% macro dbt__create_table(schema, identifier, dist, sort, sql, flags, funcs) -%}

{% if not flags.NON_DESTRUCTIVE or
not funcs['already_exists'](schema, identifier) -%}
create table {{ schema }}.{{ identifier }} {{ dist }} {{ sort }} as (
{{ sql }}
);
{%- else -%}
create temporary table {{ identifier }}__dbt_tmp {{ dist }} {{ sort }} as (
{{ sql }}
);

{% set dest_columns = funcs['get_columns_in_table'](schema, identifier) %}
{% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}

insert into {{ schema }}.{{ identifier }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from "{{ identifier }}__dbt_tmp"
);
{%- endif %}

{%- endmacro %}
8 changes: 8 additions & 0 deletions dbt/include/global_project/macros/materializations/view.sql
@@ -0,0 +1,8 @@

{% macro dbt__create_view(schema, identifier, sql) -%}

create view {{ schema }}.{{ identifier }} as (
{{ sql }}
);

{%- endmacro %}
20 changes: 20 additions & 0 deletions dbt/include/global_project/macros/materializations/wrapper.sql
@@ -0,0 +1,20 @@

{% macro dbt__wrap(sql, pre_hooks, post_hooks) -%}

-- Compiled by DBT

{% for hook in pre_hooks %}

{{ hook }};

{% endfor %}

{{ sql }}

{% for hook in post_hooks -%}

{{ hook }};

{% endfor %}

{% endmacro %}
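
The wrapper's contract is simple: every pre-hook runs before the model SQL and every post-hook after, each statement terminated with a semicolon. A quick jinja2 rendering makes the ordering visible; the hook strings here are invented examples, not defaults shipped with dbt.

from jinja2 import Environment

wrap_source = '''
{% macro dbt__wrap(sql, pre_hooks, post_hooks) -%}
-- Compiled by DBT
{% for hook in pre_hooks %}
{{ hook }};
{% endfor %}
{{ sql }}
{% for hook in post_hooks -%}
{{ hook }};
{% endfor %}
{% endmacro %}
'''

env = Environment()
template = env.from_string(wrap_source + "{{ dbt__wrap(sql, pre_hooks, post_hooks) }}")
print(template.render(
    sql='create view analytics.events as (select 1 as id);',
    pre_hooks=['grant usage on schema analytics to group reporting'],  # invented
    post_hooks=['analyze analytics.events'],                           # invented
))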
10 changes: 3 additions & 7 deletions dbt/main.py
@@ -31,6 +31,7 @@ def main(args=None):
handle(args)

except RuntimeError as e:
raise
Review comment (Contributor, Author): @cmcarthur we should remove this -- i just threw it in there for debugging

logger.info("Encountered an error:")
logger.info(str(e))
sys.exit(1)
@@ -184,13 +185,8 @@ def invoke_dbt(parsed):

return None

log_dir = proj.get('log-path', 'logs')

if hasattr(proj.args, 'non_destructive') and \
proj.args.non_destructive is True:
flags.NON_DESTRUCTIVE = True
else:
flags.NON_DESTRUCTIVE = False
flags.NON_DESTRUCTIVE = getattr(proj.args, 'non_destructive', False)
flags.FULL_REFRESH = getattr(proj.args, 'full_refresh', False)

logger.debug("running dbt with arguments %s", parsed)

47 changes: 1 addition & 46 deletions dbt/model.py
@@ -92,11 +92,7 @@ def config(self):
return cfg

def is_full_refresh(self):
if hasattr(self.active_project, 'args') and \
hasattr(self.active_project.args, 'full_refresh'):
return self.active_project.args.full_refresh
else:
return False
return dbt.flags.FULL_REFRESH

def update_in_model_config(self, config):
config = config.copy()
@@ -345,47 +341,6 @@ def get_prologue_string(self):
blob = "\n".join("-- {}".format(s) for s in self.prologue)
return "-- Compiled by DBT\n{}".format(blob)

def sort_qualifier(self, model_config):
if 'sort' not in model_config:
return ''

if (self.is_view or self.is_ephemeral) and 'sort' in model_config:
return ''

sort_keys = model_config['sort']
sort_type = model_config.get('sort_type', 'compound')

if type(sort_type) != str:
compiler_error(
self,
"The provided sort_type '{}' is not valid!".format(sort_type)
)

sort_type = sort_type.strip().lower()

adapter = get_adapter(self.project.run_environment())
return adapter.sort_qualifier(sort_type, sort_keys)

def dist_qualifier(self, model_config):
if 'dist' not in model_config:
return ''

if (self.is_view or self.is_ephemeral) and 'dist' in model_config:
return ''

dist_key = model_config['dist']

if type(dist_key) != str:
compiler_error(
self,
"The provided distkey '{}' is not valid!".format(dist_key)
)

dist_key = dist_key.strip().lower()

adapter = get_adapter(self.project.run_environment())
return adapter.dist_qualifier(dist_key)

def build_path(self):
filename = "{}.sql".format(self.name)
path_parts = [self.build_dir] + self.fqn[:-1] + [filename]
29 changes: 16 additions & 13 deletions dbt/runner.py
@@ -478,23 +478,26 @@ def execute_node(self, node, existing):
adapter = get_adapter(profile)
connection = adapter.begin(profile, node.get('name'))

logger.debug("executing node %s", node.get('unique_id'))
try:
logger.debug("executing node %s", node.get('unique_id'))

if node.get('skip') is True:
return "SKIP"

if node.get('skip') is True:
return "SKIP"
node = self.inject_runtime_config(node)

node = self.inject_runtime_config(node)
if is_type(node, NodeType.Model):
result = execute_model(profile, node, existing)
elif is_type(node, NodeType.Test):
result = execute_test(profile, node)
elif is_type(node, NodeType.Archive):
result = execute_archive(
profile, node, self.node_context(node))

if is_type(node, NodeType.Model):
result = execute_model(profile, node, existing)
elif is_type(node, NodeType.Test):
result = execute_test(profile, node)
elif is_type(node, NodeType.Archive):
result = execute_archive(profile, node, self.node_context(node))
adapter.commit(connection)

adapter.commit(connection)
adapter.close(connection)
adapter.release_connection(profile, node.get('name'))
finally:
adapter.release_connection(profile, node.get('name'))

return result
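
The guarantee this hunk adds is that release_connection now runs on every exit path: normal completion, the early SKIP return, and an exception from the model. A self-contained sketch with a fake adapter (the class and method names are illustrative, not dbt's API):

class FakeAdapter:
    def begin(self):
        return 'conn'
    def commit(self, conn):
        print('commit', conn)
    def release_connection(self, name):
        print('release', name)

def execute_node(adapter, name, skip=False, fail=False):
    conn = adapter.begin()
    try:
        if skip:
            return 'SKIP'   # early return: finally still releases
        if fail:
            raise RuntimeError('model error')
        adapter.commit(conn)
        return 'PASS'
    finally:
        # runs on success, on the SKIP early-return, and when an
        # exception propagates, so the connection is never leaked
        adapter.release_connection(name)

adapter = FakeAdapter()
print(execute_node(adapter, 'my_model'))             # commit, release, PASS
print(execute_node(adapter, 'my_model', skip=True))  # release, SKIP
try:
    execute_node(adapter, 'my_model', fail=True)     # release, then RuntimeError
except RuntimeError:
    pass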
