From 725ec25527236d8a3c43fa658d45553168c7f542 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 28 Dec 2023 09:55:09 +0100 Subject: [PATCH] - Add `TreeNode.transform_with_payload()`, `TreeNode.transform_down_with_payload()` and `TreeNode.transform_up_with_payload()` - Refactor `SortPushDown` and `PlanWithRequitements` using `TreeNode.transform_down_with_payload()` - Refactor `ExprOrdering` using `TreeNode.transform_up_with_payload()` --- datafusion/common/src/tree_node.rs | 72 +++++++ .../enforce_distribution.rs | 199 +++++++----------- .../src/physical_optimizer/enforce_sorting.rs | 63 +++++- .../src/physical_optimizer/sort_pushdown.rs | 148 +------------ .../src/equivalence/properties.rs | 64 +++--- .../physical-expr/src/sort_properties.rs | 73 +------ 6 files changed, 253 insertions(+), 366 deletions(-) diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 5da9636ffe18..8ec106df3e90 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -114,6 +114,42 @@ pub trait TreeNode: Sized { self.transform_up(op) } + /// Transforms the tree using `f_down` pre-preorder (top-down) and `f_up` post-order + /// (bottom-up) traversals. The `f_down` and `f_up` closures take payloads that they + /// propagate down and up during the transformation. + /// + /// The `f_down` closure takes `FD` type payload from its parent and returns `Vec` + /// type payload to propagate down to its children. One `FD` element is propagated + /// down to each child. + /// + /// The `f_up` closure takes `FU` type payload from its children collected into a + /// `Vec` and returns `FU` type payload to propagate up to its parent. + fn transform_with_payload( + self, + f_down: &mut FD, + payload_down: PD, + f_up: &mut FU, + ) -> Result<(Self, PU)> + where + FD: FnMut(Self, PD) -> Result<(Transformed, Vec)>, + FU: FnMut(Self, Vec) -> Result<(Transformed, PU)>, + { + let (new_node, new_payload_down) = f_down(self, payload_down)?; + let mut new_payload_down_iter = new_payload_down.into_iter(); + let mut payload_up = vec![]; + let node_with_new_children = new_node.into().map_children(|node| { + let (new_node, p) = node.transform_with_payload( + f_down, + new_payload_down_iter.next().unwrap(), + f_up, + )?; + payload_up.push(p); + Ok(new_node) + })?; + let (new_node, new_payload_up) = f_up(node_with_new_children, payload_up)?; + Ok((new_node.into(), new_payload_up)) + } + /// Convenience utils for writing optimizers rule: recursively apply the given 'op' to the node and all of its /// children(Preorder Traversal). /// When the `op` does not apply to a given node, it is left unchanged. @@ -136,6 +172,23 @@ pub trait TreeNode: Sized { after_op.map_children(|node| node.transform_down_mut(op)) } + /// Transforms the tree using `f` pre-preorder (top-down) traversal. The `f_down` + /// closure takes payloads that it propagates down during the transformation. + /// + /// The `f_down` closure takes `FD` type payload from its parent and returns `Vec` + /// type payload to propagate down to its children. One `FD` element is propagated + /// down to each child. + fn transform_down_with_payload(self, f: &mut F, payload: P) -> Result + where + F: FnMut(Self, P) -> Result<(Transformed, Vec

)>, + { + let (new_node, new_payload) = f(self, payload)?; + let mut new_payload_iter = new_payload.into_iter(); + new_node.into().map_children(|node| { + node.transform_down_with_payload(f, new_payload_iter.next().unwrap()) + }) + } + /// Convenience utils for writing optimizers rule: recursively apply the given 'op' first to all of its /// children and then itself(Postorder Traversal). /// When the `op` does not apply to a given node, it is left unchanged. @@ -162,6 +215,25 @@ pub trait TreeNode: Sized { Ok(new_node) } + /// Transforms the tree using `f_up` post-order traversal. The `f_up` closure takes + /// payloads that it propagates up during the transformation. + /// + /// The `f_up` closure takes `FU` type payload from its children collected into a + /// `Vec` and returns `FU` type payload to propagate up to its parent. + fn transform_up_with_payload(self, f: &mut F) -> Result<(Self, P)> + where + F: FnMut(Self, Vec

) -> Result<(Transformed, P)>, + { + let mut payload = vec![]; + let node_with_new_children = self.map_children(|node| { + let (new_node, p) = node.transform_up_with_payload(f)?; + payload.push(p); + Ok(new_node) + })?; + let (new_node, new_payload) = f(node_with_new_children, payload)?; + Ok((new_node.into(), new_payload)) + } + /// Transform the tree node using the given [TreeNodeRewriter] /// It performs a depth first walk of an node and its children. /// diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index d5a086227323..62dfd68e0377 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -198,10 +198,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { let adjusted = if top_down_join_key_reordering { // Run a top-down process to adjust input key ordering recursively - let plan_requirements = PlanWithKeyRequirements::new(plan); - let adjusted = - plan_requirements.transform_down(&adjust_input_keys_ordering)?; - adjusted.plan + plan.transform_down_with_payload(&mut adjust_input_keys_ordering, vec![])? } else { // Run a bottom-up process plan.transform_up(&|plan| { @@ -269,11 +266,17 @@ impl PhysicalOptimizerRule for EnforceDistribution { /// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements /// 5) For other types of operators, by default, pushdown the parent requirements to children. /// +type RequiredKeyOrdering = Vec>; + fn adjust_input_keys_ordering( - mut requirements: PlanWithKeyRequirements, -) -> Result> { - let parent_required = requirements.required_key_ordering.clone(); - let plan_any = requirements.plan.as_any(); + plan: Arc, + required_key_ordering: RequiredKeyOrdering, +) -> Result<( + Transformed>, + Vec, +)> { + let parent_required = required_key_ordering.clone(); + let plan_any = plan.as_any(); if let Some(HashJoinExec { left, @@ -302,13 +305,15 @@ fn adjust_input_keys_ordering( .map(|e| Arc::new(e) as _) }; reorder_partitioned_join_keys( - requirements.plan.clone(), + plan.clone(), &parent_required, on, vec![], &join_constructor, ) - .map(Transformed::Yes) + .map(|(plan, request_key_ordering)| { + (Transformed::Yes(plan), request_key_ordering) + }) } PartitionMode::CollectLeft => { let new_right_request = match join_type { @@ -326,15 +331,14 @@ fn adjust_input_keys_ordering( }; // Push down requirements to the right side - requirements.children[1].required_key_ordering = - new_right_request.unwrap_or(vec![]); - Ok(Transformed::Yes(requirements)) + let request_key_ordering = + vec![vec![], new_right_request.unwrap_or_default()]; + Ok((Transformed::Yes(plan), request_key_ordering)) } PartitionMode::Auto => { // Can not satisfy, clear the current requirements and generate new empty requirements - Ok(Transformed::Yes(PlanWithKeyRequirements::new( - requirements.plan, - ))) + let request_key_ordering = vec![vec![]; plan.children().len()]; + Ok((Transformed::Yes(plan), request_key_ordering)) } } } else if let Some(CrossJoinExec { left, .. }) = @@ -342,9 +346,14 @@ fn adjust_input_keys_ordering( { let left_columns_len = left.schema().fields().len(); // Push down requirements to the right side - requirements.children[1].required_key_ordering = - shift_right_required(&parent_required, left_columns_len).unwrap_or_default(); - Ok(Transformed::Yes(requirements)) + Ok(( + Transformed::Yes(plan), + vec![ + vec![], + shift_right_required(&parent_required, left_columns_len) + .unwrap_or_default(), + ], + )) } else if let Some(SortMergeJoinExec { left, right, @@ -368,29 +377,33 @@ fn adjust_input_keys_ordering( .map(|e| Arc::new(e) as _) }; reorder_partitioned_join_keys( - requirements.plan.clone(), + plan.clone(), &parent_required, on, sort_options.clone(), &join_constructor, ) - .map(Transformed::Yes) + .map(|(plan, request_key_ordering)| { + (Transformed::Yes(plan), request_key_ordering) + }) } else if let Some(aggregate_exec) = plan_any.downcast_ref::() { if !parent_required.is_empty() { match aggregate_exec.mode() { - AggregateMode::FinalPartitioned => reorder_aggregate_keys( - requirements.plan.clone(), - &parent_required, - aggregate_exec, - ) - .map(Transformed::Yes), - _ => Ok(Transformed::Yes(PlanWithKeyRequirements::new( - requirements.plan, - ))), + AggregateMode::FinalPartitioned => { + reorder_aggregate_keys(plan.clone(), &parent_required, aggregate_exec) + .map(|(plan, request_key_ordering)| { + (Transformed::Yes(plan), request_key_ordering) + }) + } + _ => { + let request_key_ordering = vec![vec![]; plan.children().len()]; + Ok((Transformed::Yes(plan), request_key_ordering)) + } } } else { // Keep everything unchanged - Ok(Transformed::No(requirements)) + let request_key_ordering = vec![vec![]; plan.children().len()]; + Ok((Transformed::No(plan), request_key_ordering)) } } else if let Some(proj) = plan_any.downcast_ref::() { let expr = proj.expr(); @@ -399,27 +412,25 @@ fn adjust_input_keys_ordering( // Construct a mapping from new name to the the orginal Column let new_required = map_columns_before_projection(&parent_required, expr); if new_required.len() == parent_required.len() { - requirements.children[0].required_key_ordering = new_required; - Ok(Transformed::Yes(requirements)) + Ok((Transformed::Yes(plan), vec![new_required])) } else { // Can not satisfy, clear the current requirements and generate new empty requirements - Ok(Transformed::Yes(PlanWithKeyRequirements::new( - requirements.plan, - ))) + let request_key_ordering = vec![vec![]; plan.children().len()]; + Ok((Transformed::Yes(plan), request_key_ordering)) } } else if plan_any.downcast_ref::().is_some() || plan_any.downcast_ref::().is_some() || plan_any.downcast_ref::().is_some() { - Ok(Transformed::Yes(PlanWithKeyRequirements::new( - requirements.plan, - ))) + let request_key_ordering = vec![vec![]; plan.children().len()]; + Ok((Transformed::Yes(plan), request_key_ordering)) } else { // By default, push down the parent requirements to children - requirements.children.iter_mut().for_each(|child| { - child.required_key_ordering = parent_required.clone(); - }); - Ok(Transformed::Yes(requirements)) + let children_len = plan.children().len(); + Ok(( + Transformed::Yes(plan), + vec![parent_required.clone(); children_len], + )) } } @@ -429,7 +440,7 @@ fn reorder_partitioned_join_keys( on: &[(Column, Column)], sort_options: Vec, join_constructor: &F, -) -> Result +) -> Result<(Arc, Vec)> where F: Fn((Vec<(Column, Column)>, Vec)) -> Result>, { @@ -451,24 +462,19 @@ where for idx in 0..sort_options.len() { new_sort_options.push(sort_options[new_positions[idx]]) } - let mut requirement_tree = PlanWithKeyRequirements::new(join_constructor(( - new_join_on, - new_sort_options, - ))?); - requirement_tree.children[0].required_key_ordering = left_keys; - requirement_tree.children[1].required_key_ordering = right_keys; - Ok(requirement_tree) + + Ok(( + join_constructor((new_join_on, new_sort_options))?, + vec![left_keys, right_keys], + )) } else { - let mut requirement_tree = PlanWithKeyRequirements::new(join_plan); - requirement_tree.children[0].required_key_ordering = left_keys; - requirement_tree.children[1].required_key_ordering = right_keys; - Ok(requirement_tree) + Ok((join_plan, vec![left_keys, right_keys])) } } else { - let mut requirement_tree = PlanWithKeyRequirements::new(join_plan); - requirement_tree.children[0].required_key_ordering = join_key_pairs.left_keys; - requirement_tree.children[1].required_key_ordering = join_key_pairs.right_keys; - Ok(requirement_tree) + Ok(( + join_plan, + vec![join_key_pairs.left_keys, join_key_pairs.right_keys], + )) } } @@ -476,7 +482,7 @@ fn reorder_aggregate_keys( agg_plan: Arc, parent_required: &[Arc], agg_exec: &AggregateExec, -) -> Result { +) -> Result<(Arc, Vec)> { let output_columns = agg_exec .group_by() .expr() @@ -494,11 +500,15 @@ fn reorder_aggregate_keys( || !agg_exec.group_by().null_expr().is_empty() || physical_exprs_equal(&output_exprs, parent_required) { - Ok(PlanWithKeyRequirements::new(agg_plan)) + let request_key_ordering = vec![vec![]; agg_plan.children().len()]; + Ok((agg_plan, request_key_ordering)) } else { let new_positions = expected_expr_positions(&output_exprs, parent_required); match new_positions { - None => Ok(PlanWithKeyRequirements::new(agg_plan)), + None => { + let request_key_ordering = vec![vec![]; agg_plan.children().len()]; + Ok((agg_plan, request_key_ordering)) + } Some(positions) => { let new_partial_agg = if let Some(agg_exec) = agg_exec.input().as_any().downcast_ref::() @@ -570,11 +580,13 @@ fn reorder_aggregate_keys( .push((Arc::new(Column::new(name, idx)) as _, name.clone())) } // TODO merge adjacent Projections if there are - Ok(PlanWithKeyRequirements::new(Arc::new( - ProjectionExec::try_new(proj_exprs, new_final_agg)?, - ))) + let new_plan = + Arc::new(ProjectionExec::try_new(proj_exprs, new_final_agg)?); + let request_key_ordering = vec![vec![]; new_plan.children().len()]; + Ok((new_plan, request_key_ordering)) } else { - Ok(PlanWithKeyRequirements::new(agg_plan)) + let request_key_ordering = vec![vec![]; agg_plan.children().len()]; + Ok((agg_plan, request_key_ordering)) } } } @@ -1463,61 +1475,6 @@ struct JoinKeyPairs { right_keys: Vec>, } -#[derive(Debug, Clone)] -struct PlanWithKeyRequirements { - plan: Arc, - /// Parent required key ordering - required_key_ordering: Vec>, - children: Vec, -} - -impl PlanWithKeyRequirements { - fn new(plan: Arc) -> Self { - let children = plan.children(); - Self { - plan, - required_key_ordering: vec![], - children: children.into_iter().map(Self::new).collect(), - } - } -} - -impl TreeNode for PlanWithKeyRequirements { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.children { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - - Ok(VisitRecursion::Continue) - } - - fn map_children(mut self, transform: F) -> Result - where - F: FnMut(Self) -> Result, - { - if !self.children.is_empty() { - self.children = self - .children - .into_iter() - .map(transform) - .collect::>()?; - self.plan = with_new_children_if_necessary( - self.plan, - self.children.iter().map(|c| c.plan.clone()).collect(), - )? - .into(); - } - Ok(self) - } -} - /// Since almost all of these tests explicitly use `ParquetExec` they only run with the parquet feature flag on #[cfg(feature = "parquet")] #[cfg(test)] diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 77d04a61c59e..0d6693593efa 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -41,7 +41,7 @@ use crate::error::Result; use crate::physical_optimizer::replace_with_order_preserving_variants::{ replace_with_order_preserving_variants, OrderPreservationContext, }; -use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown}; +use crate::physical_optimizer::sort_pushdown::pushdown_requirement_to_children; use crate::physical_optimizer::utils::{ add_sort_above, is_coalesce_partitions, is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window, @@ -309,10 +309,63 @@ impl PhysicalOptimizerRule for EnforceSorting { // Execute a top-down traversal to exploit sort push-down opportunities // missed by the bottom-up traversal: - let mut sort_pushdown = SortPushDown::new(updated_plan.plan); - sort_pushdown.assign_initial_requirements(); - let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?; - Ok(adjusted.plan) + let plan = updated_plan.plan.transform_down_with_payload( + &mut |mut plan, required_ordering: Option>| { + let parent_required = required_ordering.as_deref().unwrap_or(&[]); + if let Some(sort_exec) = plan.as_any().downcast_ref::() { + if !plan + .equivalence_properties() + .ordering_satisfy_requirement(parent_required) + { + // If the current plan is a SortExec, modify it to satisfy parent requirements: + let mut new_plan = sort_exec.input().clone(); + add_sort_above(&mut new_plan, parent_required, sort_exec.fetch()); + plan = new_plan; + }; + let required_ordering = plan + .output_ordering() + .map(PhysicalSortRequirement::from_sort_exprs) + .unwrap_or_default(); + // Since new_plan is a SortExec, we can safely get the 0th index. + let child = plan.children().swap_remove(0); + if let Some(adjusted) = + pushdown_requirement_to_children(&child, &required_ordering)? + { + // Can push down requirements + Ok((Transformed::Yes(child), adjusted)) + } else { + // Can not push down requirements + let required_input_ordering = plan.required_input_ordering(); + Ok((Transformed::Yes(plan), required_input_ordering)) + } + } else { + // Executors other than SortExec + if plan + .equivalence_properties() + .ordering_satisfy_requirement(parent_required) + { + // Satisfies parent requirements, immediately return. + let required_input_ordering = plan.required_input_ordering(); + return Ok((Transformed::Yes(plan), required_input_ordering)); + } + // Can not satisfy the parent requirements, check whether the requirements can be pushed down: + if let Some(adjusted) = + pushdown_requirement_to_children(&plan, parent_required)? + { + Ok((Transformed::Yes(plan), adjusted)) + } else { + // Can not push down requirements, add new SortExec: + let mut new_plan = plan; + add_sort_above(&mut new_plan, parent_required, None); + let required_input_ordering = new_plan.required_input_ordering(); + // Can not push down requirements + Ok((Transformed::Yes(new_plan), required_input_ordering)) + } + } + }, + None, + )?; + Ok(plan) } fn name(&self) -> &str { diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index b0013863010a..c7792294de5f 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -18,17 +18,15 @@ use std::sync::Arc; use crate::physical_optimizer::utils::{ - add_sort_above, is_limit, is_sort_preserving_merge, is_union, is_window, + is_limit, is_sort_preserving_merge, is_union, is_window, }; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::joins::utils::calculate_join_output_ordering; use crate::physical_plan::joins::{HashJoinExec, SortMergeJoinExec}; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; -use crate::physical_plan::sorts::sort::SortExec; -use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; +use crate::physical_plan::ExecutionPlan; -use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::{plan_err, DataFusionError, JoinSide, Result}; use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::Column; @@ -36,147 +34,7 @@ use datafusion_physical_expr::{ LexRequirementRef, PhysicalSortExpr, PhysicalSortRequirement, }; -/// This is a "data class" we use within the [`EnforceSorting`] rule to push -/// down [`SortExec`] in the plan. In some cases, we can reduce the total -/// computational cost by pushing down `SortExec`s through some executors. -/// -/// [`EnforceSorting`]: crate::physical_optimizer::enforce_sorting::EnforceSorting -#[derive(Debug, Clone)] -pub(crate) struct SortPushDown { - /// Current plan - pub plan: Arc, - /// Parent required sort ordering - required_ordering: Option>, - children_nodes: Vec, -} - -impl SortPushDown { - /// Creates an empty tree with empty `required_ordering`'s. - pub fn new(plan: Arc) -> Self { - let children = plan.children(); - Self { - plan, - required_ordering: None, - children_nodes: children.into_iter().map(Self::new).collect(), - } - } - - /// Assigns the ordering requirement of the root node to the its children. - pub fn assign_initial_requirements(&mut self) { - let reqs = self.plan.required_input_ordering(); - for (child, requirement) in self.children_nodes.iter_mut().zip(reqs) { - child.required_ordering = requirement; - } - } -} - -impl TreeNode for SortPushDown { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.children_nodes { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - - Ok(VisitRecursion::Continue) - } - fn map_children(mut self, transform: F) -> Result - where - F: FnMut(Self) -> Result, - { - if !self.children_nodes.is_empty() { - self.children_nodes = self - .children_nodes - .into_iter() - .map(transform) - .collect::>()?; - self.plan = with_new_children_if_necessary( - self.plan, - self.children_nodes.iter().map(|c| c.plan.clone()).collect(), - )? - .into(); - } - Ok(self) - } -} - -pub(crate) fn pushdown_sorts( - mut requirements: SortPushDown, -) -> Result> { - let plan = &requirements.plan; - let parent_required = requirements.required_ordering.as_deref().unwrap_or(&[]); - - if let Some(sort_exec) = plan.as_any().downcast_ref::() { - if !plan - .equivalence_properties() - .ordering_satisfy_requirement(parent_required) - { - // If the current plan is a SortExec, modify it to satisfy parent requirements: - let mut new_plan = sort_exec.input().clone(); - add_sort_above(&mut new_plan, parent_required, sort_exec.fetch()); - requirements.plan = new_plan; - }; - - let required_ordering = requirements - .plan - .output_ordering() - .map(PhysicalSortRequirement::from_sort_exprs) - .unwrap_or_default(); - // Since new_plan is a SortExec, we can safely get the 0th index. - let mut child = requirements.children_nodes.swap_remove(0); - if let Some(adjusted) = - pushdown_requirement_to_children(&child.plan, &required_ordering)? - { - for (c, o) in child.children_nodes.iter_mut().zip(adjusted) { - c.required_ordering = o; - } - // Can push down requirements - child.required_ordering = None; - Ok(Transformed::Yes(child)) - } else { - // Can not push down requirements - let mut empty_node = SortPushDown::new(requirements.plan); - empty_node.assign_initial_requirements(); - Ok(Transformed::Yes(empty_node)) - } - } else { - // Executors other than SortExec - if plan - .equivalence_properties() - .ordering_satisfy_requirement(parent_required) - { - // Satisfies parent requirements, immediately return. - let reqs = requirements.plan.required_input_ordering(); - for (child, order) in requirements.children_nodes.iter_mut().zip(reqs) { - child.required_ordering = order; - } - return Ok(Transformed::Yes(requirements)); - } - // Can not satisfy the parent requirements, check whether the requirements can be pushed down: - if let Some(adjusted) = pushdown_requirement_to_children(plan, parent_required)? { - for (c, o) in requirements.children_nodes.iter_mut().zip(adjusted) { - c.required_ordering = o; - } - requirements.required_ordering = None; - Ok(Transformed::Yes(requirements)) - } else { - // Can not push down requirements, add new SortExec: - let mut new_plan = requirements.plan; - add_sort_above(&mut new_plan, parent_required, None); - let mut new_empty = SortPushDown::new(new_plan); - new_empty.assign_initial_requirements(); - // Can not push down requirements - Ok(Transformed::Yes(new_empty)) - } - } -} - -fn pushdown_requirement_to_children( +pub fn pushdown_requirement_to_children( plan: &Arc, parent_required: LexRequirementRef, ) -> Result>>>> { diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 31c1cf61193a..74bd34a6e4f5 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -29,7 +29,7 @@ use crate::equivalence::{ }; use crate::expressions::Literal; -use crate::sort_properties::{ExprOrdering, SortProperties}; +use crate::sort_properties::SortProperties; use crate::{ physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, @@ -313,8 +313,7 @@ impl EquivalenceProperties { /// /// Returns `true` if the specified ordering is satisfied, `false` otherwise. fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool { - let expr_ordering = self.get_expr_ordering(req.expr.clone()); - let ExprOrdering { expr, state, .. } = expr_ordering; + let (expr, state) = self.get_expr_ordering(req.expr.clone()); match state { SortProperties::Ordered(options) => { let sort_expr = PhysicalSortExpr { expr, options }; @@ -708,7 +707,7 @@ impl EquivalenceProperties { let ordered_exprs = search_indices .iter() .flat_map(|&idx| { - let ExprOrdering { expr, state, .. } = + let (expr, state) = eq_properties.get_expr_ordering(exprs[idx].clone()); if let SortProperties::Ordered(options) = state { Some((PhysicalSortExpr { expr, options }, idx)) @@ -775,15 +774,29 @@ impl EquivalenceProperties { /// /// Returns an `ExprOrdering` object containing the ordering information for /// the given expression. - pub fn get_expr_ordering(&self, expr: Arc) -> ExprOrdering { - ExprOrdering::new(expr.clone()) - .transform_up(&|expr| Ok(update_ordering(expr, self))) - // Guaranteed to always return `Ok`. - .unwrap() + pub fn get_expr_ordering( + &self, + expr: Arc, + ) -> (Arc, SortProperties) { + // The transform is designed to aid in the determination of ordering (represented + // by [`SortProperties`]) for a given [`PhysicalExpr`]. When analyzing the orderings + // of a [`PhysicalExpr`], the process begins by assigning the ordering of its leaf nodes. + // By propagating these leaf node orderings upwards in the expression tree, the overall + // ordering of the entire [`PhysicalExpr`] can be derived. + // + // This struct holds the necessary state information for each expression in the [`PhysicalExpr`]. + // It encapsulates the orderings (`state`) associated with the expression (`expr`), and + // orderings of the children expressions (`children_states`). The `state` of a parent + // expression is determined based on the states of its children expressions. + expr.transform_up_with_payload(&mut |expr, children_states| { + Ok(update_ordering(expr, children_states, self)) + }) + // Guaranteed to always return `Ok`. + .unwrap() } } -/// Calculates the [`SortProperties`] of a given [`ExprOrdering`] node. +/// Calculates the [`SortProperties`] of a given [`PhysicalExpr`] node. /// The node can either be a leaf node, or an intermediate node: /// - If it is a leaf node, we directly find the order of the node by looking /// at the given sort expression and equivalence properties if it is a `Column` @@ -796,28 +809,31 @@ impl EquivalenceProperties { /// sort expression emerges at that node immediately, discarding the recursive /// result coming from its children. fn update_ordering( - mut node: ExprOrdering, + expr: Arc, + children_states: Vec, eq_properties: &EquivalenceProperties, -) -> Transformed { +) -> (Transformed>, SortProperties) { // We have a Column, which is one of the two possible leaf node types: - let normalized_expr = eq_properties.eq_group.normalize_expr(node.expr.clone()); + let normalized_expr = eq_properties.eq_group.normalize_expr(expr.clone()); + + let state; if eq_properties.is_expr_constant(&normalized_expr) { - node.state = SortProperties::Singleton; + state = SortProperties::Singleton; } else if let Some(options) = eq_properties .normalized_oeq_class() .get_options(&normalized_expr) { - node.state = SortProperties::Ordered(options); - } else if !node.expr.children().is_empty() { + state = SortProperties::Ordered(options); + } else if !expr.children().is_empty() { // We have an intermediate (non-leaf) node, account for its children: - node.state = node.expr.get_ordering(&node.children_state()); - } else if node.expr.as_any().is::() { + state = expr.get_ordering(&children_states); + } else if expr.as_any().is::() { // We have a Literal, which is the other possible leaf node type: - node.state = node.expr.get_ordering(&[]); + state = expr.get_ordering(&[]); } else { - return Transformed::No(node); + return (Transformed::No(expr), Default::default()); } - Transformed::Yes(node) + (Transformed::Yes(expr), state) } /// This function determines whether the provided expression is constant @@ -1680,12 +1696,12 @@ mod tests { .iter() .flat_map(|ordering| ordering.first().cloned()) .collect::>(); - let expr_ordering = eq_properties.get_expr_ordering(expr.clone()); + let (_, state) = eq_properties.get_expr_ordering(expr.clone()); let err_msg = format!( "expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings: {leading_orderings:?}", - expr, expected, expr_ordering.state + expr, expected, state ); - assert_eq!(expr_ordering.state, expected, "{}", err_msg); + assert_eq!(state, expected, "{}", err_msg); } Ok(()) diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs index 91238e5b04b4..e086fdc3cf57 100644 --- a/datafusion/physical-expr/src/sort_properties.rs +++ b/datafusion/physical-expr/src/sort_properties.rs @@ -15,15 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::{ops::Neg, sync::Arc}; +use std::ops::Neg; use arrow_schema::SortOptions; -use crate::PhysicalExpr; -use datafusion_common::tree_node::{TreeNode, VisitRecursion}; -use datafusion_common::Result; - -/// To propagate [`SortOptions`] across the [`PhysicalExpr`], it is insufficient +/// To propagate [`SortOptions`] across the [`crate::PhysicalExpr`], it is insufficient /// to simply use `Option`: There must be a differentiation between /// unordered columns and literal values, since literals may not break the ordering /// when they are used as a child of some binary expression when the other child has @@ -136,68 +132,3 @@ impl Neg for SortProperties { } } } - -/// The `ExprOrdering` struct is designed to aid in the determination of ordering (represented -/// by [`SortProperties`]) for a given [`PhysicalExpr`]. When analyzing the orderings -/// of a [`PhysicalExpr`], the process begins by assigning the ordering of its leaf nodes. -/// By propagating these leaf node orderings upwards in the expression tree, the overall -/// ordering of the entire [`PhysicalExpr`] can be derived. -/// -/// This struct holds the necessary state information for each expression in the [`PhysicalExpr`]. -/// It encapsulates the orderings (`state`) associated with the expression (`expr`), and -/// orderings of the children expressions (`children_states`). The [`ExprOrdering`] of a parent -/// expression is determined based on the [`ExprOrdering`] states of its children expressions. -#[derive(Debug)] -pub struct ExprOrdering { - pub expr: Arc, - pub state: SortProperties, - pub children: Vec, -} - -impl ExprOrdering { - /// Creates a new [`ExprOrdering`] with [`SortProperties::Unordered`] states - /// for `expr` and its children. - pub fn new(expr: Arc) -> Self { - let children = expr.children(); - Self { - expr, - state: Default::default(), - children: children.into_iter().map(Self::new).collect(), - } - } - - /// Get a reference to each child state. - pub fn children_state(&self) -> Vec { - self.children.iter().map(|c| c.state).collect() - } -} - -impl TreeNode for ExprOrdering { - fn apply_children(&self, op: &mut F) -> Result - where - F: FnMut(&Self) -> Result, - { - for child in &self.children { - match op(child)? { - VisitRecursion::Continue => {} - VisitRecursion::Skip => return Ok(VisitRecursion::Continue), - VisitRecursion::Stop => return Ok(VisitRecursion::Stop), - } - } - Ok(VisitRecursion::Continue) - } - - fn map_children(mut self, transform: F) -> Result - where - F: FnMut(Self) -> Result, - { - if !self.children.is_empty() { - self.children = self - .children - .into_iter() - .map(transform) - .collect::>()?; - } - Ok(self) - } -}