Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

postgres: gracefully handle materialized views (#1698) #1833

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,17 +389,21 @@ def is_view(self) -> bool:
return self.type == RelationType.View

@classproperty
def Table(self) -> str:
def Table(cls) -> str:
return str(RelationType.Table)

@classproperty
def CTE(self) -> str:
def CTE(cls) -> str:
return str(RelationType.CTE)

@classproperty
def View(self) -> str:
def View(cls) -> str:
return str(RelationType.View)

@classproperty
def External(self) -> str:
def External(cls) -> str:
return str(RelationType.External)

@classproperty
def RelationType(cls) -> Type[RelationType]:
return RelationType
31 changes: 23 additions & 8 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ def _add_link(self, referenced_key, dependent_key):
:raises InternalError: If either entry does not exist.
"""
referenced = self.relations.get(referenced_key)
if referenced is None:
return
if referenced is None:
dbt.exceptions.raise_cache_inconsistent(
'in add_link, referenced link key {} not in cache!'
Expand All @@ -268,8 +270,8 @@ def _add_link(self, referenced_key, dependent_key):
referenced.add_reference(dependent)

def add_link(self, referenced, dependent):
"""Add a link between two relations to the database. Both the old and
new entries must already exist in the database.
"""Add a link between two relations to the database. If either relation
does not exist, it will be added as an "external" relation.

The dependent model refers _to_ the referenced model. So, given
arguments of (jake_test, bar, jake_test, foo):
Expand All @@ -281,23 +283,36 @@ def add_link(self, referenced, dependent):
:param BaseRelation dependent: The dependent model.
:raises InternalError: If either entry does not exist.
"""
referenced = _make_key(referenced)
if (referenced.database, referenced.schema) not in self:
ref_key = _make_key(referenced)
if (ref_key.database, ref_key.schema) not in self:
# if we have not cached the referenced schema at all, we must be
# referring to a table outside our control. There's no need to make
# a link - we will never drop the referenced relation during a run.
logger.debug(
'{dep!s} references {ref!s} but {ref.database}.{ref.schema} '
'is not in the cache, skipping assumed external relation'
.format(dep=dependent, ref=referenced)
.format(dep=dependent, ref=ref_key)
)
return
dependent = _make_key(dependent)
if ref_key not in self.relations:
# Insert a dummy "external" relation.
referenced = referenced.replace(
type=referenced.RelationType.External
)
self.add(referenced)

dep_key = _make_key(dependent)
if dep_key not in self.relations:
# Insert a dummy "external" relation.
dependent = dependent.replace(
type=referenced.RelationType.External
)
self.add(dependent)
logger.debug(
'adding link, {!s} references {!s}'.format(dependent, referenced)
'adding link, {!s} references {!s}'.format(dep_key, ref_key)
)
with self.lock:
self._add_link(referenced, dependent)
self._add_link(ref_key, dep_key)

def add(self, relation):
"""Add the relation inner to the cache, under the schema schema and
Expand Down
14 changes: 7 additions & 7 deletions plugins/postgres/dbt/adapters/postgres/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,22 @@ def _link_cached_database_relations(self, schemas):
database = self.config.credentials.database
table = self.execute_macro(GET_RELATIONS_MACRO_NAME)

for (refed_schema, refed_name, dep_schema, dep_name) in table:
referenced = self.Relation.create(
database=database,
schema=refed_schema,
identifier=refed_name
)
for (dep_schema, dep_name, refed_schema, refed_name) in table:
dependent = self.Relation.create(
database=database,
schema=dep_schema,
identifier=dep_name
)
referenced = self.Relation.create(
database=database,
schema=refed_schema,
identifier=refed_name
)

# don't record in cache if this relation isn't in a relevant
# schema
if refed_schema.lower() in schemas:
self.cache.add_link(dependent, referenced)
self.cache.add_link(referenced, dependent)

def _get_cache_schemas(self, manifest, exec_only=False):
# postgres/redshift only allow one database (the main one)
Expand Down
36 changes: 26 additions & 10 deletions test/integration/001_simple_copy_test/test_simple_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ def models(self):

class TestSimpleCopy(BaseTestSimpleCopy):

@property
def project_config(self):
return {"data-paths": [self.dir("seed-initial")]}

@use_profile("postgres")
def test__postgres__simple_copy(self):
self.use_default_project({"data-paths": [self.dir("seed-initial")]})

results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
Expand All @@ -36,10 +38,30 @@ def test__postgres__simple_copy(self):

self.assertManyTablesEqual(["seed", "view_model", "incremental", "materialized", "get_and_ref"])

@use_profile('postgres')
def test__postgres__simple_copy_with_materialized_views(self):
self.run_sql('''
create table {schema}.unrelated_table (id int)
'''.format(schema=self.unique_schema())
)
self.run_sql('''
create materialized view {schema}.unrelated_materialized_view as (
select * from {schema}.unrelated_table
)
'''.format(schema=self.unique_schema()))
self.run_sql('''
create view {schema}.unrelated_view as (
select * from {schema}.unrelated_materialized_view
)
'''.format(schema=self.unique_schema()))

results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
self.assertEqual(len(results), 7)

@use_profile("postgres")
def test__postgres__dbt_doesnt_run_empty_models(self):
self.use_default_project({"data-paths": [self.dir("seed-initial")]})

results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
Expand All @@ -66,8 +88,6 @@ def test__presto__simple_copy(self):

@use_profile("snowflake")
def test__snowflake__simple_copy(self):
self.use_default_project({"data-paths": [self.dir("seed-initial")]})

results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
Expand All @@ -92,7 +112,6 @@ def test__snowflake__simple_copy(self):
@use_profile("snowflake")
def test__snowflake__simple_copy__quoting_off(self):
self.use_default_project({
"data-paths": [self.dir("seed-initial")],
"quoting": {"identifier": False},
})

Expand Down Expand Up @@ -124,7 +143,6 @@ def test__snowflake__simple_copy__quoting_off(self):
@use_profile("snowflake")
def test__snowflake__seed__quoting_switch(self):
self.use_default_project({
"data-paths": [self.dir("seed-initial")],
"quoting": {"identifier": False},
})

Expand All @@ -145,8 +163,6 @@ def test__snowflake__seed__quoting_switch(self):

@use_profile("bigquery")
def test__bigquery__simple_copy(self):
self.use_default_project({"data-paths": [self.dir("seed-initial")]})

results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ def project_config(self):
}
}

def setUp(self):
super().setUp()
# self.use_default_config()
self.run_sql_file("seed.sql")

@use_profile('postgres')
def test__postgres__simple_reference(self):
self.use_default_project()
self.run_sql_file("seed.sql")

results = self.run_dbt()
# ephemeral_copy doesn't show up in results
Expand Down Expand Up @@ -59,8 +62,6 @@ def test__postgres__simple_reference(self):

@use_profile('snowflake')
def test__snowflake__simple_reference(self):
self.use_default_project()
self.run_sql_file("seed.sql")

results = self.run_dbt()
self.assertEqual(len(results), 8)
Expand All @@ -83,8 +84,6 @@ def test__snowflake__simple_reference(self):

@use_profile('postgres')
def test__postgres__simple_reference_with_models(self):
self.use_default_project()
self.run_sql_file("seed.sql")

# Run materialized_copy, ephemeral_copy, and their dependents
# ephemeral_copy should not actually be materialized b/c it is ephemeral
Expand All @@ -101,8 +100,6 @@ def test__postgres__simple_reference_with_models(self):

@use_profile('postgres')
def test__postgres__simple_reference_with_models_and_children(self):
self.use_default_project()
self.run_sql_file("seed.sql")

# Run materialized_copy, ephemeral_copy, and their dependents
# ephemeral_copy should not actually be materialized b/c it is ephemeral
Expand Down Expand Up @@ -139,8 +136,6 @@ def test__postgres__simple_reference_with_models_and_children(self):

@use_profile('snowflake')
def test__snowflake__simple_reference_with_models(self):
self.use_default_project()
self.run_sql_file("seed.sql")

# Run materialized_copy & ephemeral_copy
# ephemeral_copy should not actually be materialized b/c it is ephemeral
Expand All @@ -157,8 +152,6 @@ def test__snowflake__simple_reference_with_models(self):

@use_profile('snowflake')
def test__snowflake__simple_reference_with_models_and_children(self):
self.use_default_project()
self.run_sql_file("seed.sql")

# Run materialized_copy, ephemeral_copy, and their dependents
# ephemeral_copy should not actually be materialized b/c it is ephemeral
Expand Down
22 changes: 14 additions & 8 deletions test/unit/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,22 @@ def setUp(self):
self.cache.add(make_relation('dbt', 'schema_2', 'bar'))

def test_no_src(self):
# src does not exist (but similar names do)
with self.assertRaises(dbt.exceptions.InternalException):
self.cache.add_link(make_relation('dbt', 'schema', 'bar'),
make_relation('dbt', 'schema', 'foo'))
self.assert_relations_exist('dbt', 'schema', 'foo')
self.assert_relations_do_not_exist('dbt', 'schema', 'bar')

self.cache.add_link(make_relation('dbt', 'schema', 'bar'),
make_relation('dbt', 'schema', 'foo'))

self.assert_relations_exist('dbt', 'schema', 'foo', 'bar')

def test_no_dst(self):
# dst does not exist (but similar names do)
with self.assertRaises(dbt.exceptions.InternalException):
self.cache.add_link(make_relation('dbt', 'schema', 'foo'),
make_relation('dbt', 'schema', 'bar'))
self.assert_relations_exist('dbt', 'schema', 'foo')
self.assert_relations_do_not_exist('dbt', 'schema', 'bar')

self.cache.add_link(make_relation('dbt', 'schema', 'foo'),
make_relation('dbt', 'schema', 'bar'))

self.assert_relations_exist('dbt', 'schema', 'foo', 'bar')


class TestRename(TestCache):
Expand Down