Smart Source Freshness Runs #4256

Merged
merged 126 commits into from
Apr 12, 2022

sungchun12
Contributor

@sungchun12 sungchun12 commented Nov 9, 2021

resolves #4050

Description

When a user wants to run dbt models based on the freshest data possible, they can do so in an elegant way: dbt build --select source_status:fresher+ --defer --state <previous state path>

The ultimate goal is to save precious time and money spent reprocessing stale data. Just because the extract and load process is finished doesn't mean it provided fresher data.

Example Use Case

dbt Cloud Job A: runs source freshness every 30 minutes. It will serve as the previous state sources.json.
Command Steps:

  • dbt source freshness
  • dbt build

dbt Cloud Job B: runs every 35 minutes and defers to Job A above. If the max_loaded_at date for the current state source freshness check is greater than the max_loaded_at for the previous state, this command will run the downstream models in scope. The job should only have to look within the ./target folder of Job A. Using the build command tests the sources for quality, so if those checks fail, downstream nodes will not run if you use fail fast.
Command Steps:

  • dbt source freshness
  • dbt build --select source_status:fresher+ --defer --state <previous state path>
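
The comparison Job B relies on can be sketched in a few lines of Python. This is a minimal illustration, not the PR's implementation: it assumes each sources.json carries a top-level `results` list whose entries have a `unique_id` and an ISO 8601 `max_loaded_at`, and the helper names `parse_ts`, `load_bookmarks`, and `fresher_sources` are hypothetical.

```python
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, Set


def parse_ts(value: str) -> datetime:
    # Normalize a trailing "Z" (UTC), which datetime.fromisoformat rejects before Python 3.11.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def load_bookmarks(sources_json: Path) -> Dict[str, datetime]:
    """Map each source's unique_id to its max_loaded_at (assumed artifact layout)."""
    results = json.loads(sources_json.read_text())["results"]
    # Entries without max_loaded_at (for example, a check that failed at runtime) are skipped.
    return {
        r["unique_id"]: parse_ts(r["max_loaded_at"])
        for r in results
        if "max_loaded_at" in r
    }


def fresher_sources(previous: Dict[str, datetime], current: Dict[str, datetime]) -> Set[str]:
    """Sources that are new, or whose bookmark moved forward since the previous state."""
    return {
        uid
        for uid, loaded_at in current.items()
        if uid not in previous or loaded_at > previous[uid]
    }
```

Under these assumptions, Job B would call `load_bookmarks` once on Job A's artifact directory and once on its own ./target folder, then build only the children of the sources that `fresher_sources` returns.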

Considerations:

  • Will dbt Cloud have to add new functionality to store previous state given that dbt command steps are stateless by default? It shouldn't. Since 4721, dbt Cloud will include sources.json (if available) in the artifacts it copies during deferral. We'll avoid the issue we experienced with the result: selector + run_results.json, because the relevant artifact in this case is sources.json — which doesn't get overridden by subsequent job steps, namely docs generate

Checklist

  • I have signed the CLA
  • I have run this code in development and it appears to resolve the stated issue
  • This PR includes tests, or tests are not required/relevant for this PR
  • I have updated the CHANGELOG.md and added information about my change

…into feature/smart-source-freshness-runs""

This reverts commit 7112516.
@jtcohen6
Contributor

@sungchun12 @anaisvaillant Very cool, very quick work!

I took a quick look, and I noticed a difference between the implementation here and the idea described in #4050, though please tell me if I'm missing something.

The proposal in #4050 is to compare max_loaded_at between <state_path>/sources.json (previous run) and <target_path>/sources.json, to figure out if the "bookmark" for that source has moved. Two distinctions:

  • We need to read in two different sources.json files. I think that might look like registering both PreviousState.sources (using PreviousState.path), and also something like CurrentState.sources (using target_path). The latter isn't something that exists just yet; we might need to think a bit about where it ought to be defined and pulled in.
  • I think we want the source's max_loaded_at, rather than its status. It could be fresh one or both times, without having moved the bookmark.

Once we have a way to register both, and perform the comparison between their max_loaded_at, I envision something like (pseudo-code):

from typing import Iterator, Set

# SelectorMethod, UniqueId and InternalException are dbt-core internals.
# self.current_state is the proposed counterpart to self.previous_state; it does not exist yet.
class SourceBookmarkSelectorMethod(SelectorMethod):
    def search(
        self, included_nodes: Set[UniqueId], selector: str
    ) -> Iterator[UniqueId]:
        if (
            self.previous_state is None or self.previous_state.sources is None
            or self.current_state is None or self.current_state.sources is None
        ):
            # Avoid dereferencing .path here, since either state object may itself be None
            raise InternalException(
                'Comparison sources missing in the previous state or the current state'
            )
        # select any "current" source (just freshness-checked) that has a greater max_loaded_at
        # than the same source (previously freshness-checked), or if the source did not previously exist
        # (assumes both are mappings of unique_id -> freshness result)
        previous_sources = self.previous_state.sources
        current_sources = self.current_state.sources
        matches = {
            unique_id
            for unique_id, current_source in current_sources.items()
            if (
                unique_id not in previous_sources
                or current_source.max_loaded_at > previous_sources[unique_id].max_loaded_at
            )
        }
        for node, real_node in self.all_nodes(included_nodes):
            if node in matches:
                yield node

@sungchun12
Contributor Author

sungchun12 commented Nov 13, 2021

@jtcohen6 You raise a solid point that a source_status:pass+ without a max_loaded_at bookmark quickly becomes a wasteful run of children nodes if it's run every 5 minutes. I'll figure out where to inject CurrentState.sources.

Options on selector syntax and mechanics: @jtcohen6, do you agree with my recommendation?

  • dbt run --select source_refresh:pass+ (edit: shorter, cleaner selector syntax). Sung's recommendation. This tells the developer upfront that source status matters, and it will only run children nodes if the current max_loaded_at is later than the one from the previous source freshness check. I still see use cases where warn and error statuses are used to run children nodes rather than max_loaded_at alone. However, it's a long name for a selector flag.

  • dbt run --select source_status:pass+. This implies BOTH the source status AND the max loaded at date are explicit considerations before children nodes are run. Still the same flag. However, a developer will have to look at documentation to understand that both mechanisms are at play.

  • Two separate selectors: source_status:pass+ AND source_status_max_loaded_at:pass+. This way, the developer can choose their own mechanism to run children nodes on status criteria OR status and max loaded at criteria.

  • I want to avoid using sources:fresh+ because it requires the developer to research on their own what mechanisms are used to run children nodes.

@jtcohen6
Contributor

@sungchun12 I think I agree with your first recommendation. I want to hear a bit more about:

This provides explicit intuition for the developer upfront that source status matters... I still see use cases where warn and error statuses are used to run children nodes rather than max_loaded_at alone.

While source freshness statuses (pass, warn, error) matter tremendously for team SLAs and downstream metadata (e.g. exposure status reporting), I don't see how I'd want to use them to inform selection criteria. From my point of view, I just want the comparison between old + new max_loaded_at values. Does source X have new data since the last time I ran my models? If so, I want to select models downstream of source X to re-run; if not, I don't.

I feel that holds true regardless of whether the old max_loaded_at was past the warn threshold, the new max_loaded_at is past the error threshold, or both are safely within my freshness expectations and pass. In all cases, I just want to know whether the bookmark has moved.
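
To make the "has the bookmark moved?" rule concrete, here is a small worked example. It is only a sketch: the `Freshness` tuple, the `bookmark_moved` helper, and the source names are hypothetical, and the timestamps are compared as strings on the assumption that they are all UTC and share one ISO 8601 format.

```python
from typing import Dict, NamedTuple, Set


class Freshness(NamedTuple):
    status: str         # "pass", "warn", or "error"
    max_loaded_at: str  # same-format ISO 8601 UTC strings sort chronologically


def bookmark_moved(previous: Dict[str, Freshness], current: Dict[str, Freshness]) -> Set[str]:
    # Status is deliberately ignored: only the max_loaded_at bookmark matters.
    return {
        name
        for name, now in current.items()
        if name not in previous or now.max_loaded_at > previous[name].max_loaded_at
    }


previous = {
    "orders": Freshness("pass", "2022-04-11T20:00:00Z"),
    "users": Freshness("pass", "2022-04-11T20:00:00Z"),
    "events": Freshness("warn", "2022-04-10T08:00:00Z"),
}
current = {
    "orders": Freshness("pass", "2022-04-11T20:00:00Z"),   # still passing, but no new data
    "users": Freshness("pass", "2022-04-11T20:30:00Z"),    # passing, new data
    "events": Freshness("error", "2022-04-10T09:00:00Z"),  # past the error threshold, yet new data arrived
}

print(sorted(bookmark_moved(previous, current)))  # ['events', 'users']
```

Here `orders` passes both times and is skipped, while `events` is selected even though its status got worse, which is the behavior argued for above.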

@sungchun12
Contributor Author

@jtcohen6 Thanks for the thorough reply! I also updated the selector flag to be shorter and more coherent: source_refresh:pass+. I'll provide examples below of how this feature will be used in the wild in response to your point on max_loaded_at criteria.

I agree that the previous state's pass, warn, error thresholds are irrelevant for source refresh jobs. I still think the pass, warn, error thresholds are relevant for the current state.

Open Questions

  • Do you agree with this recommendation?
  • When does this feature need to be merged to make it to the first release of v1.0.0?

Previous State dbt job

$ dbt source freshness # this job only checks data freshness

Current State dbt job

$ dbt source freshness # run this again to compare `max_loaded_at` to the previous state
$ dbt build -s source_refresh:pass+ source_refresh:warn+ --state my/previous/run/artifacts # refresh children nodes of sources that have `pass` or `warn` AND have fresher `max_loaded_at` dates ONLY in the current state dbt job

Today, a dbt developer has an intuition that children nodes of sources should run based on pass, warn, error. For the majority, pass and warn are the driving criteria to run children nodes. If this feature for source refreshes removes those criteria and only considers max_loaded_at, it takes that choice away. Including the pass, warn criteria as concurrent selectors gives the developer more choices to mix and match use cases.

I don't see a compelling use case where a source refresh run should do the below, assuming all sources have error.

Previous State dbt job

$ dbt source freshness # this job only checks data freshness

Current State dbt job

$ dbt source freshness # run this again to compare `max_loaded_at` to the previous state. All sources have fresher `max_loaded_at` dates, but all are `error`
$ dbt build -s sources:fresh+ --state my/previous/run/artifacts # refresh children nodes of sources that have fresher `max_loaded_at` dates even if they are all `error`

Developers expect source refresh runs to be fresh AND to meet thresholds (e.g. pass, warn), not only to be fresh.

I anticipate developers getting confused about why children nodes are being run if all sources have freshness errors. I also imagine lots of downstream tests failing because the data isn't meeting the source pass criteria. However, if you have something like the below, it gives clarity to the logs and troubleshooting process.

$ dbt source freshness # run this again to compare `max_loaded_at` to the previous state. All sources have fresher `max_loaded_at` dates, but all are `error`
$ dbt build -s source_refresh:error+ --state my/previous/run/artifacts # refresh children nodes of sources that have fresher `max_loaded_at` dates explicitly knowing sources are not passing freshness
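
The combined criterion proposed in this comment (fresher bookmark AND a chosen current status) can be sketched as follows. This illustrates the proposal only; the commit list further down ("data bookmarks matter only", "remove exclusion logic for status") suggests the status condition was later dropped. The `source_refresh` function and the `SourceState` alias are hypothetical names, and timestamps are assumed to be same-format ISO 8601 UTC strings.

```python
from typing import Dict, Iterable, Set, Tuple

# (status, max_loaded_at) per source; same-format ISO 8601 UTC strings compare chronologically
SourceState = Dict[str, Tuple[str, str]]


def source_refresh(previous: SourceState, current: SourceState, statuses: Iterable[str]) -> Set[str]:
    """Select sources whose bookmark moved AND whose current status is one of `statuses`."""
    wanted = set(statuses)
    selected = set()
    for name, (status, loaded_at) in current.items():
        is_fresher = name not in previous or loaded_at > previous[name][1]
        if is_fresher and status in wanted:
            selected.add(name)
    return selected


previous = {
    "orders": ("pass", "2022-04-11T20:00:00Z"),
    "events": ("warn", "2022-04-10T08:00:00Z"),
}
current = {
    "orders": ("pass", "2022-04-11T20:30:00Z"),
    "events": ("error", "2022-04-10T09:00:00Z"),
}

# Roughly what source_refresh:pass+ source_refresh:warn+ would pick as starting points
print(source_refresh(previous, current, ["pass", "warn"]))  # {'orders'}
# Roughly what source_refresh:error+ would pick
print(source_refresh(previous, current, ["error"]))         # {'events'}
```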

@sungchun12 sungchun12 marked this pull request as ready for review April 11, 2022 20:18
@sungchun12 sungchun12 requested a review from a team April 11, 2022 20:18
@sungchun12 sungchun12 requested review from a team as code owners April 11, 2022 20:18
Contributor

@jtcohen6 jtcohen6 left a comment

Thanks for all the hard work here @sungchun12 @matt-winkler @anaisvaillant!

The test coverage looks solid—thanks for all the extra work to adapt to the latest framework. We may want some more annotation and light refactoring, but I'm appreciative of the work that you all have already put into this, with the desire to ship this first experimental version in v1.1.

Let's ship it :)

models in scope. Example command: `dbt build --select source_status:fresher+` '
time: 2022-03-28T13:47:43.750709-05:00
custom:
Author: sungchun12, matt-winkler, anaisvaillant
Contributor

I don't think we have support for multiple authors here yet, just added a note for it here: #4906 (comment)

That's ok, we can patch it manually during changelog generation + release

@@ -116,7 +117,9 @@ def __init__(self, args, config):

     def set_previous_state(self):
         if self.args.state is not None:
-            self.previous_state = PreviousState(self.args.state)
+            self.previous_state = PreviousState(
+                path=self.args.state, current_path=Path(self.config.target_path)
Contributor

👍
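
For readers following the diff, one way to picture a state object that knows about two artifact directories is sketched below. This is not dbt's PreviousState: the class name `TwoPathState` is hypothetical, the attribute name `sources_current` is borrowed from a commit message in this PR, and the sources.json layout (a top-level `results` list) is an assumption.

```python
import json
from pathlib import Path
from typing import Any, Dict, List, Optional


class TwoPathState:
    """Hypothetical stand-in for a state object that reads sources.json from two directories."""

    def __init__(self, path: Path, current_path: Path) -> None:
        self.path = path
        self.current_path = current_path
        # Each attribute stays None when the artifact is absent, so callers can raise a clear error.
        self.sources: Optional[List[Dict[str, Any]]] = self._read_results(path)
        self.sources_current: Optional[List[Dict[str, Any]]] = self._read_results(current_path)

    @staticmethod
    def _read_results(directory: Path) -> Optional[List[Dict[str, Any]]]:
        artifact = directory / "sources.json"
        if not artifact.exists():
            return None
        return json.loads(artifact.read_text())["results"]
```

Passing the target path in at construction time keeps the selector method itself free of configuration lookups; it only needs to check whether either attribute is None before comparing.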

@@ -787,7 +787,7 @@ def test_select_metric(manifest):
 @pytest.fixture
 def previous_state(manifest):
     writable = copy.deepcopy(manifest).writable_manifest()
-    state = PreviousState(Path('/path/does/not/exist'))
+    state = PreviousState(path=Path('/path/does/not/exist'),current_path=Path('/path/does/not/exist'))
Contributor

👍

Comment on lines +20 to +23
# TODO: We may create utility classes to handle reusable fixtures.
def copy_to_previous_state():
    shutil.copyfile("target/manifest.json", "previous_state/manifest.json")
    shutil.copyfile("target/run_results.json", "previous_state/run_results.json")
Contributor

Not sure if Windows will be happy about this. We do have a copy_file utility here, but it looks like it's doing pretty much the same thing.
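
If path handling does turn out to be a problem on Windows, one possible variant builds the paths with pathlib instead of hard-coded string separators. This is a sketch only: the parameter names, the defaults, and the `ARTIFACTS` tuple are assumptions, and it has not been checked against the test framework used in this PR.

```python
import shutil
from pathlib import Path

# Artifacts copied by the original helper
ARTIFACTS = ("manifest.json", "run_results.json")


def copy_to_previous_state(
    target_dir: Path = Path("target"),
    state_dir: Path = Path("previous_state"),
) -> None:
    # Create the destination first; shutil.copyfile does not create missing directories.
    state_dir.mkdir(parents=True, exist_ok=True)
    for name in ARTIFACTS:
        # The "/" operator joins with the separator appropriate for the running platform.
        shutil.copyfile(target_dir / name, state_dir / name)
```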

Comment on lines +26 to +48
# put these here for now to get tests working
class AnyStringWith:
    def __init__(self, contains=None):
        self.contains = contains

    def __eq__(self, other):
        if not isinstance(other, str):
            return False

        if self.contains is None:
            return True

        return self.contains in other

    def __repr__(self):
        return "AnyStringWith<{!r}>".format(self.contains)


class AnyFloat:
    """Any float. Use this in assertEqual() calls to assert that it is a float."""

    def __eq__(self, other):
        return isinstance(other, float)
Contributor

These were copied from #4935. Maybe they eventually want to move into dbt/core/tests/util.py? (cc @gshank)
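
For anyone unfamiliar with this matcher pattern, the sketch below shows how such helpers are typically used: they stand in for values that vary between runs when comparing against an expected structure. The two classes are restated so the example runs on its own, and the `expected` and `actual` dictionaries are made up for illustration.

```python
class AnyStringWith:
    """Equal to any string that contains the given substring (or to any string if None)."""

    def __init__(self, contains=None):
        self.contains = contains

    def __eq__(self, other):
        if not isinstance(other, str):
            return False
        if self.contains is None:
            return True
        return self.contains in other

    def __repr__(self):
        return "AnyStringWith<{!r}>".format(self.contains)


class AnyFloat:
    """Equal to any float, whatever its value."""

    def __eq__(self, other):
        return isinstance(other, float)


# Hypothetical result record: the message wording and the timing vary between runs.
expected = {
    "status": "pass",
    "message": AnyStringWith("fresher"),
    "execution_time": AnyFloat(),
}
actual = {
    "status": "pass",
    "message": "2 fresher sources selected",
    "execution_time": 0.42,
}

# When str or float cannot compare itself to the matcher, Python falls back
# to the matcher's own __eq__, so the dictionaries compare equal.
assert actual == expected
```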

"source": project.adapter.quote("source"),
"quoted_columns": quoted_columns,
}
raw_sql = """INSERT INTO {schema}.{source}
Contributor

Note for future reader: This probably isn't going to fly on other databases, but that's okay, the functionality being tested here / added in this PR is node selection, not adapter-specific

@jtcohen6 jtcohen6 merged commit 401ebc2 into main Apr 12, 2022
@jtcohen6 jtcohen6 deleted the feature/smart-source-freshness-runs branch April 12, 2022 13:08
VersusFacit pushed a commit that referenced this pull request Apr 14, 2022
* first draft

* working selector code

* remove debug print logs

* copy test template

* add todo

* smarter depends on graph searching notes

* add excluded source children nodes

* remove prints and clean up logger

* opinionated fresh node selection

* better if handling

* include logs with meaningul info

* add concurrent selectors note

* cleaner logging

* Revert "Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs"

This reverts commit 7fee4d4, reversing
changes made to 17c47ff.

* tidy up logs

* remove comments

* handle criterion that does not match nodes

* use a blank set instead

* Revert "Revert "Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs""

This reverts commit 7112516.

* make compatible with rc and new logger

* new log format

* new selector flag name

* clarify that status needs to be correct

* compare current and previous state

* correct import

* add current state

* remove print

* add todo

* fix error conditions

* clearer refresh language

* don't run wasteful logs

* remove for now

* cleaner syntax

* turn generator into set

* remove print

* add fresh selector

* data bookmarks matter only

* remove exclusion logic for status

* keep it DRY

* remove unused import

* dynamic project root

* dynamic cwd

* add TODO

* simple profiles_dir import

* add default target path

* headless path utils

* draft work

* add previous sources artifact read

* make PreviousState aware of t-2 sources

* make SourceFreshSelectorMethod aware of t-2 sources

* add archive_path() for t-2 sources to freshness.py

* clean up merged branches

* add to changelog

* rename file

* remove files

* remove archive path logic

* add in currentstate and previousstate defaults

* working version of source fresher

* syntax source_fresher: works

* fix quoting

* working version of target_path default

* None default to sources_current

* updated source selection semantics

* remove todo

* move to test_sources folder

* copy over baseline source freshness tests

* clean up

* remove test file

* update state with version checks

* fix flake tests

* add changelog

* fix name

* add base test template

* delegate tests

* add basic test to ensure nothing runs

* add another basic test

* fix test with copy state

* run error test

* run warn test

* run pass test

* error handling for runtime error in source freshness

* error handling for runtime error in source freshness

* add back fresher selector condition

* top level selector condition

* add runtime error test

* testing source fresher test selection methods

* fix formatting issues

* fix broken tests

* remove old comments

* fix regressions in other tests

* add Anais test cases

* result selector test case

Co-authored-by: Matt Winkler <matt.winkler@fishtownanalytics.com>
agoblet pushed a commit to BigDataRepublic/dbt-core that referenced this pull request May 20, 2022
Successfully merging this pull request may close these issues.

Selection method based on source freshness: new max_loaded_at, new data