Skip to content

Commit

Permalink
Merge pull request #1254 from fishtown-analytics/feature/source-tables
Browse files Browse the repository at this point in the history
Add 'sources' to dbt (#814)
  • Loading branch information
beckjake committed Jan 28, 2019
2 parents b6d1e15 + 7b23a1b commit f6402d3
Show file tree
Hide file tree
Showing 65 changed files with 2,349 additions and 1,431 deletions.
9 changes: 7 additions & 2 deletions core/dbt/clients/jinja.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,13 @@ def _compile(self, source, filename):
)
# encode, though I don't think this matters
filename = jinja2._compat.encode_filename(filename)
# put ourselves in the cache using the 'lazycache' method
linecache.cache[filename] = (lambda: source,)
# put ourselves in the cache
linecache.cache[filename] = (
len(source),
None,
[line+'\n' for line in source.splitlines()],
filename
)

return super(MacroFuzzEnvironment, self)._compile(source, filename)

Expand Down
15 changes: 15 additions & 0 deletions core/dbt/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@ def to_string(s):
return str(s)


def to_native_string(s):
    """Return ``s`` as the running interpreter's native ``str`` type.

    On Python 2 a ``unicode`` value is converted with ``str()``. Any other
    ``basestring`` is returned unchanged, and every non-string value is
    passed through ``str()``.
    """
    # NOTE(review): on Python 2, str() of a unicode value uses the default
    # codec and can presumably fail on non-ASCII text - confirm callers only
    # pass ASCII-safe values.
    if WHICH_PYTHON == 2 and isinstance(s, unicode):
        return str(s)
    # Already a native string (or, on Python 2, a byte string): keep as-is.
    if isinstance(s, basestring):
        return s
    return str(s)


def write_file(path, s):
if WHICH_PYTHON == 2:
with codecs.open(path, 'w', encoding='utf-8') as f:
Expand Down
22 changes: 20 additions & 2 deletions core/dbt/config/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ def render_value(self, value, keypath=None):
# if it wasn't read as a string, ignore it
if not isinstance(value, compat.basestring):
return value

return get_rendered(value, self.context)
# force the result of rendering into this python version's native
# string type
return compat.to_native_string(get_rendered(value, self.context))

def _render_profile_data(self, value, keypath):
result = self.render_value(value)
Expand All @@ -73,6 +74,14 @@ def _render_profile_data(self, value, keypath):
pass
return result

def _render_schema_source_data(self, value, keypath):
# things to not render:
# - descriptions
if len(keypath) > 0 and keypath[-1] == 'description':
return value

return self.render_value(value)

def render_project(self, as_parsed):
"""Render the parsed data, returning a new dict (or whatever was read).
"""
Expand All @@ -93,3 +102,12 @@ def render_profile_data(self, as_parsed):
'Cycle detected: Profile input has a reference to itself',
project=as_parsed
)

def render_schema_source(self, as_parsed):
    """Render parsed schema source data and return the rendered result.

    ``deep_map`` applies ``_render_schema_source_data`` across
    ``as_parsed``. A ``RecursionException`` raised while doing so is
    reported as a ``DbtProfileError`` describing a self-referencing
    schema.yml input.
    """
    try:
        rendered = deep_map(self._render_schema_source_data, as_parsed)
    except RecursionException:
        raise DbtProfileError(
            'Cycle detected: schema.yml input has a reference to itself',
            project=as_parsed
        )
    return rendered
1 change: 1 addition & 0 deletions core/dbt/context/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ def generate_base(model, model_dict, config, manifest, source_config,
"schema": config.credentials.schema,
"sql": None,
"sql_now": adapter.date_function(),
"source": provider.source(db_wrapper, model, config, manifest),
"fromjson": fromjson,
"tojson": tojson,
"target": target,
Expand Down
8 changes: 8 additions & 0 deletions core/dbt/context/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ def do_docs(*args):
return do_docs


def source(db_wrapper, model, config, manifest):
    """Build the ``source()`` callable used in this context.

    The returned function appends ``[source_name, table_name]`` to
    ``model.sources`` and returns an empty string. ``db_wrapper``,
    ``config`` and ``manifest`` are accepted but not used here.
    """
    def do_source(source_name, table_name):
        # Only record the reference; nothing is rendered in its place.
        reference = [source_name, table_name]
        model.sources.append(reference)
        return ''

    return do_source


class Config(object):
def __init__(self, model, source_config):
self.model = model
Expand Down
24 changes: 24 additions & 0 deletions core/dbt/context/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,30 @@ def do_ref(*args):
return do_ref


def source(db_wrapper, model, config, manifest):
    """Build the ``source()`` callable that resolves against the manifest.

    The returned function resolves ``(source_name, table_name)`` with
    ``ParserUtils.resolve_source``, appends the pair to ``model.sources``
    and returns the resolved source's ``sql_table_name``. ``db_wrapper``
    is accepted but not used here.
    """
    # Read once, when the callable is built, not on every call.
    current_project = config.project_name

    def do_source(source_name, table_name):
        resolved = ParserUtils.resolve_source(
            manifest,
            source_name,
            table_name,
            current_project,
            model.get('package_name')
        )

        # NOTE(review): source_target_not_found presumably raises; if it
        # returned, the attribute access below would fail on None - confirm.
        if resolved is None:
            dbt.exceptions.source_target_not_found(
                model, source_name, table_name
            )

        model.sources.append([source_name, table_name])
        return resolved.sql_table_name

    return do_source


class Config:
def __init__(self, model, source_config=None):
self.model = model
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/contracts/graph/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
}
)


