Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CT 2483 duplicate depends on nodes #7455

Merged
merged 4 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230424-173404.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Use "add_node" to update depends_on.nodes
time: 2023-04-24T17:34:04.37479-04:00
custom:
Author: gshank
Issue: "7453"
31 changes: 0 additions & 31 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,25 +498,6 @@ class Disabled(Generic[D]):
T = TypeVar("T", bound=GraphMemberNode)


def _update_into(dest: MutableMapping[str, T], new_item: T):
    """Replace the entry stored in ``dest`` under ``new_item.unique_id``.

    The key must already be present in ``dest`` and the entry currently
    stored there must have the same ``original_file_path`` as ``new_item``.
    If either condition fails, a ``DbtRuntimeError`` is raised and ``dest``
    is left unmodified.
    """
    node_id = new_item.unique_id
    if node_id in dest:
        current = dest[node_id]
        # An update may swap the object but never relocate it to another file.
        if new_item.original_file_path != current.original_file_path:
            raise dbt.exceptions.DbtRuntimeError(
                f"cannot update a {new_item.resource_type} to have a new file path!"
            )
        dest[node_id] = new_item
        return

    # Nothing to overwrite: this helper only updates, it never inserts.
    raise dbt.exceptions.DbtRuntimeError(
        f"got an update_{new_item.resource_type} call with an "
        f"unrecognized {new_item.resource_type}: {new_item.unique_id}"
    )


# This contains macro methods that are in both the Manifest
# and the MacroManifest
class MacroMethods:
Expand Down Expand Up @@ -672,18 +653,6 @@ def __post_deserialize__(cls, obj):
obj._lock = MP_CONTEXT.Lock()
return obj

def update_exposure(self, new_exposure: Exposure):
    """Overwrite the entry in ``self.exposures`` keyed by ``new_exposure.unique_id``.

    Delegates to ``_update_into``, passing ``self.exposures`` as the mapping
    to update.
    """
    _update_into(dest=self.exposures, new_item=new_exposure)

def update_metric(self, new_metric: Metric):
    """Overwrite the entry in ``self.metrics`` keyed by ``new_metric.unique_id``.

    Delegates to ``_update_into``, passing ``self.metrics`` as the mapping
    to update.
    """
    _update_into(dest=self.metrics, new_item=new_metric)

def update_node(self, new_node: ManifestNode):
    """Overwrite the entry in ``self.nodes`` keyed by ``new_node.unique_id``.

    Delegates to ``_update_into``, passing ``self.nodes`` as the mapping
    to update.
    """
    _update_into(dest=self.nodes, new_item=new_node)

def update_source(self, new_source: SourceDefinition):
    """Overwrite the entry in ``self.sources`` keyed by ``new_source.unique_id``.

    Delegates to ``_update_into``, passing ``self.sources`` as the mapping
    to update.
    """
    _update_into(dest=self.sources, new_item=new_source)

