Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/materialize incremental #90

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions dbt/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def contents(self):
return fh.read().strip()

def get_config_keys(self):
return ['enabled', 'materialized', 'dist', 'sort']
return ['enabled', 'materialized', 'dist', 'sort', 'sql_field']

def compile(self):
raise RuntimeError("Not implemented!")
Expand Down Expand Up @@ -84,6 +84,8 @@ def __init__(self, project, model_dir, rel_filepath):
super(Model, self).__init__(project, model_dir, rel_filepath)

def sort_qualifier(self, model_config):
if 'sort' not in model_config:
return ''
sort_keys = model_config['sort']
if type(sort_keys) == str:
sort_keys = [sort_keys]
Expand All @@ -93,6 +95,9 @@ def sort_qualifier(self, model_config):
return "sortkey ({})".format(', '.join(formatted_sort_keys))

def dist_qualifier(self, model_config):
if 'dist' not in model_config:
return ''

dist_key = model_config['dist']

if type(dist_key) != str:
Expand All @@ -108,22 +113,36 @@ def build_path(self, create_template):

def compile(self, rendered_query, project, create_template):
model_config = self.get_config(project)
table_or_view = 'table' if model_config['materialized'] else 'view'

valid_materializations = ['view', 'table', 'incremental']
materialization = model_config['materialized']
if materialization not in valid_materializations:
raise RuntimeError("Invalid materialize option given: '{}'. Must be one of {}".format(materialization, valid_materializations))

ctx = project.context()
schema = ctx['env'].get('schema', 'public')

is_table = table_or_view == 'table'
dist_qualifier = self.dist_qualifier(model_config) if 'dist' in model_config and is_table else ''
sort_qualifier = self.sort_qualifier(model_config) if 'sort' in model_config and is_table else ''
# these are empty strings if configs aren't provided
dist_qualifier = self.dist_qualifier(model_config)
sort_qualifier = self.sort_qualifier(model_config)

if materialization in ('table', 'view'):
identifier = self.tmp_name()
sql_field = None
else:
identifier = self.name
if 'sql_field' not in model_config:
raise RuntimeError("sql_field not specified in model materialized as incremental: {}".format(self))
sql_field = model_config['sql_field']

opts = {
"table_or_view": table_or_view,
"materialization": materialization,
"schema": schema,
"identifier": self.tmp_name(),
"identifier": identifier,
"query": rendered_query,
"dist_qualifier": dist_qualifier,
"sort_qualifier": sort_qualifier
"sort_qualifier": sort_qualifier,
"sql_field": sql_field
}

return create_template.wrap(opts)
Expand Down
14 changes: 10 additions & 4 deletions dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ def execute_model(self, target, model, tmp_drop_type, final_drop_type):
if final_drop_type is not None:
self.drop(target, model, model.name, final_drop_type)

self.rename(target, model)
model_config = model.get_config(model.project)
if model_config['materialized'] != 'incremental':
self.rename(target, model)

def execute_models(self, linker, models, limit_to=None):
target = self.get_target()
Expand All @@ -205,9 +207,13 @@ def execute_models(self, linker, models, limit_to=None):
def wrap_fqn(target, models, existing, fqn):
model = self.get_model_by_fqn(models, fqn)

# False, 'view', or 'table'
tmp_drop_type = existing.get(model.tmp_name(), None)
final_drop_type = existing.get(model.name, None)
model_config = model.get_config(model.project)
if model_config.get('materialized') == 'incremental':
tmp_drop_type = None
final_drop_type = None
else:
tmp_drop_type = existing.get(model.tmp_name(), None)
final_drop_type = existing.get(model.name, None)
return {"model" : model, "target": target, "tmp_drop_type": tmp_drop_type, 'final_drop_type': final_drop_type}

# we can only pass one arg to the self.execute_model method below. Pass a dict w/ all the data we need
Expand Down
2 changes: 1 addition & 1 deletion dbt/seeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def do_seed(self, schema, cursor, drop_existing):
self.insert_into_table(cursor, schema, table_name, virtual_table)
except psycopg2.ProgrammingError as e:
print('Encountered an error while inserting into table "{}"."{}"'.format(schema, table_name))
print('Check for formatting errors in {}'.format(csv_path))
print('Check for formatting errors in {}'.format(csv.filepath))
print('Try --drop-existing to delete and recreate the table instead')
print(str(e))

Expand Down
30 changes: 28 additions & 2 deletions dbt/templates.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,44 @@

class BaseCreateTemplate(object):
template = """
create {table_or_view} "{schema}"."{identifier}" {dist_qualifier} {sort_qualifier} as (
create {materialization} "{schema}"."{identifier}" {dist_qualifier} {sort_qualifier} as (
{query}
);"""

incremental_template = """
create temporary table "{identifier}__dbt_incremental_tmp" {dist_qualifier} {sort_qualifier} as (
SELECT * FROM (
{query}
) as tmp LIMIT 0
);

create table if not exists "{schema}"."{identifier}" (like "{identifier}__dbt_incremental_tmp");

insert into "{schema}"."{identifier}" (
with dbt_inc_sbq as (
select max({sql_field}) as __dbt_max from "{schema}"."{identifier}"
), dbt_raw_sbq as (
{query}
)
select dbt_raw_sbq.* from dbt_raw_sbq
join dbt_inc_sbq on {sql_field} > dbt_inc_sbq.__dbt_max or dbt_inc_sbq.__dbt_max is null
order by {sql_field}
);
"""

label = "build"

@classmethod
def model_name(cls, base_name):
return base_name

def wrap(self, opts):
return self.template.format(**opts)
if opts['materialization'] in ('table', 'view'):
return self.template.format(**opts)
elif opts['materialization'] == 'incremental':
return self.incremental_template.format(**opts)
else:
raise RuntimeError("Invalid materialization parameter ({})".format(opts['materialization']))

class TestCreateTemplate(object):
template = """
Expand Down