COMPILED_NODES_CONTRACT = {
'type': 'object',
'additionalProperties': False,
Expand All @@ -87,8 +88,10 @@
},
}


COMPILED_MACRO_CONTRACT = PARSED_MACRO_CONTRACT


COMPILED_MACROS_CONTRACT = {
'type': 'object',
'additionalProperties': False,
Expand All @@ -100,6 +103,7 @@
},
}


COMPILED_GRAPH_CONTRACT = {
'type': 'object',
'additionalProperties': False,
Expand Down
35 changes: 29 additions & 6 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
from dbt.api import APIObject
from dbt.contracts.graph.unparsed import UNPARSED_NODE_CONTRACT
from dbt.contracts.graph.parsed import PARSED_NODE_CONTRACT, \
PARSED_MACRO_CONTRACT, PARSED_DOCUMENTATION_CONTRACT, ParsedNode
PARSED_MACRO_CONTRACT, PARSED_DOCUMENTATION_CONTRACT, ParsedNode, \
PARSED_SOURCE_DEFINITION_CONTRACT
from dbt.contracts.graph.compiled import COMPILED_NODE_CONTRACT, CompiledNode
from dbt.exceptions import ValidationException
from dbt.node_types import NodeType
from dbt.logger import GLOBAL_LOGGER as logger
from dbt import tracking
import dbt.utils

# We allow either parsed or compiled nodes, as some 'compile()' calls in the
# runner actually just return the original parsed node they were given.
# We allow either parsed or compiled nodes, or parsed sources, as some
# 'compile()' calls in the runner actually just return the original parsed
# node they were given.
COMPILE_RESULT_NODE_CONTRACT = {
'anyOf': [PARSED_NODE_CONTRACT, COMPILED_NODE_CONTRACT]
'anyOf': [
PARSED_NODE_CONTRACT,
COMPILED_NODE_CONTRACT,
PARSED_SOURCE_DEFINITION_CONTRACT,
]
}


Expand Down Expand Up @@ -268,6 +274,13 @@ def find_refable_by_name(self, name, package):
"""
return self._find_by_name(name, package, 'nodes', NodeType.refable())

def find_source_by_name(self, source_name, table_name, package):
    """Find a valid target for "source()" in the graph.

    The lookup name is ``'<source_name>.<table_name>'`` and the search is
    restricted to ``NodeType.Source`` entries under 'nodes'. Pass None as
    ``package`` to match any package.
    """
    qualified_name = '{}.{}'.format(source_name, table_name)
    source_types = [NodeType.Source]
    return self._find_by_name(qualified_name, package, 'nodes', source_types)

def get_materialization_macro(self, materialization_name,
adapter_type=None):
macro_name = dbt.utils.get_materialization_macro_name(
Expand All @@ -293,6 +306,8 @@ def get_materialization_macro(self, materialization_name,
def get_resource_fqns(self):
resource_fqns = {}
for unique_id, node in self.nodes.items():
if node.resource_type == NodeType.Source:
continue # sources have no FQNs and can't be configured
resource_type_plural = node.resource_type + 's'
if resource_type_plural not in resource_fqns:
resource_fqns[resource_type_plural] = set()
Expand All @@ -314,6 +329,8 @@ def _filter_subgraph(self, subgraph, predicate):
return to_return

def _model_matches_schema_and_table(self, schema, table, model):
    """Return whether ``model`` matches ``schema`` and ``table``.

    Nodes whose resource type is ``NodeType.Source`` never match. For any
    other node, ``model.schema`` is compared with ``schema`` and
    ``model.alias`` with ``table``, both case-insensitively.
    """
    if model.resource_type == NodeType.Source:
        return False
    schema_matches = model.schema.lower() == schema.lower()
    # The alias is only compared once the schema has matched.
    return schema_matches and model.alias.lower() == table.lower()

Expand Down Expand Up @@ -387,11 +404,17 @@ def __getattr__(self, name):
type(self).__name__, name)
)

def parsed_nodes(self):
    """Yield every value of ``self.nodes`` that is not a source node.

    A node is skipped when its ``resource_type`` equals
    ``NodeType.Source``.
    """
    for candidate in self.nodes.values():
        if candidate.resource_type != NodeType.Source:
            yield candidate

def get_used_schemas(self):
return frozenset({
(node.database, node.schema)
for node in self.nodes.values()
for node in self.parsed_nodes()
})

def get_used_databases(self):
return frozenset(node.database for node in self.nodes.values())
return frozenset(node.database for node in self.parsed_nodes())
Loading

0 comments on commit f6402d3

Please sign in to comment.