def build_flat_graph(self):
"""This attribute is used in context.common by each node, so we want to
only build it once and avoid any concurrency issues around it.
Expand Down
24 changes: 7 additions & 17 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1228,8 +1228,7 @@ def _process_refs_for_exposure(manifest: Manifest, current_project: str, exposur

target_model_id = target_model.unique_id

exposure.depends_on.nodes.append(target_model_id)
manifest.update_exposure(exposure)
exposure.depends_on.add_node(target_model_id)


def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: Metric):
Expand Down Expand Up @@ -1280,8 +1279,7 @@ def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: M

target_model_id = target_model.unique_id

metric.depends_on.nodes.append(target_model_id)
manifest.update_metric(metric)
metric.depends_on.add_node(target_model_id)


def _process_metrics_for_node(
Expand Down Expand Up @@ -1330,7 +1328,7 @@ def _process_metrics_for_node(

target_metric_id = target_metric.unique_id

node.depends_on.nodes.append(target_metric_id)
node.depends_on.add_node(target_metric_id)


def _process_refs_for_node(manifest: Manifest, current_project: str, node: ManifestNode):
Expand Down Expand Up @@ -1387,12 +1385,7 @@ def _process_refs_for_node(manifest: Manifest, current_project: str, node: Manif

target_model_id = target_model.unique_id

node.depends_on.nodes.append(target_model_id)
# TODO: I think this is extraneous, node should already be the same
# as manifest.nodes[node.unique_id] (we're mutating node here, not
# making a new one)
# Q: could we stop doing this?
manifest.update_node(node)
node.depends_on.add_node(target_model_id)


def _process_sources_for_exposure(manifest: Manifest, current_project: str, exposure: Exposure):
Expand All @@ -1414,8 +1407,7 @@ def _process_sources_for_exposure(manifest: Manifest, current_project: str, expo
)
continue
target_source_id = target_source.unique_id
exposure.depends_on.nodes.append(target_source_id)
manifest.update_exposure(exposure)
exposure.depends_on.add_node(target_source_id)


def _process_sources_for_metric(manifest: Manifest, current_project: str, metric: Metric):
Expand All @@ -1437,8 +1429,7 @@ def _process_sources_for_metric(manifest: Manifest, current_project: str, metric
)
continue
target_source_id = target_source.unique_id
metric.depends_on.nodes.append(target_source_id)
manifest.update_metric(metric)
metric.depends_on.add_node(target_source_id)


def _process_sources_for_node(manifest: Manifest, current_project: str, node: ManifestNode):
Expand Down Expand Up @@ -1466,8 +1457,7 @@ def _process_sources_for_node(manifest: Manifest, current_project: str, node: Ma
)
continue
target_source_id = target_source.unique_id
node.depends_on.nodes.append(target_source_id)
manifest.update_node(node)
node.depends_on.add_node(target_source_id)


# This is called in task.rpc.sql_commands when a "dynamic" node is
Expand Down
7 changes: 1 addition & 6 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
NothingToDo,
)
from dbt.events.contextvars import log_contextvars
from dbt.contracts.graph.nodes import SourceDefinition, ResultNode
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.results import NodeStatus, RunExecutionResult, RunningStatus
from dbt.contracts.state import PreviousState
from dbt.exceptions import (
Expand Down Expand Up @@ -295,11 +295,6 @@ def _handle_result(self, result):
if self.manifest is None:
raise DbtInternalError("manifest was None in _handle_result")

if isinstance(node, SourceDefinition):
self.manifest.update_source(node)
else:
self.manifest.update_node(node)

if result.status in self.MARK_DEPENDENT_ERRORS_STATUSES:
if is_ephemeral:
cause = result
Expand Down
9 changes: 0 additions & 9 deletions tests/functional/artifacts/expected_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1601,9 +1601,6 @@ def expected_versions_manifest(project):
"depends_on": {
"macros": [],
"nodes": [
"model.test.versioned_model.v2",
"model.test.versioned_model.v2",
"model.test.versioned_model.v2",
"model.test.versioned_model.v2",
"model.test.versioned_model.v1",
],
Expand Down Expand Up @@ -1862,9 +1859,6 @@ def expected_versions_manifest(project):
"model.test.versioned_model.v2": [
"exposure.test.notebook_exposure",
"model.test.ref_versioned_model",
"model.test.ref_versioned_model",
"model.test.ref_versioned_model",
"model.test.ref_versioned_model",
"test.test.unique_versioned_model_v2_first_name.998430d28e",
],
"model.test.ref_versioned_model": [],
Expand All @@ -1879,9 +1873,6 @@ def expected_versions_manifest(project):
"model.test.ref_versioned_model": [
"model.test.versioned_model.v1",
"model.test.versioned_model.v2",
"model.test.versioned_model.v2",
"model.test.versioned_model.v2",
"model.test.versioned_model.v2",
],
"exposure.test.notebook_exposure": ["model.test.versioned_model.v2"],
"test.test.unique_versioned_model_v1_first_name.6138195dec": [
Expand Down