diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index ff052b5f040c..14715ede500a 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -53,14 +53,15 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::windows::{ get_best_fitting_window, BoundedWindowAggExec, WindowAggExec, }; -use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan}; +use crate::physical_plan::{ + with_new_children_if_necessary, Distribution, ExecutionPlan, InputOrderMode, +}; use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::{plan_err, DataFusionError}; use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement}; use datafusion_physical_plan::repartition::RepartitionExec; -use datafusion_physical_plan::windows::PartitionSearchMode; use itertools::izip; /// This rule inspects [`SortExec`]'s in the given physical plan and removes the @@ -611,7 +612,7 @@ fn analyze_window_sort_removal( window_expr.to_vec(), window_child, partitionby_exprs.to_vec(), - PartitionSearchMode::Sorted, + InputOrderMode::Sorted, )?) as _ } else { Arc::new(WindowAggExec::try_new( diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index cc62cda41266..37a76eff1ee2 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -35,7 +35,7 @@ use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::create_window_expr; -use crate::physical_plan::{ExecutionPlan, Partitioning}; +use crate::physical_plan::{ExecutionPlan, InputOrderMode, Partitioning}; use crate::prelude::{CsvReadOptions, SessionContext}; use arrow_schema::{Schema, SchemaRef, SortOptions}; @@ -44,7 +44,6 @@ use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction}; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; -use datafusion_physical_plan::windows::PartitionSearchMode; use async_trait::async_trait; @@ -240,7 +239,7 @@ pub fn bounded_window_exec( .unwrap()], input.clone(), vec![], - PartitionSearchMode::Sorted, + InputOrderMode::Sorted, ) .unwrap(), ) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 47d071d533e3..8ef433173edd 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -63,12 +63,10 @@ use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::unnest::UnnestExec; use crate::physical_plan::values::ValuesExec; -use crate::physical_plan::windows::{ - BoundedWindowAggExec, PartitionSearchMode, WindowAggExec, -}; +use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{ - aggregates, displayable, udaf, windows, AggregateExpr, ExecutionPlan, Partitioning, - PhysicalExpr, WindowExpr, + aggregates, displayable, udaf, windows, AggregateExpr, ExecutionPlan, InputOrderMode, + Partitioning, PhysicalExpr, WindowExpr, }; use arrow::compute::SortOptions; @@ -761,7 +759,7 @@ impl DefaultPhysicalPlanner { window_expr, input_exec, physical_partition_keys, - PartitionSearchMode::Sorted, + InputOrderMode::Sorted, )?) } else { Arc::new(WindowAggExec::try_new( diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index af96063ffb5f..44ff71d02392 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -25,9 +25,9 @@ use arrow::util::pretty::pretty_format_batches; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::windows::{ - create_window_expr, BoundedWindowAggExec, PartitionSearchMode, WindowAggExec, + create_window_expr, BoundedWindowAggExec, WindowAggExec, }; -use datafusion::physical_plan::{collect, ExecutionPlan}; +use datafusion::physical_plan::{collect, ExecutionPlan, InputOrderMode}; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::type_coercion::aggregates::coerce_types; @@ -43,9 +43,7 @@ use hashbrown::HashMap; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; -use datafusion_physical_plan::windows::PartitionSearchMode::{ - Linear, PartiallySorted, Sorted, -}; +use datafusion_physical_plan::InputOrderMode::{Linear, PartiallySorted, Sorted}; #[tokio::test(flavor = "multi_thread", worker_threads = 16)] async fn window_bounded_window_random_comparison() -> Result<()> { @@ -385,9 +383,9 @@ async fn run_window_test( random_seed: u64, partition_by_columns: Vec<&str>, orderby_columns: Vec<&str>, - search_mode: PartitionSearchMode, + search_mode: InputOrderMode, ) -> Result<()> { - let is_linear = !matches!(search_mode, PartitionSearchMode::Sorted); + let is_linear = !matches!(search_mode, InputOrderMode::Sorted); let mut rng = StdRng::seed_from_u64(random_seed); let schema = input1[0].schema(); let session_config = SessionConfig::new().with_batch_size(50); diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index d594335af44f..2f69ed061ce1 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -27,11 +27,9 @@ use crate::aggregates::{ }; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; -use crate::windows::{ - get_ordered_partition_by_indices, get_window_mode, PartitionSearchMode, -}; +use crate::windows::{get_ordered_partition_by_indices, get_window_mode}; use crate::{ - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, Partitioning, SendableRecordBatchStream, Statistics, }; @@ -304,7 +302,9 @@ pub struct AggregateExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, required_input_ordering: Option, - partition_search_mode: PartitionSearchMode, + /// Describes how the input is ordered relative to the group by columns + input_order_mode: InputOrderMode, + /// Describe how the output is ordered output_ordering: Option, } @@ -409,15 +409,15 @@ fn get_aggregate_search_mode( aggr_expr: &mut [Arc], order_by_expr: &mut [Option], ordering_req: &mut Vec, -) -> PartitionSearchMode { +) -> InputOrderMode { let groupby_exprs = group_by .expr .iter() .map(|(item, _)| item.clone()) .collect::>(); - let mut partition_search_mode = PartitionSearchMode::Linear; + let mut input_order_mode = InputOrderMode::Linear; if !group_by.is_single() || groupby_exprs.is_empty() { - return partition_search_mode; + return input_order_mode; } if let Some((should_reverse, mode)) = @@ -439,9 +439,9 @@ fn get_aggregate_search_mode( ); *ordering_req = reverse_order_bys(ordering_req); } - partition_search_mode = mode; + input_order_mode = mode; } - partition_search_mode + input_order_mode } /// Check whether group by expression contains all of the expression inside `requirement` @@ -515,7 +515,7 @@ impl AggregateExec { &input.equivalence_properties(), )?; let mut ordering_req = requirement.unwrap_or(vec![]); - let partition_search_mode = get_aggregate_search_mode( + let input_order_mode = get_aggregate_search_mode( &group_by, &input, &mut aggr_expr, @@ -567,7 +567,7 @@ impl AggregateExec { metrics: ExecutionPlanMetricsSet::new(), required_input_ordering, limit: None, - partition_search_mode, + input_order_mode, output_ordering, }) } @@ -767,8 +767,8 @@ impl DisplayAs for AggregateExec { write!(f, ", lim=[{limit}]")?; } - if self.partition_search_mode != PartitionSearchMode::Linear { - write!(f, ", ordering_mode={:?}", self.partition_search_mode)?; + if self.input_order_mode != InputOrderMode::Linear { + write!(f, ", ordering_mode={:?}", self.input_order_mode)?; } } } @@ -819,7 +819,7 @@ impl ExecutionPlan for AggregateExec { /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { if children[0] { - if self.partition_search_mode == PartitionSearchMode::Linear { + if self.input_order_mode == InputOrderMode::Linear { // Cannot run without breaking pipeline. plan_err!( "Aggregate Error: `GROUP BY` clauses with columns without ordering and GROUPING SETS are not supported for unbounded inputs." diff --git a/datafusion/physical-plan/src/aggregates/order/mod.rs b/datafusion/physical-plan/src/aggregates/order/mod.rs index f72d2f06e459..b258b97a9e84 100644 --- a/datafusion/physical-plan/src/aggregates/order/mod.rs +++ b/datafusion/physical-plan/src/aggregates/order/mod.rs @@ -23,7 +23,7 @@ use datafusion_physical_expr::{EmitTo, PhysicalSortExpr}; mod full; mod partial; -use crate::windows::PartitionSearchMode; +use crate::InputOrderMode; pub(crate) use full::GroupOrderingFull; pub(crate) use partial::GroupOrderingPartial; @@ -42,18 +42,16 @@ impl GroupOrdering { /// Create a `GroupOrdering` for the the specified ordering pub fn try_new( input_schema: &Schema, - mode: &PartitionSearchMode, + mode: &InputOrderMode, ordering: &[PhysicalSortExpr], ) -> Result { match mode { - PartitionSearchMode::Linear => Ok(GroupOrdering::None), - PartitionSearchMode::PartiallySorted(order_indices) => { + InputOrderMode::Linear => Ok(GroupOrdering::None), + InputOrderMode::PartiallySorted(order_indices) => { GroupOrderingPartial::try_new(input_schema, order_indices, ordering) .map(GroupOrdering::Partial) } - PartitionSearchMode::Sorted => { - Ok(GroupOrdering::Full(GroupOrderingFull::new())) - } + InputOrderMode::Sorted => Ok(GroupOrdering::Full(GroupOrderingFull::new())), } } diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 2f94c3630c33..89614fd3020c 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -346,7 +346,7 @@ impl GroupedHashAggregateStream { .find_longest_permutation(&agg_group_by.output_exprs()); let group_ordering = GroupOrdering::try_new( &group_schema, - &agg.partition_search_mode, + &agg.input_order_mode, ordering.as_slice(), )?; diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index b2c69b467e9c..f40911c10168 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -58,6 +58,7 @@ pub mod joins; pub mod limit; pub mod memory; pub mod metrics; +mod ordering; pub mod projection; pub mod repartition; pub mod sorts; @@ -72,6 +73,7 @@ pub mod windows; pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; pub use crate::metrics::Metric; +pub use crate::ordering::InputOrderMode; pub use crate::topk::TopK; pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; diff --git a/datafusion/physical-plan/src/ordering.rs b/datafusion/physical-plan/src/ordering.rs new file mode 100644 index 000000000000..047f89eef193 --- /dev/null +++ b/datafusion/physical-plan/src/ordering.rs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Specifies how the input to an aggregation or window operator is ordered +/// relative to their `GROUP BY` or `PARTITION BY` expressions. +/// +/// For example, if the existing ordering is `[a ASC, b ASC, c ASC]` +/// +/// ## Window Functions +/// - A `PARTITION BY b` clause can use `Linear` mode. +/// - A `PARTITION BY a, c` or a `PARTITION BY c, a` can use +/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively. +/// (The vector stores the index of `a` in the respective PARTITION BY expression.) +/// - A `PARTITION BY a, b` or a `PARTITION BY b, a` can use `Sorted` mode. +/// +/// ## Aggregations +/// - A `GROUP BY b` clause can use `Linear` mode. +/// - A `GROUP BY a, c` or a `GROUP BY BY c, a` can use +/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively. +/// (The vector stores the index of `a` in the respective PARTITION BY expression.) +/// - A `GROUP BY a, b` or a `GROUP BY b, a` can use `Sorted` mode. +/// +/// Note these are the same examples as above, but with `GROUP BY` instead of +/// `PARTITION BY` to make the examples easier to read. +#[derive(Debug, Clone, PartialEq)] +pub enum InputOrderMode { + /// There is no partial permutation of the expressions satisfying the + /// existing ordering. + Linear, + /// There is a partial permutation of the expressions satisfying the + /// existing ordering. Indices describing the longest partial permutation + /// are stored in the vector. + PartiallySorted(Vec), + /// There is a (full) permutation of the expressions satisfying the + /// existing ordering. + Sorted, +} diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 8156ab1fa31b..9e4d6c137067 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -31,11 +31,12 @@ use crate::expressions::PhysicalSortExpr; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, - window_equivalence_properties, PartitionSearchMode, + window_equivalence_properties, }; use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, - Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr, + InputOrderMode, Partitioning, RecordBatchStream, SendableRecordBatchStream, + Statistics, WindowExpr, }; use arrow::{ @@ -81,8 +82,8 @@ pub struct BoundedWindowAggExec { pub partition_keys: Vec>, /// Execution metrics metrics: ExecutionPlanMetricsSet, - /// Partition by search mode - pub partition_search_mode: PartitionSearchMode, + /// Describes how the input is ordered relative to the partition keys + pub input_order_mode: InputOrderMode, /// Partition by indices that define ordering // For example, if input ordering is ORDER BY a, b and window expression // contains PARTITION BY b, a; `ordered_partition_by_indices` would be 1, 0. @@ -98,13 +99,13 @@ impl BoundedWindowAggExec { window_expr: Vec>, input: Arc, partition_keys: Vec>, - partition_search_mode: PartitionSearchMode, + input_order_mode: InputOrderMode, ) -> Result { let schema = create_schema(&input.schema(), &window_expr)?; let schema = Arc::new(schema); let partition_by_exprs = window_expr[0].partition_by(); - let ordered_partition_by_indices = match &partition_search_mode { - PartitionSearchMode::Sorted => { + let ordered_partition_by_indices = match &input_order_mode { + InputOrderMode::Sorted => { let indices = get_ordered_partition_by_indices( window_expr[0].partition_by(), &input, @@ -115,10 +116,8 @@ impl BoundedWindowAggExec { (0..partition_by_exprs.len()).collect::>() } } - PartitionSearchMode::PartiallySorted(ordered_indices) => { - ordered_indices.clone() - } - PartitionSearchMode::Linear => { + InputOrderMode::PartiallySorted(ordered_indices) => ordered_indices.clone(), + InputOrderMode::Linear => { vec![] } }; @@ -128,7 +127,7 @@ impl BoundedWindowAggExec { schema, partition_keys, metrics: ExecutionPlanMetricsSet::new(), - partition_search_mode, + input_order_mode, ordered_partition_by_indices, }) } @@ -162,8 +161,8 @@ impl BoundedWindowAggExec { fn get_search_algo(&self) -> Result> { let partition_by_sort_keys = self.partition_by_sort_keys()?; let ordered_partition_by_indices = self.ordered_partition_by_indices.clone(); - Ok(match &self.partition_search_mode { - PartitionSearchMode::Sorted => { + Ok(match &self.input_order_mode { + InputOrderMode::Sorted => { // In Sorted mode, all partition by columns should be ordered. if self.window_expr()[0].partition_by().len() != ordered_partition_by_indices.len() @@ -175,7 +174,7 @@ impl BoundedWindowAggExec { ordered_partition_by_indices, }) } - PartitionSearchMode::Linear | PartitionSearchMode::PartiallySorted(_) => { + InputOrderMode::Linear | InputOrderMode::PartiallySorted(_) => { Box::new(LinearSearch::new(ordered_partition_by_indices)) } }) @@ -203,7 +202,7 @@ impl DisplayAs for BoundedWindowAggExec { ) }) .collect(); - let mode = &self.partition_search_mode; + let mode = &self.input_order_mode; write!(f, "wdw=[{}], mode=[{:?}]", g.join(", "), mode)?; } } @@ -244,7 +243,7 @@ impl ExecutionPlan for BoundedWindowAggExec { fn required_input_ordering(&self) -> Vec>> { let partition_bys = self.window_expr()[0].partition_by(); let order_keys = self.window_expr()[0].order_by(); - if self.partition_search_mode != PartitionSearchMode::Sorted + if self.input_order_mode != InputOrderMode::Sorted || self.ordered_partition_by_indices.len() >= partition_bys.len() { let partition_bys = self @@ -283,7 +282,7 @@ impl ExecutionPlan for BoundedWindowAggExec { self.window_expr.clone(), children[0].clone(), self.partition_keys.clone(), - self.partition_search_mode.clone(), + self.input_order_mode.clone(), )?)) } @@ -1114,7 +1113,7 @@ fn get_aggregate_result_out_column( mod tests { use crate::common::collect; use crate::memory::MemoryExec; - use crate::windows::{BoundedWindowAggExec, PartitionSearchMode}; + use crate::windows::{BoundedWindowAggExec, InputOrderMode}; use crate::{get_plan_string, ExecutionPlan}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema}; @@ -1201,7 +1200,7 @@ mod tests { window_exprs, memory_exec, vec![], - PartitionSearchMode::Sorted, + InputOrderMode::Sorted, ) .map(|e| Arc::new(e) as Arc)?; diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 828dcb4b130c..3187e6b0fbd3 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -27,7 +27,7 @@ use crate::{ cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal, NthValue, Ntile, PhysicalSortExpr, RowNumber, }, - udaf, unbounded_output, ExecutionPlan, PhysicalExpr, + udaf, unbounded_output, ExecutionPlan, InputOrderMode, PhysicalExpr, }; use arrow::datatypes::Schema; @@ -54,30 +54,6 @@ pub use datafusion_physical_expr::window::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, }; -#[derive(Debug, Clone, PartialEq)] -/// Specifies aggregation grouping and/or window partitioning properties of a -/// set of expressions in terms of the existing ordering. -/// For example, if the existing ordering is `[a ASC, b ASC, c ASC]`: -/// - A `PARTITION BY b` clause will result in `Linear` mode. -/// - A `PARTITION BY a, c` or a `PARTITION BY c, a` clause will result in -/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively. -/// The vector stores the index of `a` in the respective PARTITION BY expression. -/// - A `PARTITION BY a, b` or a `PARTITION BY b, a` clause will result in -/// `Sorted` mode. -/// Note that the examples above are applicable for `GROUP BY` clauses too. -pub enum PartitionSearchMode { - /// There is no partial permutation of the expressions satisfying the - /// existing ordering. - Linear, - /// There is a partial permutation of the expressions satisfying the - /// existing ordering. Indices describing the longest partial permutation - /// are stored in the vector. - PartiallySorted(Vec), - /// There is a (full) permutation of the expressions satisfying the - /// existing ordering. - Sorted, -} - /// Create a physical expression for window function pub fn create_window_expr( fun: &WindowFunction, @@ -414,17 +390,17 @@ pub fn get_best_fitting_window( // of the window_exprs are same. let partitionby_exprs = window_exprs[0].partition_by(); let orderby_keys = window_exprs[0].order_by(); - let (should_reverse, partition_search_mode) = - if let Some((should_reverse, partition_search_mode)) = + let (should_reverse, input_order_mode) = + if let Some((should_reverse, input_order_mode)) = get_window_mode(partitionby_exprs, orderby_keys, input) { - (should_reverse, partition_search_mode) + (should_reverse, input_order_mode) } else { return Ok(None); }; let is_unbounded = unbounded_output(input); - if !is_unbounded && partition_search_mode != PartitionSearchMode::Sorted { - // Executor has bounded input and `partition_search_mode` is not `PartitionSearchMode::Sorted` + if !is_unbounded && input_order_mode != InputOrderMode::Sorted { + // Executor has bounded input and `input_order_mode` is not `InputOrderMode::Sorted` // in this case removing the sort is not helpful, return: return Ok(None); }; @@ -452,13 +428,13 @@ pub fn get_best_fitting_window( window_expr, input.clone(), physical_partition_keys.to_vec(), - partition_search_mode, + input_order_mode, )?) as _)) - } else if partition_search_mode != PartitionSearchMode::Sorted { + } else if input_order_mode != InputOrderMode::Sorted { // For `WindowAggExec` to work correctly PARTITION BY columns should be sorted. - // Hence, if `partition_search_mode` is not `PartitionSearchMode::Sorted` we should convert - // input ordering such that it can work with PartitionSearchMode::Sorted (add `SortExec`). - // Effectively `WindowAggExec` works only in PartitionSearchMode::Sorted mode. + // Hence, if `input_order_mode` is not `Sorted` we should convert + // input ordering such that it can work with `Sorted` (add `SortExec`). + // Effectively `WindowAggExec` works only in `Sorted` mode. Ok(None) } else { Ok(Some(Arc::new(WindowAggExec::try_new( @@ -474,16 +450,16 @@ pub fn get_best_fitting_window( /// is sufficient to run the current window operator. /// - A `None` return value indicates that we can not remove the sort in question /// (input ordering is not sufficient to run current window executor). -/// - A `Some((bool, PartitionSearchMode))` value indicates that the window operator +/// - A `Some((bool, InputOrderMode))` value indicates that the window operator /// can run with existing input ordering, so we can remove `SortExec` before it. /// The `bool` field in the return value represents whether we should reverse window -/// operator to remove `SortExec` before it. The `PartitionSearchMode` field represents +/// operator to remove `SortExec` before it. The `InputOrderMode` field represents /// the mode this window operator should work in to accommodate the existing ordering. pub fn get_window_mode( partitionby_exprs: &[Arc], orderby_keys: &[PhysicalSortExpr], input: &Arc, -) -> Option<(bool, PartitionSearchMode)> { +) -> Option<(bool, InputOrderMode)> { let input_eqs = input.equivalence_properties(); let mut partition_by_reqs: Vec = vec![]; let (_, indices) = input_eqs.find_longest_permutation(partitionby_exprs); @@ -504,11 +480,11 @@ pub fn get_window_mode( if partition_by_eqs.ordering_satisfy_requirement(&req) { // Window can be run with existing ordering let mode = if indices.len() == partitionby_exprs.len() { - PartitionSearchMode::Sorted + InputOrderMode::Sorted } else if indices.is_empty() { - PartitionSearchMode::Linear + InputOrderMode::Linear } else { - PartitionSearchMode::PartiallySorted(indices) + InputOrderMode::PartiallySorted(indices) }; return Some((should_swap, mode)); } @@ -532,7 +508,7 @@ mod tests { use futures::FutureExt; - use PartitionSearchMode::{Linear, PartiallySorted, Sorted}; + use InputOrderMode::{Linear, PartiallySorted, Sorted}; fn create_test_schema() -> Result { let nullable_column = Field::new("nullable_col", DataType::Int32, true); @@ -792,11 +768,11 @@ mod tests { // Second field in the tuple is Vec where each element in the vector represents ORDER BY columns // For instance, vec!["c"], corresponds to ORDER BY c ASC NULLS FIRST, (ordering is default ordering. We do not check // for reversibility in this test). - // Third field in the tuple is Option, which corresponds to expected algorithm mode. + // Third field in the tuple is Option, which corresponds to expected algorithm mode. // None represents that existing ordering is not sufficient to run executor with any one of the algorithms // (We need to add SortExec to be able to run it). - // Some(PartitionSearchMode) represents, we can run algorithm with existing ordering; and algorithm should work in - // PartitionSearchMode. + // Some(InputOrderMode) represents, we can run algorithm with existing ordering; and algorithm should work in + // InputOrderMode. let test_cases = vec![ (vec!["a"], vec!["a"], Some(Sorted)), (vec!["a"], vec!["b"], Some(Sorted)), @@ -881,7 +857,7 @@ mod tests { } let res = get_window_mode(&partition_by_exprs, &order_by_exprs, &exec_unbounded); - // Since reversibility is not important in this test. Convert Option<(bool, PartitionSearchMode)> to Option + // Since reversibility is not important in this test. Convert Option<(bool, InputOrderMode)> to Option let res = res.map(|(_, mode)| mode); assert_eq!( res, *expected, @@ -912,12 +888,12 @@ mod tests { // Second field in the tuple is Vec<(str, bool, bool)> where each element in the vector represents ORDER BY columns // For instance, vec![("c", false, false)], corresponds to ORDER BY c ASC NULLS LAST, // similarly, vec![("c", true, true)], corresponds to ORDER BY c DESC NULLS FIRST, - // Third field in the tuple is Option<(bool, PartitionSearchMode)>, which corresponds to expected result. + // Third field in the tuple is Option<(bool, InputOrderMode)>, which corresponds to expected result. // None represents that existing ordering is not sufficient to run executor with any one of the algorithms // (We need to add SortExec to be able to run it). - // Some((bool, PartitionSearchMode)) represents, we can run algorithm with existing ordering. Algorithm should work in - // PartitionSearchMode, bool field represents whether we should reverse window expressions to run executor with existing ordering. - // For instance, `Some((false, PartitionSearchMode::Sorted))`, represents that we shouldn't reverse window expressions. And algorithm + // Some((bool, InputOrderMode)) represents, we can run algorithm with existing ordering. Algorithm should work in + // InputOrderMode, bool field represents whether we should reverse window expressions to run executor with existing ordering. + // For instance, `Some((false, InputOrderMode::Sorted))`, represents that we shouldn't reverse window expressions. And algorithm // should work in Sorted mode to work with existing ordering. let test_cases = vec![ // PARTITION BY a, b ORDER BY c ASC NULLS LAST diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 8c2fd5369e33..daf539f219de 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1502,7 +1502,7 @@ enum AggregateMode { SINGLE_PARTITIONED = 4; } -message PartiallySortedPartitionSearchMode { +message PartiallySortedInputOrderMode { repeated uint64 columns = 6; } @@ -1511,9 +1511,9 @@ message WindowAggExecNode { repeated PhysicalWindowExprNode window_expr = 2; repeated PhysicalExprNode partition_keys = 5; // Set optional to `None` for `BoundedWindowAggExec`. - oneof partition_search_mode { + oneof input_order_mode { EmptyMessage linear = 7; - PartiallySortedPartitionSearchMode partially_sorted = 8; + PartiallySortedInputOrderMode partially_sorted = 8; EmptyMessage sorted = 9; } } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index b8c5f6a4aae8..f453875d71d4 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -14967,7 +14967,7 @@ impl<'de> serde::Deserialize<'de> for PartialTableReference { deserializer.deserialize_struct("datafusion.PartialTableReference", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for PartiallySortedPartitionSearchMode { +impl serde::Serialize for PartiallySortedInputOrderMode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -14978,14 +14978,14 @@ impl serde::Serialize for PartiallySortedPartitionSearchMode { if !self.columns.is_empty() { len += 1; } - let mut struct_ser = serializer.serialize_struct("datafusion.PartiallySortedPartitionSearchMode", len)?; + let mut struct_ser = serializer.serialize_struct("datafusion.PartiallySortedInputOrderMode", len)?; if !self.columns.is_empty() { struct_ser.serialize_field("columns", &self.columns.iter().map(ToString::to_string).collect::>())?; } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for PartiallySortedPartitionSearchMode { +impl<'de> serde::Deserialize<'de> for PartiallySortedInputOrderMode { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where @@ -15029,13 +15029,13 @@ impl<'de> serde::Deserialize<'de> for PartiallySortedPartitionSearchMode { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = PartiallySortedPartitionSearchMode; + type Value = PartiallySortedInputOrderMode; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion.PartiallySortedPartitionSearchMode") + formatter.write_str("struct datafusion.PartiallySortedInputOrderMode") } - fn visit_map(self, mut map_: V) -> std::result::Result + fn visit_map(self, mut map_: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { @@ -15053,12 +15053,12 @@ impl<'de> serde::Deserialize<'de> for PartiallySortedPartitionSearchMode { } } } - Ok(PartiallySortedPartitionSearchMode { + Ok(PartiallySortedInputOrderMode { columns: columns__.unwrap_or_default(), }) } } - deserializer.deserialize_struct("datafusion.PartiallySortedPartitionSearchMode", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("datafusion.PartiallySortedInputOrderMode", FIELDS, GeneratedVisitor) } } impl serde::Serialize for PartitionColumn { @@ -25639,7 +25639,7 @@ impl serde::Serialize for WindowAggExecNode { if !self.partition_keys.is_empty() { len += 1; } - if self.partition_search_mode.is_some() { + if self.input_order_mode.is_some() { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion.WindowAggExecNode", len)?; @@ -25652,15 +25652,15 @@ impl serde::Serialize for WindowAggExecNode { if !self.partition_keys.is_empty() { struct_ser.serialize_field("partitionKeys", &self.partition_keys)?; } - if let Some(v) = self.partition_search_mode.as_ref() { + if let Some(v) = self.input_order_mode.as_ref() { match v { - window_agg_exec_node::PartitionSearchMode::Linear(v) => { + window_agg_exec_node::InputOrderMode::Linear(v) => { struct_ser.serialize_field("linear", v)?; } - window_agg_exec_node::PartitionSearchMode::PartiallySorted(v) => { + window_agg_exec_node::InputOrderMode::PartiallySorted(v) => { struct_ser.serialize_field("partiallySorted", v)?; } - window_agg_exec_node::PartitionSearchMode::Sorted(v) => { + window_agg_exec_node::InputOrderMode::Sorted(v) => { struct_ser.serialize_field("sorted", v)?; } } @@ -25743,7 +25743,7 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode { let mut input__ = None; let mut window_expr__ = None; let mut partition_keys__ = None; - let mut partition_search_mode__ = None; + let mut input_order_mode__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Input => { @@ -25765,24 +25765,24 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode { partition_keys__ = Some(map_.next_value()?); } GeneratedField::Linear => { - if partition_search_mode__.is_some() { + if input_order_mode__.is_some() { return Err(serde::de::Error::duplicate_field("linear")); } - partition_search_mode__ = map_.next_value::<::std::option::Option<_>>()?.map(window_agg_exec_node::PartitionSearchMode::Linear) + input_order_mode__ = map_.next_value::<::std::option::Option<_>>()?.map(window_agg_exec_node::InputOrderMode::Linear) ; } GeneratedField::PartiallySorted => { - if partition_search_mode__.is_some() { + if input_order_mode__.is_some() { return Err(serde::de::Error::duplicate_field("partiallySorted")); } - partition_search_mode__ = map_.next_value::<::std::option::Option<_>>()?.map(window_agg_exec_node::PartitionSearchMode::PartiallySorted) + input_order_mode__ = map_.next_value::<::std::option::Option<_>>()?.map(window_agg_exec_node::InputOrderMode::PartiallySorted) ; } GeneratedField::Sorted => { - if partition_search_mode__.is_some() { + if input_order_mode__.is_some() { return Err(serde::de::Error::duplicate_field("sorted")); } - partition_search_mode__ = map_.next_value::<::std::option::Option<_>>()?.map(window_agg_exec_node::PartitionSearchMode::Sorted) + input_order_mode__ = map_.next_value::<::std::option::Option<_>>()?.map(window_agg_exec_node::InputOrderMode::Sorted) ; } } @@ -25791,7 +25791,7 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode { input: input__, window_expr: window_expr__.unwrap_or_default(), partition_keys: partition_keys__.unwrap_or_default(), - partition_search_mode: partition_search_mode__, + input_order_mode: input_order_mode__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index c31bc4ab5948..9e78b7c8d6dd 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2101,7 +2101,7 @@ pub struct ProjectionExecNode { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct PartiallySortedPartitionSearchMode { +pub struct PartiallySortedInputOrderMode { #[prost(uint64, repeated, tag = "6")] pub columns: ::prost::alloc::vec::Vec, } @@ -2115,21 +2115,19 @@ pub struct WindowAggExecNode { #[prost(message, repeated, tag = "5")] pub partition_keys: ::prost::alloc::vec::Vec, /// Set optional to `None` for `BoundedWindowAggExec`. - #[prost(oneof = "window_agg_exec_node::PartitionSearchMode", tags = "7, 8, 9")] - pub partition_search_mode: ::core::option::Option< - window_agg_exec_node::PartitionSearchMode, - >, + #[prost(oneof = "window_agg_exec_node::InputOrderMode", tags = "7, 8, 9")] + pub input_order_mode: ::core::option::Option, } /// Nested message and enum types in `WindowAggExecNode`. pub mod window_agg_exec_node { /// Set optional to `None` for `BoundedWindowAggExec`. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum PartitionSearchMode { + pub enum InputOrderMode { #[prost(message, tag = "7")] Linear(super::EmptyMessage), #[prost(message, tag = "8")] - PartiallySorted(super::PartiallySortedPartitionSearchMode), + PartiallySorted(super::PartiallySortedInputOrderMode), #[prost(message, tag = "9")] Sorted(super::EmptyMessage), } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 6714c35dc615..907ba04ebc20 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -49,11 +49,10 @@ use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion::physical_plan::union::UnionExec; -use datafusion::physical_plan::windows::{ - BoundedWindowAggExec, PartitionSearchMode, WindowAggExec, -}; +use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use datafusion::physical_plan::{ - udaf, AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr, + udaf, AggregateExpr, ExecutionPlan, InputOrderMode, Partitioning, PhysicalExpr, + WindowExpr, }; use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; use prost::bytes::BufMut; @@ -313,20 +312,18 @@ impl AsExecutionPlan for PhysicalPlanNode { }) .collect::>>>()?; - if let Some(partition_search_mode) = - window_agg.partition_search_mode.as_ref() - { - let partition_search_mode = match partition_search_mode { - window_agg_exec_node::PartitionSearchMode::Linear(_) => { - PartitionSearchMode::Linear + if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() { + let input_order_mode = match input_order_mode { + window_agg_exec_node::InputOrderMode::Linear(_) => { + InputOrderMode::Linear } - window_agg_exec_node::PartitionSearchMode::PartiallySorted( - protobuf::PartiallySortedPartitionSearchMode { columns }, - ) => PartitionSearchMode::PartiallySorted( + window_agg_exec_node::InputOrderMode::PartiallySorted( + protobuf::PartiallySortedInputOrderMode { columns }, + ) => InputOrderMode::PartiallySorted( columns.iter().map(|c| *c as usize).collect(), ), - window_agg_exec_node::PartitionSearchMode::Sorted(_) => { - PartitionSearchMode::Sorted + window_agg_exec_node::InputOrderMode::Sorted(_) => { + InputOrderMode::Sorted } }; @@ -334,7 +331,7 @@ impl AsExecutionPlan for PhysicalPlanNode { physical_window_expr, input, partition_keys, - partition_search_mode, + input_order_mode, )?)) } else { Ok(Arc::new(WindowAggExec::try_new( @@ -1560,7 +1557,7 @@ impl AsExecutionPlan for PhysicalPlanNode { input: Some(Box::new(input)), window_expr, partition_keys, - partition_search_mode: None, + input_order_mode: None, }, ))), }); @@ -1584,24 +1581,20 @@ impl AsExecutionPlan for PhysicalPlanNode { .map(|e| e.clone().try_into()) .collect::>>()?; - let partition_search_mode = match &exec.partition_search_mode { - PartitionSearchMode::Linear => { - window_agg_exec_node::PartitionSearchMode::Linear( - protobuf::EmptyMessage {}, - ) - } - PartitionSearchMode::PartiallySorted(columns) => { - window_agg_exec_node::PartitionSearchMode::PartiallySorted( - protobuf::PartiallySortedPartitionSearchMode { + let input_order_mode = match &exec.input_order_mode { + InputOrderMode::Linear => window_agg_exec_node::InputOrderMode::Linear( + protobuf::EmptyMessage {}, + ), + InputOrderMode::PartiallySorted(columns) => { + window_agg_exec_node::InputOrderMode::PartiallySorted( + protobuf::PartiallySortedInputOrderMode { columns: columns.iter().map(|c| *c as u64).collect(), }, ) } - PartitionSearchMode::Sorted => { - window_agg_exec_node::PartitionSearchMode::Sorted( - protobuf::EmptyMessage {}, - ) - } + InputOrderMode::Sorted => window_agg_exec_node::InputOrderMode::Sorted( + protobuf::EmptyMessage {}, + ), }; return Ok(protobuf::PhysicalPlanNode { @@ -1610,7 +1603,7 @@ impl AsExecutionPlan for PhysicalPlanNode { input: Some(Box::new(input)), window_expr, partition_keys, - partition_search_mode: Some(partition_search_mode), + input_order_mode: Some(input_order_mode), }, ))), });