
Commit

Merge pull request #1272 from fishtown-analytics/feature/source-freshness

Feature: source freshness (#1240)
beckjake committed Feb 13, 2019
2 parents c5138eb + 0bd5999 commit 026c50d
Showing 59 changed files with 2,180 additions and 1,136 deletions.
24 changes: 24 additions & 0 deletions .circleci/config.yml
@@ -28,6 +28,9 @@ jobs:
       - run:
           name: Run tests
           command: tox -e integration-postgres-py36
+      - store_artifacts:
+          path: ./logs/dbt.log
+          destination: postgres-py36
   integration-snowflake-py36:
     docker: &test_only
       - image: fishtownjacob/test-container
@@ -37,20 +40,29 @@ jobs:
           name: Run tests
           command: tox -e integration-snowflake-py36
           no_output_timeout: 1h
+      - store_artifacts:
+          path: ./logs/dbt.log
+          destination: snowflake-py36
   integration-redshift-py36:
     docker: *test_only
     steps:
       - checkout
       - run:
           name: Run tests
           command: tox -e integration-redshift-py36
+      - store_artifacts:
+          path: ./logs/dbt.log
+          destination: redshift-py36
   integration-bigquery-py36:
     docker: *test_only
     steps:
       - checkout
       - run:
           name: Run tests
           command: tox -e integration-bigquery-py36
+      - store_artifacts:
+          path: ./logs/dbt.log
+          destination: bigquery-py36
   integration-postgres-py27:
     docker: *test_and_postgres
     steps:
@@ -59,6 +71,9 @@ jobs:
       - run:
           name: Run tests
           command: tox -e integration-postgres-py27
+      - store_artifacts:
+          path: ./logs/dbt.log
+          destination: postgres-py27
   integration-snowflake-py27:
     docker: *test_only
     steps:
@@ -67,20 +82,29 @@ jobs:
           name: Run tests
           command: tox -e integration-snowflake-py27
           no_output_timeout: 1h
+      - store_artifacts:
+          path: ./logs/dbt.log
+          destination: snowflake-py27
   integration-redshift-py27:
     docker: *test_only
     steps:
       - checkout
       - run:
           name: Run tests
           command: tox -e integration-redshift-py27
+      - store_artifacts:
+          path: ./logs/dbt.log
+          destination: redshift-py27
   integration-bigquery-py27:
     docker: *test_only
     steps:
       - checkout
       - run:
           name: Run tests
           command: tox -e integration-bigquery-py27
+      - store_artifacts:
+          path: ./logs/dbt.log
+          destination: bigquery-py27
 
 workflows:
   version: 2
47 changes: 47 additions & 0 deletions core/dbt/adapters/base/impl.py
@@ -4,6 +4,7 @@
 import time
 
 import agate
+import pytz
 import six
 
 import dbt.exceptions
@@ -25,6 +26,7 @@
 
 
 GET_CATALOG_MACRO_NAME = 'get_catalog'
+FRESHNESS_MACRO_NAME = 'collect_freshness'
 
 
 def _expect_row_value(key, row):
@@ -67,6 +69,16 @@ def test(row):
     return test
 
 
+def _utc(dt):
+    """If dt has a timezone, return a new datetime that's in UTC. Otherwise,
+    assume the datetime is already for UTC and add the timezone.
+    """
+    if dt.tzinfo:
+        return dt.astimezone(pytz.UTC)
+    else:
+        return dt.replace(tzinfo=pytz.UTC)
+
+
 @six.add_metaclass(AdapterMeta)
 class BaseAdapter(object):
     """The BaseAdapter provides an abstract base class for adapters.
@@ -780,3 +792,38 @@ def get_catalog(self, manifest):
     def cancel_open_connections(self):
         """Cancel all open connections."""
         return self.connections.cancel_open()
+
+    def calculate_freshness(self, source, loaded_at_field, manifest=None,
+                            connection_name=None):
+        """Calculate the freshness of sources in dbt, and return it"""
+        # in the future `source` will be a Relation instead of a string
+        kwargs = {
+            'source': source,
+            'loaded_at_field': loaded_at_field
+        }
+
+        # run the macro
+        table = self.execute_macro(
+            FRESHNESS_MACRO_NAME,
+            kwargs=kwargs,
+            release=True,
+            manifest=manifest,
+            connection_name=connection_name
+        )
+        # now we have a 1-row table of the maximum `loaded_at_field` value and
+        # the current time according to the db.
+        if len(table) != 1 or len(table[0]) != 2:
+            dbt.exceptions.raise_compiler_error(
+                'Got an invalid result from "{}" macro: {}'.format(
+                    FRESHNESS_MACRO_NAME, [tuple(r) for r in table]
+                )
+            )
+
+        max_loaded_at, snapshotted_at = map(_utc, table[0])
+        age = (snapshotted_at - max_loaded_at).total_seconds()
+        return {
+            'max_loaded_at': max_loaded_at,
+            'snapshotted_at': snapshotted_at,
+            'age': age,
+        }
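
The dict returned by calculate_freshness feeds the source-freshness reporting added elsewhere in this PR (the remaining changed files are not shown above). As a rough illustration only, with hypothetical threshold names and values rather than dbt's actual configuration schema, the age can be compared against warn/error cutoffs:

import datetime

import pytz

# Hypothetical thresholds, in seconds; dbt configures these per source.
WARN_AFTER_SECONDS = 600
ERROR_AFTER_SECONDS = 3600


def freshness_status(freshness, warn_after, error_after):
    # `freshness` is the dict returned by calculate_freshness:
    # {'max_loaded_at': ..., 'snapshotted_at': ..., 'age': seconds}
    if freshness['age'] > error_after:
        return 'error'
    if freshness['age'] > warn_after:
        return 'warn'
    return 'pass'


# Example: data last loaded 15 minutes before the snapshot was taken.
now = datetime.datetime.now(pytz.UTC)
example = {
    'max_loaded_at': now - datetime.timedelta(minutes=15),
    'snapshotted_at': now,
    'age': 900.0,
}
print(freshness_status(example, WARN_AFTER_SECONDS, ERROR_AFTER_SECONDS))  # warn
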
14 changes: 14 additions & 0 deletions core/dbt/adapters/base/relation.py
@@ -172,6 +172,20 @@ def quoted(self, identifier):
             quote_char=self.quote_character,
             identifier=identifier)
 
+    @classmethod
+    def create_from_source(cls, source, **kwargs):
+        return cls.create(
+            database=source.database,
+            schema=source.schema,
+            identifier=source.identifier,
+            quote_policy={
+                'database': True,
+                'schema': True,
+                'identifier': True,
+            },
+            **kwargs
+        )
+
     @classmethod
     def create_from_node(cls, config, node, table_name=None, quote_policy=None,
                          **kwargs):
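
A usage sketch for create_from_source. FakeSource stands in for the parsed source definition (the real object is a dbt contracts class), and the rendered output in the final comment is what the all-True quote policy implies:

from collections import namedtuple

# Stand-in modeling only the attributes create_from_source reads.
FakeSource = namedtuple('FakeSource', ['database', 'schema', 'identifier'])

source = FakeSource(database='raw', schema='snowplow', identifier='event')

# With a concrete adapter Relation class:
#   relation = Relation.create_from_source(source)
#   str(relation)  ->  '"raw"."snowplow"."event"'  (every part quoted)
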
4 changes: 4 additions & 0 deletions core/dbt/api/object.py
@@ -4,6 +4,7 @@
 
 from dbt.exceptions import JSONValidationException
 from dbt.utils import deep_merge
+from dbt.clients.system import write_json
 
 
 class APIObject(Mapping):
@@ -61,6 +62,9 @@ def serialize(self):
         """
         return copy.deepcopy(self._contents)
 
+    def write(self, path):
+        write_json(path, self.serialize())
+
     @classmethod
     def deserialize(cls, settings):
         """
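
write() gives any APIObject a one-line way to persist itself, presumably how freshness results reach disk as JSON. A stand-in sketch of the helper it delegates to:

import json


def write_json(path, payload):
    # Stand-in for dbt.clients.system.write_json: dump a dict to disk.
    with open(path, 'w') as fh:
        json.dump(payload, fh, default=str)


# Any serializable results object could then be written directly:
#   results.write('target/sources.json')
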
43 changes: 43 additions & 0 deletions core/dbt/compilation.py
@@ -199,3 +199,46 @@ def compile(self, manifest):
         print_compile_stats(stats)
 
         return linker
+
+
+def compile_manifest(config, manifest):
+    compiler = Compiler(config)
+    compiler.initialize()
+    return compiler.compile(manifest)
+
+
+def compile_node(adapter, config, node, manifest, extra_context):
+    compiler = Compiler(config)
+    node = compiler.compile_node(node, manifest, extra_context)
+    node = _inject_runtime_config(adapter, node, extra_context)
+
+    if (node.injected_sql is not None and
+            not (dbt.utils.is_type(node, NodeType.Archive))):
+        logger.debug('Writing injected SQL for node "{}"'.format(
+            node.unique_id))
+
+        written_path = dbt.writer.write_node(
+            node,
+            config.target_path,
+            'compiled',
+            node.injected_sql)
+
+        node.build_path = written_path
+
+    return node
+
+
+def _inject_runtime_config(adapter, node, extra_context):
+    wrapped_sql = node.wrapped_sql
+    context = _node_context(adapter, node)
+    context.update(extra_context)
+    sql = dbt.clients.jinja.get_rendered(wrapped_sql, context)
+    node.wrapped_sql = sql
+    return node
+
+
+def _node_context(adapter, node):
+    return {
+        "run_started_at": dbt.tracking.active_user.run_started_at,
+        "invocation_id": dbt.tracking.active_user.invocation_id,
+    }
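
_inject_runtime_config re-renders the already-wrapped SQL with run metadata in context. A minimal sketch of the same idea using jinja2 directly (dbt goes through dbt.clients.jinja.get_rendered; the values are illustrative):

from jinja2 import Template

wrapped_sql = "select '{{ invocation_id }}' as invocation_id, 1 as id"
context = {
    'run_started_at': '2019-02-13 00:00:00',
    'invocation_id': 'a1b2c3',
}
print(Template(wrapped_sql).render(**context))
# select 'a1b2c3' as invocation_id, 1 as id
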
2 changes: 1 addition & 1 deletion core/dbt/context/runtime.py
@@ -79,7 +79,7 @@ def do_source(source_name, table_name):
                                                 table_name)
 
         model.sources.append([source_name, table_name])
-        return target_source.sql_table_name
+        return db_wrapper.Relation.create_from_source(target_source)
 
     return do_source
 
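
The practical effect in a model: {{ source('snowplow', 'event') }} previously compiled to the source's raw sql_table_name string, and now compiles to a Relation rendered with every part quoted. A stand-alone sketch of that rendering (not dbt's Relation class):

def render_relation(database, schema, identifier, quote_char='"'):
    # Quote each component, mirroring create_from_source's all-True policy.
    return '.'.join(
        '{0}{1}{0}'.format(quote_char, part)
        for part in (database, schema, identifier)
    )


print(render_relation('raw', 'snowplow', 'event'))
# "raw"."snowplow"."event"
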
29 changes: 10 additions & 19 deletions core/dbt/contracts/graph/manifest.py
@@ -1,7 +1,7 @@
 from dbt.api import APIObject
 from dbt.contracts.graph.unparsed import UNPARSED_NODE_CONTRACT
 from dbt.contracts.graph.parsed import PARSED_NODE_CONTRACT, \
-    PARSED_MACRO_CONTRACT, PARSED_DOCUMENTATION_CONTRACT, ParsedNode, \
+    PARSED_MACRO_CONTRACT, PARSED_DOCUMENTATION_CONTRACT, \
     PARSED_SOURCE_DEFINITION_CONTRACT
 from dbt.contracts.graph.compiled import COMPILED_NODE_CONTRACT, CompiledNode
 from dbt.exceptions import ValidationException
@@ -330,25 +330,22 @@ def _filter_subgraph(self, subgraph, predicate):
 
     def _model_matches_schema_and_table(self, schema, table, model):
         if model.resource_type == NodeType.Source:
-            return False
+            return (model.schema.lower() == schema.lower() and
+                    model.identifier.lower() == table.lower())
         return (model.schema.lower() == schema.lower() and
                 model.alias.lower() == table.lower())
 
-    def get_unique_id_for_schema_and_table(self, schema, table):
+    def get_unique_ids_for_schema_and_table(self, schema, table):
         """
-        Given a schema and table, find a matching model, and return
-        the unique_id for that model. If more than one matching
-        model is found, raise an exception.
+        Given a schema and table, find matching models, and return
+        their unique_ids. A schema and table may have more than one
+        match if the relation matches both a source and a seed, for instance.
         """
         def predicate(model):
             return self._model_matches_schema_and_table(schema, table, model)
 
         matching = list(self._filter_subgraph(self.nodes, predicate))
-
-        if not matching:
-            return None
-
-        return matching[0].get('unique_id')
+        return [match.get('unique_id') for match in matching]
 
     def add_nodes(self, new_nodes):
         """Add the given dict of new nodes to the manifest."""
@@ -404,17 +401,11 @@ def __getattr__(self, name):
                 type(self).__name__, name)
         )
 
-    def parsed_nodes(self):
-        for node in self.nodes.values():
-            if node.resource_type == NodeType.Source:
-                continue
-            yield node
-
     def get_used_schemas(self):
         return frozenset({
             (node.database, node.schema)
-            for node in self.parsed_nodes()
+            for node in self.nodes.values()
         })
 
     def get_used_databases(self):
-        return frozenset(node.database for node in self.parsed_nodes())
+        return frozenset(node.database for node in self.nodes.values())
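
A self-contained sketch of the new plural lookup. FakeNode and the nodes list are stand-ins, and the matching here is simplified to the alias attribute; the point is that one schema/table pair may now resolve to several unique_ids (say, a seed and a source), and callers decide how to handle the list:

from collections import namedtuple

FakeNode = namedtuple('FakeNode', ['unique_id', 'schema', 'alias'])

nodes = [
    FakeNode('seed.proj.event', 'raw', 'event'),
    FakeNode('source.proj.raw.event', 'raw', 'event'),
]


def get_unique_ids_for_schema_and_table(nodes, schema, table):
    # Mirrors the new manifest method: return every match, not just one.
    return [
        n.unique_id for n in nodes
        if n.schema.lower() == schema.lower() and n.alias.lower() == table.lower()
    ]


print(get_unique_ids_for_schema_and_table(nodes, 'raw', 'event'))
# ['seed.proj.event', 'source.proj.raw.event']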